diff --git a/src/bsonc.rs b/src/bsonc.rs index b42984a..8fd8800 100644 --- a/src/bsonc.rs +++ b/src/bsonc.rs @@ -17,6 +17,10 @@ pub struct Bsonc { } impl Bsonc { + pub fn new() -> Bsonc { + Bsonc::from_ptr(unsafe { bindings::bson_new() }) + } + pub fn from_ptr(inner: *const bindings::bson_t) -> Bsonc { assert!(!inner.is_null()); Bsonc { inner: inner as *mut bindings::bson_t } @@ -94,6 +98,10 @@ impl Bsonc { pub fn inner(&self) -> *const bindings::bson_t { self.inner } + + pub fn mut_inner(&mut self) -> *mut bindings::bson_t { + self.inner as *mut bindings::bson_t + } } impl fmt::Debug for Bsonc { diff --git a/src/bulk_operation.rs b/src/bulk_operation.rs new file mode 100644 index 0000000..0b698b0 --- /dev/null +++ b/src/bulk_operation.rs @@ -0,0 +1,373 @@ +extern crate libc; +extern crate mongo_c_driver_wrapper; +extern crate bson; + +use mongo_c_driver_wrapper::bindings; +use bson::Document; + +use super::BsoncError; +use super::bsonc::Bsonc; +use super::collection::Collection; + +use super::Result; + +pub struct BulkOperation<'a> { + _collection: &'a Collection<'a>, + inner: *mut bindings::mongoc_bulk_operation_t +} + +impl<'a> BulkOperation<'a> { + pub fn new( + collection: &'a Collection<'a>, + inner: *mut bindings::mongoc_bulk_operation_t + ) -> BulkOperation<'a> { + assert!(!inner.is_null()); + BulkOperation { + _collection: collection, + inner: inner + } + } + + /// Queue an insert of a single document into a bulk operation. + /// The insert is not performed until `execute` is called. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_insert.html + pub fn insert( + &self, + document: &Document + ) -> Result<()> { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_insert( + self.inner, + try!(Bsonc::from_document(&document)).inner() + ) + } + Ok(()) + } + + /// Queue removal of al documents matching selector into a bulk operation. + /// The removal is not performed until `execute` is called. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_remove.html + pub fn remove( + &self, + selector: &Document + ) -> Result<()> { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_remove( + self.inner, + try!(Bsonc::from_document(&selector)).inner() + ) + } + Ok(()) + } + + /// Queue removal of a single document into a bulk operation. + /// The removal is not performed until `execute` is called. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_remove_one.html + pub fn remove_one( + &self, + selector: &Document + ) -> Result<()> { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_remove_one( + self.inner, + try!(Bsonc::from_document(&selector)).inner() + ) + } + Ok(()) + } + + /// Queue replacement of a single document into a bulk operation. + /// The replacement is not performed until `execute` is called. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_remove_one.html + pub fn replace_one( + &self, + selector: &Document, + document: &Document, + upsert: bool + ) -> Result<()> { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_replace_one( + self.inner, + try!(Bsonc::from_document(&selector)).inner(), + try!(Bsonc::from_document(&document)).inner(), + upsert as u8 + ) + } + Ok(()) + } + + /// Queue update of a single documents into a bulk operation. + /// The update is not performed until `execute` is called. + /// + /// TODO: document must only contain fields whose key starts + /// with $, these is no error handling for this. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_update_one.html + pub fn update_one( + &self, + selector: &Document, + document: &Document, + upsert: bool + ) -> Result<()> { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_update_one( + self.inner, + try!(Bsonc::from_document(&selector)).inner(), + try!(Bsonc::from_document(&document)).inner(), + upsert as u8 + ) + } + Ok(()) + } + + /// Queue update of multiple documents into a bulk operation. + /// The update is not performed until `execute` is called. + /// + /// TODO: document must only contain fields whose key starts + /// with $, these is no error handling for this. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_update_one.html + pub fn update( + &self, + selector: &Document, + document: &Document, + upsert: bool + ) -> Result<()> { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_update( + self.inner, + try!(Bsonc::from_document(&selector)).inner(), + try!(Bsonc::from_document(&document)).inner(), + upsert as u8 + ) + } + Ok(()) + } + + /// This function executes all operations queued into this bulk operation. + /// If ordered was set true, forward progress will be stopped upon the first error. + /// + /// This function takes ownership because it is not possible to execute a bulk operation + /// multiple times. + /// + /// Returns a document with an overview of the bulk operation if successfull. + /// + /// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_execute.html + pub fn execute(self) -> Result { + // Bsonc to store the reply + let mut reply = Bsonc::new(); + // Empty error that might be filled + let mut error = BsoncError::empty(); + + // Execute the operation. This returns a non-zero hint of the peer node on + // success, otherwise 0 and error is set. + let return_value = unsafe { + bindings::mongoc_bulk_operation_execute( + self.inner, + reply.mut_inner(), + error.mut_inner() + ) + }; + + if return_value != 0 { + match reply.as_document() { + Ok(document) => return Ok(document), + Err(error) => return Err(error.into()) + } + } else { + Err(error.into()) + } + } +} + +impl<'a> Drop for BulkOperation<'a> { + fn drop(&mut self) { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_bulk_operation_destroy(self.inner); + } + } +} + +#[cfg(test)] +mod tests { + use bson; + use super::super::uri::Uri; + use super::super::client::ClientPool; + + #[test] + fn test_execute_error() { + let uri = Uri::new("mongodb://localhost:27017/"); + let pool = ClientPool::new(uri); + let client = pool.pop(); + let collection = client.get_collection("rust_driver_test", "bulk_operation_error"); + let bulk_operation = collection.create_bulk_operation(None); + + let result = bulk_operation.execute(); + assert!(result.is_err()); + + let error_message = format!("{:?}", result.err().unwrap()); + assert_eq!(error_message, "MongoError (BsoncError: Cannot do an empty bulk write)"); + } + + #[test] + fn test_insert_remove_replace_update() { + let uri = Uri::new("mongodb://localhost:27017/"); + let pool = ClientPool::new(uri); + let client = pool.pop(); + let mut collection = client.get_collection("rust_driver_test", "bulk_operation_insert"); + collection.drop().unwrap_or(()); + + // Insert 5 documents + { + let bulk_operation = collection.create_bulk_operation(None); + + let mut document = bson::Document::new(); + document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string())); + document.insert("key_2".to_string(), bson::Bson::String("Value 2".to_string())); + for _ in 0..5 { + bulk_operation.insert(&document).unwrap(); + } + + let result = bulk_operation.execute(); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nInserted").unwrap().to_json(), + bson::Bson::I32(5).to_json() + ); + assert_eq!(5, collection.count(&bson::Document::new(), None).unwrap()); + } + + let query = bson::Document::new(); + + let mut update_document = bson::Document::new(); + let mut set = bson::Document::new(); + set.insert("key_1".to_string(), bson::Bson::String("Value update".to_string())); + update_document.insert("$set".to_string(), bson::Bson::Document(set)); + + // Update one + { + let bulk_operation = collection.create_bulk_operation(None); + bulk_operation.update_one( + &query, + &update_document, + false + ).unwrap(); + + let result = bulk_operation.execute(); + println!("{:?}", result); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nModified").unwrap().to_json(), + bson::Bson::I32(1).to_json() + ); + + let first_document = collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap(); + assert_eq!( + first_document.get("key_1").unwrap().to_json(), + bson::Bson::String("Value update".to_string()).to_json() + ); + // Make sure it was updated, it should have other keys + assert!(first_document.get("key_2").is_some()); + } + + // Update all + { + let bulk_operation = collection.create_bulk_operation(None); + bulk_operation.update( + &query, + &update_document, + false + ).unwrap(); + + let result = bulk_operation.execute(); + println!("{:?}", result); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nModified").unwrap().to_json(), + bson::Bson::I32(4).to_json() + ); + + collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap(); + let second_document = collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap(); + assert_eq!( + second_document.get("key_1").unwrap().to_json(), + bson::Bson::String("Value update".to_string()).to_json() + ); + // Make sure it was updated, it should have other keys + assert!(second_document.get("key_2").is_some()); + } + + // Replace one + { + let mut replace_document = bson::Document::new(); + replace_document.insert("key_1".to_string(), bson::Bson::String("Value replace".to_string())); + + let bulk_operation = collection.create_bulk_operation(None); + bulk_operation.replace_one( + &query, + &replace_document, + false + ).unwrap(); + + let result = bulk_operation.execute(); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nModified").unwrap().to_json(), + bson::Bson::I32(1).to_json() + ); + + let first_document = collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap(); + assert_eq!( + first_document.get("key_1").unwrap().to_json(), + bson::Bson::String("Value replace".to_string()).to_json() + ); + // Make sure it was replaced, it shouldn't have other keys + assert!(first_document.get("key_2").is_none()); + } + + // Remove one + { + let bulk_operation = collection.create_bulk_operation(None); + bulk_operation.remove_one(&query).unwrap(); + + let result = bulk_operation.execute(); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nRemoved").unwrap().to_json(), + bson::Bson::I32(1).to_json() + ); + assert_eq!(4, collection.count(&query, None).unwrap()); + } + + // Remove all remaining documents + { + let bulk_operation = collection.create_bulk_operation(None); + bulk_operation.remove(&query).unwrap(); + + let result = bulk_operation.execute(); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nRemoved").unwrap().to_json(), + bson::Bson::I32(4).to_json() + ); + assert_eq!(0, collection.count(&query, None).unwrap()); + } + } +} diff --git a/src/collection.rs b/src/collection.rs index 6b7cd39..adbe158 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -9,6 +9,7 @@ use bson::{Bson,Document}; use super::Result; use super::{BsoncError,InvalidParamsError}; use super::bsonc::Bsonc; +use super::bulk_operation::BulkOperation; use super::client::Client; use super::cursor; use super::cursor::{Cursor,TailingCursor}; @@ -27,6 +28,20 @@ pub struct Collection<'a> { inner: *mut bindings::mongoc_collection_t } +pub struct BulkOperationOptions { + pub ordered: bool, + pub write_concern: WriteConcern +} + +impl BulkOperationOptions { + pub fn default() -> BulkOperationOptions { + BulkOperationOptions { + ordered: false, + write_concern: WriteConcern::new() + } + } +} + pub struct CountOptions { pub query_flags: Flags, pub skip: u32, @@ -160,6 +175,26 @@ impl<'a> Collection<'a> { } } + pub fn create_bulk_operation( + &'a self, + options: Option<&BulkOperationOptions> + ) -> BulkOperation<'a> { + assert!(!self.inner.is_null()); + + let default_options = BulkOperationOptions::default(); + let options = options.unwrap_or(&default_options); + + let inner = unsafe { + bindings::mongoc_collection_create_bulk_operation( + self.inner, + options.ordered as u8, + options.write_concern.inner() + ) + }; + + BulkOperation::new(self, inner) + } + pub fn drop(&mut self) -> Result<()> { assert!(!self.inner.is_null()); let mut error = BsoncError::empty(); diff --git a/src/cursor.rs b/src/cursor.rs index cda042d..43bb06c 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -111,9 +111,8 @@ impl<'a> Iterator for Cursor<'a> { assert!(!bson_ptr.is_null()); // Parse and return bson document. - let bsonc = bsonc::Bsonc::from_ptr(bson_ptr); - let document = bsonc.as_document(); - match document { + let bsonc = bsonc::Bsonc::from_ptr(bson_ptr); + match bsonc.as_document() { Ok(document) => return Some(Ok(document)), Err(error) => return Some(Err(error.into())) } diff --git a/src/lib.rs b/src/lib.rs index e42fb87..f5b1c1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use std::sync::{Once,ONCE_INIT}; use mongo_c_driver_wrapper::bindings; pub mod bsonc; +pub mod bulk_operation; pub mod client; pub mod collection; pub mod cursor;