diff --git a/src/collection.rs b/src/collection.rs index 5c62117..6b7cd39 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -4,14 +4,14 @@ use std::borrow::Cow; use mongo_c_driver_wrapper::bindings; -use bson::Document; +use bson::{Bson,Document}; use super::Result; use super::{BsoncError,InvalidParamsError}; use super::bsonc::Bsonc; use super::client::Client; use super::cursor; -use super::cursor::Cursor; +use super::cursor::{Cursor,TailingCursor}; use super::database::Database; use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag}; use super::write_concern::WriteConcern; @@ -27,6 +27,90 @@ pub struct Collection<'a> { inner: *mut bindings::mongoc_collection_t } +pub struct CountOptions { + pub query_flags: Flags, + pub skip: u32, + pub limit: u32, + pub opts: Option, + pub read_prefs: Option +} + +impl CountOptions { + pub fn default() -> CountOptions { + CountOptions { + query_flags: Flags::new(), + skip: 0, + limit: 0, + opts: None, + read_prefs: None + } + } +} + +pub struct FindOptions { + pub query_flags: Flags, + pub skip: u32, + pub limit: u32, + pub batch_size: u32, + pub fields: Option, + pub read_prefs: Option +} + +impl FindOptions { + pub fn default() -> FindOptions { + FindOptions { + query_flags: Flags::new(), + skip: 0, + limit: 0, + batch_size: 0, + fields: None, + read_prefs: None + } + } +} + +pub struct InsertOptions { + pub insert_flags: Flags, + pub write_concern: WriteConcern +} + +impl InsertOptions { + pub fn default() -> InsertOptions { + InsertOptions { + insert_flags: Flags::new(), + write_concern: WriteConcern::new() + } + } +} + +pub struct RemoveOptions { + pub remove_flags: Flags, + pub write_concern: WriteConcern +} + +impl RemoveOptions { + pub fn default() -> RemoveOptions { + RemoveOptions { + remove_flags: Flags::new(), + write_concern: WriteConcern::new() + } + } +} + +pub struct TailOptions { + pub wait_time_ms: u32, + pub max_retries: u32 +} + +impl TailOptions { + pub fn default() -> TailOptions { + TailOptions { + wait_time_ms: 500, + max_retries: 5 + } + } +} + impl<'a> Collection<'a> { pub fn new( created_by: CreatedBy<'a>, @@ -39,31 +123,30 @@ impl<'a> Collection<'a> { } } - pub fn count_with_options( + pub fn count( &self, - query_flags: &Flags, - query: &Document, - skip: u32, - limit: u32, - opts: Option<&Document>, - read_prefs: Option<&ReadPrefs> + query: &Document, + options: Option<&CountOptions> ) -> Result { assert!(!self.inner.is_null()); + let default_options = CountOptions::default(); + let options = options.unwrap_or(&default_options); + let mut error = BsoncError::empty(); let count = unsafe { bindings::mongoc_collection_count_with_opts( self.inner, - query_flags.flags(), + options.query_flags.flags(), try!(Bsonc::from_document(query)).inner(), - skip as i64, - limit as i64, - match opts { - Some(o) => try!(Bsonc::from_document(o)).inner(), + options.skip as i64, + options.limit as i64, + match options.opts { + Some(ref o) => try!(Bsonc::from_document(o)).inner(), None => ptr::null() }, - match read_prefs { - Some(prefs) => prefs.inner(), + match options.read_prefs { + Some(ref prefs) => prefs.inner(), None => ptr::null() }, error.mut_inner() @@ -77,20 +160,6 @@ impl<'a> Collection<'a> { } } - pub fn count( - &self, - query: &Document - ) -> Result { - self.count_with_options( - &Flags::new(), - query, - 0, - 0, - None, - None - ) - } - pub fn drop(&mut self) -> Result<()> { assert!(!self.inner.is_null()); let mut error = BsoncError::empty(); @@ -107,35 +176,33 @@ impl<'a> Collection<'a> { Ok(()) } - pub fn find_with_options( + pub fn find( &'a self, - query_flags: &Flags, - skip: u32, - limit: u32, - batch_size: u32, - query: &Document, - fields: Option<&Document>, - read_prefs: Option<&ReadPrefs> + query: &Document, + options: Option<&FindOptions> ) -> Result> { assert!(!self.inner.is_null()); + let default_options = FindOptions::default(); + let options = options.unwrap_or(&default_options); + let inner = unsafe { bindings::mongoc_collection_find( self.inner, - query_flags.flags(), - skip, - limit, - batch_size, + options.query_flags.flags(), + options.skip, + options.limit, + options.batch_size, try!(Bsonc::from_document(query)).inner(), - match fields { - Some(f) => { + match options.fields { + Some(ref f) => { try!(Bsonc::from_document(f)).inner() }, None => ptr::null() }, - match read_prefs { - Some(prefs) => prefs.inner(), - None => ptr::null() + match options.read_prefs { + Some(ref prefs) => prefs.inner(), + None => ptr::null() } ) }; @@ -147,21 +214,6 @@ impl<'a> Collection<'a> { Ok(Cursor::new(cursor::CreatedBy::Collection(self), inner)) } - pub fn find( - &'a self, - query: &Document - ) -> Result> { - self.find_with_options( - &Flags::new(), - 0, - 0, - 0, - &query, - None, - None - ) - } - pub fn get_name(&self) -> Cow { let cstr = unsafe { CStr::from_ptr(bindings::mongoc_collection_get_name(self.inner)) @@ -169,21 +221,23 @@ impl<'a> Collection<'a> { String::from_utf8_lossy(cstr.to_bytes()) } - pub fn insert_with_options( + pub fn insert( &'a self, - insert_flags: &Flags, - document: &Document, - write_concern: &WriteConcern + document: &Document, + options: Option<&InsertOptions> ) -> Result<()> { assert!(!self.inner.is_null()); + let default_options = InsertOptions::default(); + let options = options.unwrap_or(&default_options); + let mut error = BsoncError::empty(); let success = unsafe { bindings::mongoc_collection_insert( self.inner, - insert_flags.flags(), + options.insert_flags.flags(), try!(Bsonc::from_document(&document)).inner(), - write_concern.inner(), + options.write_concern.inner(), error.mut_inner() ) }; @@ -195,29 +249,23 @@ impl<'a> Collection<'a> { } } - pub fn insert(&'a self, document: &Document) -> Result<()> { - self.insert_with_options( - &Flags::new(), - document, - &WriteConcern::new() - ) - } - - pub fn remove_with_options( + pub fn remove( &self, - remove_flags: &Flags, - selector: &Document, - write_concern: &WriteConcern + selector: &Document, + options: Option<&RemoveOptions> ) -> Result<()> { assert!(!self.inner.is_null()); + let default_options = RemoveOptions::default(); + let options = options.unwrap_or(&default_options); + let mut error = BsoncError::empty(); let success = unsafe { bindings::mongoc_collection_remove( self.inner, - remove_flags.flags(), + options.remove_flags.flags(), try!(Bsonc::from_document(&selector)).inner(), - write_concern.inner(), + options.write_concern.inner(), error.mut_inner() ) }; @@ -229,24 +277,16 @@ impl<'a> Collection<'a> { } } - pub fn remove( - &self, - selector: &Document - ) -> Result<()> { - self.remove_with_options( - &Flags::new(), - selector, - &WriteConcern::new() - ) - } - - pub fn save_with_options( + pub fn save( &self, document: &Document, - write_concern: &WriteConcern + write_concern: Option<&WriteConcern> ) -> Result<()> { assert!(!self.inner.is_null()); + let default_write_concern = WriteConcern::new(); + let write_concern = write_concern.unwrap_or(&default_write_concern); + let mut error = BsoncError::empty(); let success = unsafe { bindings::mongoc_collection_save( @@ -264,13 +304,37 @@ impl<'a> Collection<'a> { } } - pub fn save( - &self, - document: &Document, - ) -> Result<()> { - self.save_with_options( - document, - &WriteConcern::new() + /// Tails a query + /// + /// Takes ownership of query and options because they could be + /// modified and reused when the connections is disrupted and + /// we need to restart the query. The query will be placed in a + /// $query key, so the function can add configuration needed for + /// proper tailing. + /// + /// The query is executed when iterating, so this function doesn't + /// return a result itself. + /// + /// The necessary flags to configure a tailing query will be added + /// to the configured flags if you choose to supply options. + pub fn tail( + &'a self, + query: Document, + find_options: Option, + tail_options: Option + ) -> TailingCursor<'a> { + let mut query_with_options = Document::new(); + query_with_options.insert( + "$query".to_string(), + Bson::Document(query) + ); + query_with_options.insert("$natural".to_string(), Bson::I32(1)); + + TailingCursor::new( + self, + query_with_options, + find_options.unwrap_or(FindOptions::default()), + tail_options.unwrap_or(TailOptions::default()) ) } } @@ -304,23 +368,23 @@ mod tests { let mut document = bson::Document::new(); document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string())); document.insert("key_2".to_string(), bson::Bson::String("Value 2".to_string())); - assert!(collection.insert(&document).is_ok()); + assert!(collection.insert(&document, None).is_ok()); let mut second_document = bson::Document::new(); second_document.insert("key_1".to_string(), bson::Bson::String("Value 3".to_string())); - assert!(collection.insert(&second_document).is_ok()); + assert!(collection.insert(&second_document, None).is_ok()); let query = bson::Document::new(); // Count the documents in the collection - assert_eq!(2, collection.count(&query).unwrap()); + assert_eq!(2, collection.count(&query, None).unwrap()); // Find the documents assert_eq!( - collection.find(&document).unwrap().next().unwrap().unwrap().get("key_1").unwrap().to_json(), + collection.find(&document, None).unwrap().next().unwrap().unwrap().get("key_1").unwrap().to_json(), bson::Bson::String("Value 1".to_string()).to_json() ); - let mut found_document = collection.find(&second_document).unwrap().next().unwrap().unwrap(); + let mut found_document = collection.find(&second_document, None).unwrap().next().unwrap().unwrap(); assert_eq!( found_document.get("key_1").unwrap().to_json(), bson::Bson::String("Value 3".to_string()).to_json() @@ -328,42 +392,42 @@ mod tests { // Update the second document found_document.insert("key_1".to_string(), bson::Bson::String("Value 4".to_string())); - assert!(collection.save(&found_document).is_ok()); + assert!(collection.save(&found_document, None).is_ok()); // Reload and check value - let found_document = collection.find(&found_document).unwrap().next().unwrap().unwrap(); + let found_document = collection.find(&found_document, None).unwrap().next().unwrap().unwrap(); assert_eq!( found_document.get("key_1").unwrap().to_json(), bson::Bson::String("Value 4".to_string()).to_json() ); // Remove one - assert!(collection.remove(&found_document).is_ok()); + assert!(collection.remove(&found_document, None).is_ok()); // Count again - assert_eq!(1, collection.count(&query).unwrap()); + assert_eq!(1, collection.count(&query, None).unwrap()); // Find the document and see if it has the keys we expect { - let mut cursor = collection.find(&query).unwrap(); + let mut cursor = collection.find(&query, None).unwrap(); let next_document = cursor.next().unwrap().unwrap(); assert!(next_document.contains_key("key_1")); assert!(next_document.contains_key("key_2")); } // Find the document with fields set - let mut fields = bson::Document::new(); - fields.insert("key_1".to_string(), bson::Bson::Boolean(true)); { - let mut cursor = collection.find_with_options( - &flags::Flags::new(), - 0, - 0, - 0, - &query, - Some(&fields), - None - ).unwrap(); + let mut fields = bson::Document::new(); + fields.insert("key_1".to_string(), bson::Bson::Boolean(true)); + let options = super::FindOptions { + query_flags: flags::Flags::new(), + skip: 0, + limit: 0, + batch_size: 0, + fields: Some(fields), + read_prefs: None + }; + let mut cursor = collection.find(&query, Some(&options)).unwrap(); let next_document = cursor.next().unwrap().unwrap(); assert!(next_document.contains_key("key_1")); assert!(!next_document.contains_key("key_2")); @@ -371,7 +435,7 @@ mod tests { // Drop collection collection.drop().unwrap(); - assert_eq!(0, collection.count(&query).unwrap()); + assert_eq!(0, collection.count(&query, None).unwrap()); } #[test] @@ -382,7 +446,7 @@ mod tests { let collection = client.get_collection("rust_driver_test", "items"); let document = bson::Document::new(); - let result = collection.insert(&document); + let result = collection.insert(&document, None); assert!(result.is_err()); assert_eq!( "MongoError (BsoncError: Failed to connect to target host: localhost:27018)", diff --git a/src/cursor.rs b/src/cursor.rs index 35197c7..69dc1f2 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -3,12 +3,13 @@ use std::ptr; use std::thread; use mongo_c_driver_wrapper::bindings; -use bson::Document; +use bson::{Bson,Document}; use super::BsoncError; use super::bsonc; use super::client::Client; -use super::collection::Collection; +use super::flags::QueryFlag; +use super::collection::{Collection,FindOptions,TailOptions}; use super::Result; @@ -18,8 +19,10 @@ pub enum CreatedBy<'a> { } pub struct Cursor<'a> { - _created_by: CreatedBy<'a>, - inner: *mut bindings::mongoc_cursor_t, + _created_by: CreatedBy<'a>, + inner: *mut bindings::mongoc_cursor_t, + tailing: bool, + tail_wait_time_ms: u32 } impl<'a> Cursor<'a> { @@ -29,8 +32,10 @@ impl<'a> Cursor<'a> { ) -> Cursor<'a> { assert!(!inner.is_null()); Cursor { - _created_by: created_by, - inner: inner + _created_by: created_by, + inner: inner, + tailing: false, + tail_wait_time_ms: 0 } } @@ -88,15 +93,14 @@ impl<'a> Iterator for Cursor<'a> { if success == 0 { if error.is_empty() { - if self.is_alive() { - // Since there was no error and the cursor is - // alive this must be a tailing cursor and we'll - // wait for 500ms before trying again. - thread::sleep_ms(500); + if self.tailing && self.is_alive() { + // Since there was no error, this is a tailing cursor + // and the cursor is alive we'll wait before trying again. + thread::sleep_ms(self.tail_wait_time_ms); continue; } else { - // No result, no error and cursor not alive anymore - // so we must be at the end. + // No result, no error and cursor not tailing so we must + // be at the end. return None } } else { @@ -126,11 +130,108 @@ impl<'a> Drop for Cursor<'a> { } } +/// Cursor that will reconnect and resume tailing a collection +/// at the right point if the connection fails. +pub struct TailingCursor<'a> { + collection: &'a Collection<'a>, + query: Document, + find_options: FindOptions, + tail_options: TailOptions, + cursor: Option>, + last_seen_id: Option<[u8; 12]>, + retry_count: u32 +} + +impl<'a> TailingCursor<'a> { + pub fn new( + collection: &'a Collection<'a>, + query: Document, + find_options: FindOptions, + tail_options: TailOptions + ) -> TailingCursor<'a> { + // Add flags to make query tailable + let mut find_options = find_options; + find_options.query_flags.add(QueryFlag::TailableCursor); + find_options.query_flags.add(QueryFlag::AwaitData); + + TailingCursor { + collection: collection, + query: query, + find_options: find_options, + tail_options: tail_options, + cursor: None, + last_seen_id: None, + retry_count: 0 + } + } +} + +impl<'a> Iterator for TailingCursor<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + // Start a scope so we're free to set the cursor to None at the end. + { + if self.cursor.is_none() { + // Add the last seen id to the query if it's present. + match self.last_seen_id { + Some(id) => { + let mut gt_id = Document::new(); + gt_id.insert("$gt".to_string(), Bson::ObjectId(id)); + self.query.insert("_id".to_string(), Bson::Document(gt_id)); + }, + None => () + }; + + // Set the cursor + self.cursor = match self.collection.find(&self.query, Some(&self.find_options)) { + Ok(mut c) => { + c.tailing = true; + c.tail_wait_time_ms = self.tail_options.wait_time_ms; + Some(c) + }, + Err(e) => return Some(Err(e.into())) + }; + } + + let cursor = match self.cursor { + Some(ref mut c) => c, + None => panic!("It should be impossible to not have a cursor here") + }; + + match cursor.next() { + Some(next_result) => { + match next_result { + Ok(next) => { + // This was successfull, so reset retry count and return result. + self.retry_count = 0; + return Some(Ok(next)) + }, + Err(e) => { + // Retry if we haven't exceeded the maximum number of retries. + if self.retry_count >= self.tail_options.max_retries { + return Some(Err(e.into())) + } + } + } + }, + None => () + }; + } + + // We made it to the end, so we weren't able to get the next item from + // the cursor. We need to reconnect in the next iteration of the loop. + self.retry_count += 1; + self.cursor = None; + } + } +} + #[cfg(test)] mod tests { use std::thread; use bson; - use super::super::flags; use super::super::uri::Uri; use super::super::client::ClientPool; use super::super::Result; @@ -145,13 +246,13 @@ mod tests { let mut document = bson::Document::new(); document.insert("key".to_string(), bson::Bson::String("value".to_string())); - collection.drop().unwrap(); + collection.drop().unwrap_or(()); for _ in 0..10 { - assert!(collection.insert(&document).is_ok()); + assert!(collection.insert(&document, None).is_ok()); } let query = bson::Document::new(); - let cursor = collection.find(&query).unwrap(); + let cursor = collection.find(&query, None).unwrap(); assert!(cursor.is_alive()); @@ -178,20 +279,8 @@ mod tests { let capped_collection = database.create_collection("capped", Some(&options)).unwrap(); let normal_collection = database.create_collection("not_capped", None).unwrap(); - let mut flags = flags::Flags::new(); - flags.add(flags::QueryFlag::TailableCursor); - flags.add(flags::QueryFlag::AwaitData); - // Try to tail on a normal collection - let failing_cursor = normal_collection.find_with_options( - &flags, - 0, - 0, - 0, - &bson::Document::new(), - None, - None - ).unwrap(); + let failing_cursor = normal_collection.tail(bson::Document::new(), None, None); let failing_result = failing_cursor.into_iter().next().unwrap(); assert!(failing_result.is_err()); assert_eq!( @@ -201,32 +290,20 @@ mod tests { let mut document = bson::Document::new(); document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string())); - // Insert some documents into the collection - for _ in 0..5 { - capped_collection.insert(&document).unwrap(); - } + // Insert a first document into the collection + capped_collection.insert(&document, None).unwrap(); // Start a tailing iterator in a thread let cloned_pool = pool.clone(); let guard = thread::spawn(move || { let client = cloned_pool.pop(); let collection = client.get_collection("rust_test", "capped"); - - let cursor = collection.find_with_options( - &flags, - 0, - 0, - 0, - &bson::Document::new(), - None, - None - ).unwrap(); - + let cursor = collection.tail(bson::Document::new(), None, None); let mut counter = 0usize; for result in cursor.into_iter() { assert!(result.is_ok()); counter += 1; - if counter == 15 { + if counter == 25 { break; } } @@ -234,16 +311,16 @@ mod tests { }); // Wait for the thread to boot up - thread::sleep_ms(200); + thread::sleep_ms(250); // Insert some more documents into the collection - for _ in 0..10 { - capped_collection.insert(&document).unwrap(); + for _ in 0..25 { + capped_collection.insert(&document, None).unwrap(); } // See if they appeared while iterating the cursor // The for loop returns whenever we get more than // 15 results. - assert_eq!(15, guard.join().unwrap()); + assert_eq!(25, guard.join().unwrap()); } } diff --git a/src/flags.rs b/src/flags.rs index 8af7e2b..c1bc9a2 100644 --- a/src/flags.rs +++ b/src/flags.rs @@ -1,18 +1,20 @@ use mongo_c_driver_wrapper::bindings; +use std::collections::BTreeSet; + pub struct Flags { - flags: Vec + flags: BTreeSet } -impl Flags { +impl Flags where T: Ord { pub fn new() -> Flags { Flags { - flags: Vec::new() + flags: BTreeSet::new() } } pub fn add(&mut self, flag: T) { - self.flags.push(flag); + self.flags.insert(flag); } } @@ -22,6 +24,7 @@ pub trait FlagsValue { /// Flags for insert operations /// See: http://api.mongodb.org/c/current/mongoc_insert_flags_t.html +#[derive(Eq,PartialEq,Ord,PartialOrd)] pub enum InsertFlag { ContinueOnError, NoValidate @@ -44,6 +47,7 @@ impl FlagsValue for Flags { /// Flags for query operations /// See: http://api.mongodb.org/c/current/mongoc_query_flags_t.html +#[derive(Eq,PartialEq,Ord,PartialOrd)] pub enum QueryFlag { TailableCursor, SlaveOk, @@ -76,6 +80,7 @@ impl FlagsValue for Flags { /// Flags for deletion operations /// See: http://api.mongodb.org/c/1.1.8/mongoc_remove_flags_t.html +#[derive(Eq,PartialEq,Ord,PartialOrd)] pub enum RemoveFlag { SingleRemove } @@ -102,6 +107,7 @@ mod tests { flags.add(super::InsertFlag::ContinueOnError); assert_eq!(1, flags.flags()); + flags.add(super::InsertFlag::NoValidate); flags.add(super::InsertFlag::NoValidate); assert_eq!(31, flags.flags()); } @@ -114,7 +120,18 @@ mod tests { flags.add(super::QueryFlag::TailableCursor); assert_eq!(2, flags.flags()); + flags.add(super::QueryFlag::Partial); flags.add(super::QueryFlag::Partial); assert_eq!(130, flags.flags()); } + + #[test] + pub fn test_remove_flags() { + let mut flags = super::Flags::new(); + assert_eq!(0, flags.flags()); + + flags.add(super::RemoveFlag::SingleRemove); + flags.add(super::RemoveFlag::SingleRemove); + assert_eq!(1, flags.flags()); + } }