Move bulk operation to collection and add docs

pull/13/head
Thijs Cadier 9 years ago
parent 283a94ce18
commit 13e0d1662f

@ -1,190 +0,0 @@
use mongoc::bindings;
use bson::Document;
use super::BsoncError;
use super::bsonc::Bsonc;
use super::collection::Collection;
use super::Result;
/// `BulkOperation` provides an abstraction for submitting multiple write operations as a single batch.
///
/// Create a `BulkOperation` by calling `create_bulk_operation` on a `Collection`. After adding all of
/// the write operations using the functions on this struct, `execute` to execute the operation on
/// the server. After executing the bulk operation is consumed and cannot be used anymore.
pub struct BulkOperation<'a> {
_collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_bulk_operation_t
}
impl<'a>BulkOperation<'a> {
/// Create a new bulk operation, only for internal usage.
#[doc(hidden)]
pub unsafe fn new(
collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_bulk_operation_t
) -> BulkOperation<'a> {
assert!(!inner.is_null());
BulkOperation {
_collection: collection,
inner: inner
}
}
/// Queue an insert of a single document into a bulk operation.
/// The insert is not performed until `execute` is called.
pub fn insert(
&self,
document: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_insert(
self.inner,
try!(Bsonc::from_document(&document)).inner()
)
}
Ok(())
}
/// Queue removal of all documents matching the provided selector into a bulk operation.
/// The removal is not performed until `execute` is called.
pub fn remove(
&self,
selector: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_remove(
self.inner,
try!(Bsonc::from_document(&selector)).inner()
)
}
Ok(())
}
/// Queue removal of a single document into a bulk operation.
/// The removal is not performed until `execute` is called.
pub fn remove_one(
&self,
selector: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_remove_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner()
)
}
Ok(())
}
/// Queue replacement of a single document into a bulk operation.
/// The replacement is not performed until `execute` is called.
pub fn replace_one(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_replace_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// Queue update of a single documents into a bulk operation.
/// The update is not performed until `execute` is called.
///
/// TODO: document must only contain fields whose key starts
/// with $, these is no error handling for this.
pub fn update_one(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_update_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// Queue update of multiple documents into a bulk operation.
/// The update is not performed until `execute` is called.
///
/// TODO: document must only contain fields whose key starts
/// with $, these is no error handling for this.
pub fn update(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_update(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// This function executes all operations queued into this bulk operation.
/// If ordered was set true, forward progress will be stopped upon the first error.
///
/// This function takes ownership because it is not possible to execute a bulk operation
/// multiple times.
///
/// Returns a document with an overview of the bulk operation if successfull.
pub fn execute(self) -> Result<Document> {
// Bsonc to store the reply
let mut reply = Bsonc::new();
// Empty error that might be filled
let mut error = BsoncError::empty();
// Execute the operation. This returns a non-zero hint of the peer node on
// success, otherwise 0 and error is set.
let return_value = unsafe {
bindings::mongoc_bulk_operation_execute(
self.inner,
reply.mut_inner(),
error.mut_inner()
)
};
if return_value != 0 {
match reply.as_document() {
Ok(document) => return Ok(document),
Err(error) => return Err(error.into())
}
} else {
Err(error.into())
}
}
}
impl<'a> Drop for BulkOperation<'a> {
fn drop(&mut self) {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_destroy(self.inner);
}
}
}

@ -12,7 +12,6 @@ use super::Result;
use super::CommandAndFindOptions; use super::CommandAndFindOptions;
use super::{BsoncError,InvalidParamsError}; use super::{BsoncError,InvalidParamsError};
use super::bsonc::Bsonc; use super::bsonc::Bsonc;
use super::bulk_operation::BulkOperation;
use super::client::Client; use super::client::Client;
use super::cursor; use super::cursor;
use super::cursor::{Cursor,TailingCursor}; use super::cursor::{Cursor,TailingCursor};
@ -33,12 +32,17 @@ pub struct Collection<'a> {
inner: *mut bindings::mongoc_collection_t inner: *mut bindings::mongoc_collection_t
} }
/// Options to configure a `BulkOperation`.
pub struct BulkOperationOptions { pub struct BulkOperationOptions {
/// If the operations must be performed in order.
pub ordered: bool, pub ordered: bool,
/// `WriteConcern` to use
pub write_concern: WriteConcern pub write_concern: WriteConcern
} }
impl BulkOperationOptions { impl BulkOperationOptions {
/// Default options that are used if no options are specified
/// when creating a `BulkOperation`.
pub fn default() -> BulkOperationOptions { pub fn default() -> BulkOperationOptions {
BulkOperationOptions { BulkOperationOptions {
ordered: false, ordered: false,
@ -307,7 +311,7 @@ impl<'a> Collection<'a> {
) )
}; };
unsafe { BulkOperation::new(self, inner) } BulkOperation::new(self, inner)
} }
pub fn drop(&mut self) -> Result<()> { pub fn drop(&mut self) -> Result<()> {
@ -607,3 +611,183 @@ impl<'a> Drop for Collection<'a> {
} }
} }
} }
/// Provides an abstraction for submitting multiple write operations as a single batch.
///
/// Create a `BulkOperation` by calling `create_bulk_operation` on a `Collection`. After adding all of
/// the write operations using the functions on this struct, `execute` to execute the operation on
/// the server. After executing the bulk operation is consumed and cannot be used anymore.
pub struct BulkOperation<'a> {
_collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_bulk_operation_t
}
impl<'a>BulkOperation<'a> {
/// Create a new bulk operation, only for internal usage.
fn new(
collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_bulk_operation_t
) -> BulkOperation<'a> {
assert!(!inner.is_null());
BulkOperation {
_collection: collection,
inner: inner
}
}
/// Queue an insert of a single document into a bulk operation.
/// The insert is not performed until `execute` is called.
pub fn insert(
&self,
document: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_insert(
self.inner,
try!(Bsonc::from_document(&document)).inner()
)
}
Ok(())
}
/// Queue removal of all documents matching the provided selector into a bulk operation.
/// The removal is not performed until `execute` is called.
pub fn remove(
&self,
selector: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_remove(
self.inner,
try!(Bsonc::from_document(&selector)).inner()
)
}
Ok(())
}
/// Queue removal of a single document into a bulk operation.
/// The removal is not performed until `execute` is called.
pub fn remove_one(
&self,
selector: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_remove_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner()
)
}
Ok(())
}
/// Queue replacement of a single document into a bulk operation.
/// The replacement is not performed until `execute` is called.
pub fn replace_one(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_replace_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// Queue update of a single documents into a bulk operation.
/// The update is not performed until `execute` is called.
///
/// TODO: document must only contain fields whose key starts
/// with $, these is no error handling for this.
pub fn update_one(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_update_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// Queue update of multiple documents into a bulk operation.
/// The update is not performed until `execute` is called.
///
/// TODO: document must only contain fields whose key starts
/// with $, these is no error handling for this.
pub fn update(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_update(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// This function executes all operations queued into this bulk operation.
/// If ordered was set true, forward progress will be stopped upon the first error.
///
/// This function takes ownership because it is not possible to execute a bulk operation
/// multiple times.
///
/// Returns a document with an overview of the bulk operation if successfull.
pub fn execute(self) -> Result<Document> {
// Bsonc to store the reply
let mut reply = Bsonc::new();
// Empty error that might be filled
let mut error = BsoncError::empty();
// Execute the operation. This returns a non-zero hint of the peer node on
// success, otherwise 0 and error is set.
let return_value = unsafe {
bindings::mongoc_bulk_operation_execute(
self.inner,
reply.mut_inner(),
error.mut_inner()
)
};
if return_value != 0 {
match reply.as_document() {
Ok(document) => return Ok(document),
Err(error) => return Err(error.into())
}
} else {
Err(error.into())
}
}
}
impl<'a> Drop for BulkOperation<'a> {
fn drop(&mut self) {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_destroy(self.inner);
}
}
}

@ -36,7 +36,6 @@ use std::sync::{Once,ONCE_INIT};
use mongoc::bindings; use mongoc::bindings;
pub mod bulk_operation;
pub mod client; pub mod client;
pub mod collection; pub mod collection;
pub mod cursor; pub mod cursor;

Loading…
Cancel
Save