WIP Collection bulk operation

pull/2/head
Thijs Cadier 10 years ago
parent 9cbe2b03c3
commit df9749c5c4

@ -17,6 +17,10 @@ pub struct Bsonc {
}
impl Bsonc {
pub fn new() -> Bsonc {
Bsonc::from_ptr(unsafe { bindings::bson_new() })
}
pub fn from_ptr(inner: *const bindings::bson_t) -> Bsonc {
assert!(!inner.is_null());
Bsonc { inner: inner as *mut bindings::bson_t }
@ -94,6 +98,10 @@ impl Bsonc {
pub fn inner(&self) -> *const bindings::bson_t {
self.inner
}
pub fn mut_inner(&mut self) -> *mut bindings::bson_t {
self.inner as *mut bindings::bson_t
}
}
impl fmt::Debug for Bsonc {

@ -0,0 +1,373 @@
extern crate libc;
extern crate mongo_c_driver_wrapper;
extern crate bson;
use mongo_c_driver_wrapper::bindings;
use bson::Document;
use super::BsoncError;
use super::bsonc::Bsonc;
use super::collection::Collection;
use super::Result;
pub struct BulkOperation<'a> {
_collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_bulk_operation_t
}
impl<'a> BulkOperation<'a> {
pub fn new(
collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_bulk_operation_t
) -> BulkOperation<'a> {
assert!(!inner.is_null());
BulkOperation {
_collection: collection,
inner: inner
}
}
/// Queue an insert of a single document into a bulk operation.
/// The insert is not performed until `execute` is called.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_insert.html
pub fn insert(
&self,
document: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_insert(
self.inner,
try!(Bsonc::from_document(&document)).inner()
)
}
Ok(())
}
/// Queue removal of al documents matching selector into a bulk operation.
/// The removal is not performed until `execute` is called.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_remove.html
pub fn remove(
&self,
selector: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_remove(
self.inner,
try!(Bsonc::from_document(&selector)).inner()
)
}
Ok(())
}
/// Queue removal of a single document into a bulk operation.
/// The removal is not performed until `execute` is called.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_remove_one.html
pub fn remove_one(
&self,
selector: &Document
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_remove_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner()
)
}
Ok(())
}
/// Queue replacement of a single document into a bulk operation.
/// The replacement is not performed until `execute` is called.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_remove_one.html
pub fn replace_one(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_replace_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// Queue update of a single documents into a bulk operation.
/// The update is not performed until `execute` is called.
///
/// TODO: document must only contain fields whose key starts
/// with $, these is no error handling for this.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_update_one.html
pub fn update_one(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_update_one(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// Queue update of multiple documents into a bulk operation.
/// The update is not performed until `execute` is called.
///
/// TODO: document must only contain fields whose key starts
/// with $, these is no error handling for this.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_update_one.html
pub fn update(
&self,
selector: &Document,
document: &Document,
upsert: bool
) -> Result<()> {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_update(
self.inner,
try!(Bsonc::from_document(&selector)).inner(),
try!(Bsonc::from_document(&document)).inner(),
upsert as u8
)
}
Ok(())
}
/// This function executes all operations queued into this bulk operation.
/// If ordered was set true, forward progress will be stopped upon the first error.
///
/// This function takes ownership because it is not possible to execute a bulk operation
/// multiple times.
///
/// Returns a document with an overview of the bulk operation if successfull.
///
/// See: http://api.mongodb.org/c/current/mongoc_bulk_operation_execute.html
pub fn execute(self) -> Result<Document> {
// Bsonc to store the reply
let mut reply = Bsonc::new();
// Empty error that might be filled
let mut error = BsoncError::empty();
// Execute the operation. This returns a non-zero hint of the peer node on
// success, otherwise 0 and error is set.
let return_value = unsafe {
bindings::mongoc_bulk_operation_execute(
self.inner,
reply.mut_inner(),
error.mut_inner()
)
};
if return_value != 0 {
match reply.as_document() {
Ok(document) => return Ok(document),
Err(error) => return Err(error.into())
}
} else {
Err(error.into())
}
}
}
impl<'a> Drop for BulkOperation<'a> {
fn drop(&mut self) {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_bulk_operation_destroy(self.inner);
}
}
}
#[cfg(test)]
mod tests {
use bson;
use super::super::uri::Uri;
use super::super::client::ClientPool;
#[test]
fn test_execute_error() {
let uri = Uri::new("mongodb://localhost:27017/");
let pool = ClientPool::new(uri);
let client = pool.pop();
let collection = client.get_collection("rust_driver_test", "bulk_operation_error");
let bulk_operation = collection.create_bulk_operation(None);
let result = bulk_operation.execute();
assert!(result.is_err());
let error_message = format!("{:?}", result.err().unwrap());
assert_eq!(error_message, "MongoError (BsoncError: Cannot do an empty bulk write)");
}
#[test]
fn test_insert_remove_replace_update() {
let uri = Uri::new("mongodb://localhost:27017/");
let pool = ClientPool::new(uri);
let client = pool.pop();
let mut collection = client.get_collection("rust_driver_test", "bulk_operation_insert");
collection.drop().unwrap_or(());
// Insert 5 documents
{
let bulk_operation = collection.create_bulk_operation(None);
let mut document = bson::Document::new();
document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string()));
document.insert("key_2".to_string(), bson::Bson::String("Value 2".to_string()));
for _ in 0..5 {
bulk_operation.insert(&document).unwrap();
}
let result = bulk_operation.execute();
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nInserted").unwrap().to_json(),
bson::Bson::I32(5).to_json()
);
assert_eq!(5, collection.count(&bson::Document::new(), None).unwrap());
}
let query = bson::Document::new();
let mut update_document = bson::Document::new();
let mut set = bson::Document::new();
set.insert("key_1".to_string(), bson::Bson::String("Value update".to_string()));
update_document.insert("$set".to_string(), bson::Bson::Document(set));
// Update one
{
let bulk_operation = collection.create_bulk_operation(None);
bulk_operation.update_one(
&query,
&update_document,
false
).unwrap();
let result = bulk_operation.execute();
println!("{:?}", result);
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nModified").unwrap().to_json(),
bson::Bson::I32(1).to_json()
);
let first_document = collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap();
assert_eq!(
first_document.get("key_1").unwrap().to_json(),
bson::Bson::String("Value update".to_string()).to_json()
);
// Make sure it was updated, it should have other keys
assert!(first_document.get("key_2").is_some());
}
// Update all
{
let bulk_operation = collection.create_bulk_operation(None);
bulk_operation.update(
&query,
&update_document,
false
).unwrap();
let result = bulk_operation.execute();
println!("{:?}", result);
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nModified").unwrap().to_json(),
bson::Bson::I32(4).to_json()
);
collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap();
let second_document = collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap();
assert_eq!(
second_document.get("key_1").unwrap().to_json(),
bson::Bson::String("Value update".to_string()).to_json()
);
// Make sure it was updated, it should have other keys
assert!(second_document.get("key_2").is_some());
}
// Replace one
{
let mut replace_document = bson::Document::new();
replace_document.insert("key_1".to_string(), bson::Bson::String("Value replace".to_string()));
let bulk_operation = collection.create_bulk_operation(None);
bulk_operation.replace_one(
&query,
&replace_document,
false
).unwrap();
let result = bulk_operation.execute();
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nModified").unwrap().to_json(),
bson::Bson::I32(1).to_json()
);
let first_document = collection.find(&bson::Document::new(), None).unwrap().next().unwrap().unwrap();
assert_eq!(
first_document.get("key_1").unwrap().to_json(),
bson::Bson::String("Value replace".to_string()).to_json()
);
// Make sure it was replaced, it shouldn't have other keys
assert!(first_document.get("key_2").is_none());
}
// Remove one
{
let bulk_operation = collection.create_bulk_operation(None);
bulk_operation.remove_one(&query).unwrap();
let result = bulk_operation.execute();
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nRemoved").unwrap().to_json(),
bson::Bson::I32(1).to_json()
);
assert_eq!(4, collection.count(&query, None).unwrap());
}
// Remove all remaining documents
{
let bulk_operation = collection.create_bulk_operation(None);
bulk_operation.remove(&query).unwrap();
let result = bulk_operation.execute();
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nRemoved").unwrap().to_json(),
bson::Bson::I32(4).to_json()
);
assert_eq!(0, collection.count(&query, None).unwrap());
}
}
}

@ -9,6 +9,7 @@ use bson::{Bson,Document};
use super::Result;
use super::{BsoncError,InvalidParamsError};
use super::bsonc::Bsonc;
use super::bulk_operation::BulkOperation;
use super::client::Client;
use super::cursor;
use super::cursor::{Cursor,TailingCursor};
@ -27,6 +28,20 @@ pub struct Collection<'a> {
inner: *mut bindings::mongoc_collection_t
}
pub struct BulkOperationOptions {
pub ordered: bool,
pub write_concern: WriteConcern
}
impl BulkOperationOptions {
pub fn default() -> BulkOperationOptions {
BulkOperationOptions {
ordered: false,
write_concern: WriteConcern::new()
}
}
}
pub struct CountOptions {
pub query_flags: Flags<QueryFlag>,
pub skip: u32,
@ -160,6 +175,26 @@ impl<'a> Collection<'a> {
}
}
pub fn create_bulk_operation(
&'a self,
options: Option<&BulkOperationOptions>
) -> BulkOperation<'a> {
assert!(!self.inner.is_null());
let default_options = BulkOperationOptions::default();
let options = options.unwrap_or(&default_options);
let inner = unsafe {
bindings::mongoc_collection_create_bulk_operation(
self.inner,
options.ordered as u8,
options.write_concern.inner()
)
};
BulkOperation::new(self, inner)
}
pub fn drop(&mut self) -> Result<()> {
assert!(!self.inner.is_null());
let mut error = BsoncError::empty();

@ -111,9 +111,8 @@ impl<'a> Iterator for Cursor<'a> {
assert!(!bson_ptr.is_null());
// Parse and return bson document.
let bsonc = bsonc::Bsonc::from_ptr(bson_ptr);
let document = bsonc.as_document();
match document {
let bsonc = bsonc::Bsonc::from_ptr(bson_ptr);
match bsonc.as_document() {
Ok(document) => return Some(Ok(document)),
Err(error) => return Some(Err(error.into()))
}

@ -8,6 +8,7 @@ use std::sync::{Once,ONCE_INIT};
use mongo_c_driver_wrapper::bindings;
pub mod bsonc;
pub mod bulk_operation;
pub mod client;
pub mod collection;
pub mod cursor;

Loading…
Cancel
Save