From b12196a41e5755e50fede13d2cd037413e9bb891 Mon Sep 17 00:00:00 2001 From: Thijs Cadier Date: Fri, 26 Jun 2015 23:14:15 +0200 Subject: [PATCH] Support for tailing queries Add a tail convenience method that returns a tailing cursor that implements the recommended way of tailing and reconnecting in it's iterator implementation. Also move optional arguments to separate type that can optionally be added to the various methods. --- src/collection.rs | 314 ++++++++++++++++++++++++++++------------------ src/cursor.rs | 177 ++++++++++++++++++-------- src/flags.rs | 25 +++- 3 files changed, 337 insertions(+), 179 deletions(-) diff --git a/src/collection.rs b/src/collection.rs index 5c62117..6b7cd39 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -4,14 +4,14 @@ use std::borrow::Cow; use mongo_c_driver_wrapper::bindings; -use bson::Document; +use bson::{Bson,Document}; use super::Result; use super::{BsoncError,InvalidParamsError}; use super::bsonc::Bsonc; use super::client::Client; use super::cursor; -use super::cursor::Cursor; +use super::cursor::{Cursor,TailingCursor}; use super::database::Database; use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag}; use super::write_concern::WriteConcern; @@ -27,6 +27,90 @@ pub struct Collection<'a> { inner: *mut bindings::mongoc_collection_t } +pub struct CountOptions { + pub query_flags: Flags, + pub skip: u32, + pub limit: u32, + pub opts: Option, + pub read_prefs: Option +} + +impl CountOptions { + pub fn default() -> CountOptions { + CountOptions { + query_flags: Flags::new(), + skip: 0, + limit: 0, + opts: None, + read_prefs: None + } + } +} + +pub struct FindOptions { + pub query_flags: Flags, + pub skip: u32, + pub limit: u32, + pub batch_size: u32, + pub fields: Option, + pub read_prefs: Option +} + +impl FindOptions { + pub fn default() -> FindOptions { + FindOptions { + query_flags: Flags::new(), + skip: 0, + limit: 0, + batch_size: 0, + fields: None, + read_prefs: None + } + } +} + +pub struct InsertOptions { + pub insert_flags: Flags, + pub write_concern: WriteConcern +} + +impl InsertOptions { + pub fn default() -> InsertOptions { + InsertOptions { + insert_flags: Flags::new(), + write_concern: WriteConcern::new() + } + } +} + +pub struct RemoveOptions { + pub remove_flags: Flags, + pub write_concern: WriteConcern +} + +impl RemoveOptions { + pub fn default() -> RemoveOptions { + RemoveOptions { + remove_flags: Flags::new(), + write_concern: WriteConcern::new() + } + } +} + +pub struct TailOptions { + pub wait_time_ms: u32, + pub max_retries: u32 +} + +impl TailOptions { + pub fn default() -> TailOptions { + TailOptions { + wait_time_ms: 500, + max_retries: 5 + } + } +} + impl<'a> Collection<'a> { pub fn new( created_by: CreatedBy<'a>, @@ -39,31 +123,30 @@ impl<'a> Collection<'a> { } } - pub fn count_with_options( + pub fn count( &self, - query_flags: &Flags, - query: &Document, - skip: u32, - limit: u32, - opts: Option<&Document>, - read_prefs: Option<&ReadPrefs> + query: &Document, + options: Option<&CountOptions> ) -> Result { assert!(!self.inner.is_null()); + let default_options = CountOptions::default(); + let options = options.unwrap_or(&default_options); + let mut error = BsoncError::empty(); let count = unsafe { bindings::mongoc_collection_count_with_opts( self.inner, - query_flags.flags(), + options.query_flags.flags(), try!(Bsonc::from_document(query)).inner(), - skip as i64, - limit as i64, - match opts { - Some(o) => try!(Bsonc::from_document(o)).inner(), + options.skip as i64, + options.limit as i64, + match options.opts { + Some(ref o) => try!(Bsonc::from_document(o)).inner(), None => ptr::null() }, - match read_prefs { - Some(prefs) => prefs.inner(), + match options.read_prefs { + Some(ref prefs) => prefs.inner(), None => ptr::null() }, error.mut_inner() @@ -77,20 +160,6 @@ impl<'a> Collection<'a> { } } - pub fn count( - &self, - query: &Document - ) -> Result { - self.count_with_options( - &Flags::new(), - query, - 0, - 0, - None, - None - ) - } - pub fn drop(&mut self) -> Result<()> { assert!(!self.inner.is_null()); let mut error = BsoncError::empty(); @@ -107,35 +176,33 @@ impl<'a> Collection<'a> { Ok(()) } - pub fn find_with_options( + pub fn find( &'a self, - query_flags: &Flags, - skip: u32, - limit: u32, - batch_size: u32, - query: &Document, - fields: Option<&Document>, - read_prefs: Option<&ReadPrefs> + query: &Document, + options: Option<&FindOptions> ) -> Result> { assert!(!self.inner.is_null()); + let default_options = FindOptions::default(); + let options = options.unwrap_or(&default_options); + let inner = unsafe { bindings::mongoc_collection_find( self.inner, - query_flags.flags(), - skip, - limit, - batch_size, + options.query_flags.flags(), + options.skip, + options.limit, + options.batch_size, try!(Bsonc::from_document(query)).inner(), - match fields { - Some(f) => { + match options.fields { + Some(ref f) => { try!(Bsonc::from_document(f)).inner() }, None => ptr::null() }, - match read_prefs { - Some(prefs) => prefs.inner(), - None => ptr::null() + match options.read_prefs { + Some(ref prefs) => prefs.inner(), + None => ptr::null() } ) }; @@ -147,21 +214,6 @@ impl<'a> Collection<'a> { Ok(Cursor::new(cursor::CreatedBy::Collection(self), inner)) } - pub fn find( - &'a self, - query: &Document - ) -> Result> { - self.find_with_options( - &Flags::new(), - 0, - 0, - 0, - &query, - None, - None - ) - } - pub fn get_name(&self) -> Cow { let cstr = unsafe { CStr::from_ptr(bindings::mongoc_collection_get_name(self.inner)) @@ -169,21 +221,23 @@ impl<'a> Collection<'a> { String::from_utf8_lossy(cstr.to_bytes()) } - pub fn insert_with_options( + pub fn insert( &'a self, - insert_flags: &Flags, - document: &Document, - write_concern: &WriteConcern + document: &Document, + options: Option<&InsertOptions> ) -> Result<()> { assert!(!self.inner.is_null()); + let default_options = InsertOptions::default(); + let options = options.unwrap_or(&default_options); + let mut error = BsoncError::empty(); let success = unsafe { bindings::mongoc_collection_insert( self.inner, - insert_flags.flags(), + options.insert_flags.flags(), try!(Bsonc::from_document(&document)).inner(), - write_concern.inner(), + options.write_concern.inner(), error.mut_inner() ) }; @@ -195,29 +249,23 @@ impl<'a> Collection<'a> { } } - pub fn insert(&'a self, document: &Document) -> Result<()> { - self.insert_with_options( - &Flags::new(), - document, - &WriteConcern::new() - ) - } - - pub fn remove_with_options( + pub fn remove( &self, - remove_flags: &Flags, - selector: &Document, - write_concern: &WriteConcern + selector: &Document, + options: Option<&RemoveOptions> ) -> Result<()> { assert!(!self.inner.is_null()); + let default_options = RemoveOptions::default(); + let options = options.unwrap_or(&default_options); + let mut error = BsoncError::empty(); let success = unsafe { bindings::mongoc_collection_remove( self.inner, - remove_flags.flags(), + options.remove_flags.flags(), try!(Bsonc::from_document(&selector)).inner(), - write_concern.inner(), + options.write_concern.inner(), error.mut_inner() ) }; @@ -229,24 +277,16 @@ impl<'a> Collection<'a> { } } - pub fn remove( - &self, - selector: &Document - ) -> Result<()> { - self.remove_with_options( - &Flags::new(), - selector, - &WriteConcern::new() - ) - } - - pub fn save_with_options( + pub fn save( &self, document: &Document, - write_concern: &WriteConcern + write_concern: Option<&WriteConcern> ) -> Result<()> { assert!(!self.inner.is_null()); + let default_write_concern = WriteConcern::new(); + let write_concern = write_concern.unwrap_or(&default_write_concern); + let mut error = BsoncError::empty(); let success = unsafe { bindings::mongoc_collection_save( @@ -264,13 +304,37 @@ impl<'a> Collection<'a> { } } - pub fn save( - &self, - document: &Document, - ) -> Result<()> { - self.save_with_options( - document, - &WriteConcern::new() + /// Tails a query + /// + /// Takes ownership of query and options because they could be + /// modified and reused when the connections is disrupted and + /// we need to restart the query. The query will be placed in a + /// $query key, so the function can add configuration needed for + /// proper tailing. + /// + /// The query is executed when iterating, so this function doesn't + /// return a result itself. + /// + /// The necessary flags to configure a tailing query will be added + /// to the configured flags if you choose to supply options. + pub fn tail( + &'a self, + query: Document, + find_options: Option, + tail_options: Option + ) -> TailingCursor<'a> { + let mut query_with_options = Document::new(); + query_with_options.insert( + "$query".to_string(), + Bson::Document(query) + ); + query_with_options.insert("$natural".to_string(), Bson::I32(1)); + + TailingCursor::new( + self, + query_with_options, + find_options.unwrap_or(FindOptions::default()), + tail_options.unwrap_or(TailOptions::default()) ) } } @@ -304,23 +368,23 @@ mod tests { let mut document = bson::Document::new(); document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string())); document.insert("key_2".to_string(), bson::Bson::String("Value 2".to_string())); - assert!(collection.insert(&document).is_ok()); + assert!(collection.insert(&document, None).is_ok()); let mut second_document = bson::Document::new(); second_document.insert("key_1".to_string(), bson::Bson::String("Value 3".to_string())); - assert!(collection.insert(&second_document).is_ok()); + assert!(collection.insert(&second_document, None).is_ok()); let query = bson::Document::new(); // Count the documents in the collection - assert_eq!(2, collection.count(&query).unwrap()); + assert_eq!(2, collection.count(&query, None).unwrap()); // Find the documents assert_eq!( - collection.find(&document).unwrap().next().unwrap().unwrap().get("key_1").unwrap().to_json(), + collection.find(&document, None).unwrap().next().unwrap().unwrap().get("key_1").unwrap().to_json(), bson::Bson::String("Value 1".to_string()).to_json() ); - let mut found_document = collection.find(&second_document).unwrap().next().unwrap().unwrap(); + let mut found_document = collection.find(&second_document, None).unwrap().next().unwrap().unwrap(); assert_eq!( found_document.get("key_1").unwrap().to_json(), bson::Bson::String("Value 3".to_string()).to_json() @@ -328,42 +392,42 @@ mod tests { // Update the second document found_document.insert("key_1".to_string(), bson::Bson::String("Value 4".to_string())); - assert!(collection.save(&found_document).is_ok()); + assert!(collection.save(&found_document, None).is_ok()); // Reload and check value - let found_document = collection.find(&found_document).unwrap().next().unwrap().unwrap(); + let found_document = collection.find(&found_document, None).unwrap().next().unwrap().unwrap(); assert_eq!( found_document.get("key_1").unwrap().to_json(), bson::Bson::String("Value 4".to_string()).to_json() ); // Remove one - assert!(collection.remove(&found_document).is_ok()); + assert!(collection.remove(&found_document, None).is_ok()); // Count again - assert_eq!(1, collection.count(&query).unwrap()); + assert_eq!(1, collection.count(&query, None).unwrap()); // Find the document and see if it has the keys we expect { - let mut cursor = collection.find(&query).unwrap(); + let mut cursor = collection.find(&query, None).unwrap(); let next_document = cursor.next().unwrap().unwrap(); assert!(next_document.contains_key("key_1")); assert!(next_document.contains_key("key_2")); } // Find the document with fields set - let mut fields = bson::Document::new(); - fields.insert("key_1".to_string(), bson::Bson::Boolean(true)); { - let mut cursor = collection.find_with_options( - &flags::Flags::new(), - 0, - 0, - 0, - &query, - Some(&fields), - None - ).unwrap(); + let mut fields = bson::Document::new(); + fields.insert("key_1".to_string(), bson::Bson::Boolean(true)); + let options = super::FindOptions { + query_flags: flags::Flags::new(), + skip: 0, + limit: 0, + batch_size: 0, + fields: Some(fields), + read_prefs: None + }; + let mut cursor = collection.find(&query, Some(&options)).unwrap(); let next_document = cursor.next().unwrap().unwrap(); assert!(next_document.contains_key("key_1")); assert!(!next_document.contains_key("key_2")); @@ -371,7 +435,7 @@ mod tests { // Drop collection collection.drop().unwrap(); - assert_eq!(0, collection.count(&query).unwrap()); + assert_eq!(0, collection.count(&query, None).unwrap()); } #[test] @@ -382,7 +446,7 @@ mod tests { let collection = client.get_collection("rust_driver_test", "items"); let document = bson::Document::new(); - let result = collection.insert(&document); + let result = collection.insert(&document, None); assert!(result.is_err()); assert_eq!( "MongoError (BsoncError: Failed to connect to target host: localhost:27018)", diff --git a/src/cursor.rs b/src/cursor.rs index 35197c7..69dc1f2 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -3,12 +3,13 @@ use std::ptr; use std::thread; use mongo_c_driver_wrapper::bindings; -use bson::Document; +use bson::{Bson,Document}; use super::BsoncError; use super::bsonc; use super::client::Client; -use super::collection::Collection; +use super::flags::QueryFlag; +use super::collection::{Collection,FindOptions,TailOptions}; use super::Result; @@ -18,8 +19,10 @@ pub enum CreatedBy<'a> { } pub struct Cursor<'a> { - _created_by: CreatedBy<'a>, - inner: *mut bindings::mongoc_cursor_t, + _created_by: CreatedBy<'a>, + inner: *mut bindings::mongoc_cursor_t, + tailing: bool, + tail_wait_time_ms: u32 } impl<'a> Cursor<'a> { @@ -29,8 +32,10 @@ impl<'a> Cursor<'a> { ) -> Cursor<'a> { assert!(!inner.is_null()); Cursor { - _created_by: created_by, - inner: inner + _created_by: created_by, + inner: inner, + tailing: false, + tail_wait_time_ms: 0 } } @@ -88,15 +93,14 @@ impl<'a> Iterator for Cursor<'a> { if success == 0 { if error.is_empty() { - if self.is_alive() { - // Since there was no error and the cursor is - // alive this must be a tailing cursor and we'll - // wait for 500ms before trying again. - thread::sleep_ms(500); + if self.tailing && self.is_alive() { + // Since there was no error, this is a tailing cursor + // and the cursor is alive we'll wait before trying again. + thread::sleep_ms(self.tail_wait_time_ms); continue; } else { - // No result, no error and cursor not alive anymore - // so we must be at the end. + // No result, no error and cursor not tailing so we must + // be at the end. return None } } else { @@ -126,11 +130,108 @@ impl<'a> Drop for Cursor<'a> { } } +/// Cursor that will reconnect and resume tailing a collection +/// at the right point if the connection fails. +pub struct TailingCursor<'a> { + collection: &'a Collection<'a>, + query: Document, + find_options: FindOptions, + tail_options: TailOptions, + cursor: Option>, + last_seen_id: Option<[u8; 12]>, + retry_count: u32 +} + +impl<'a> TailingCursor<'a> { + pub fn new( + collection: &'a Collection<'a>, + query: Document, + find_options: FindOptions, + tail_options: TailOptions + ) -> TailingCursor<'a> { + // Add flags to make query tailable + let mut find_options = find_options; + find_options.query_flags.add(QueryFlag::TailableCursor); + find_options.query_flags.add(QueryFlag::AwaitData); + + TailingCursor { + collection: collection, + query: query, + find_options: find_options, + tail_options: tail_options, + cursor: None, + last_seen_id: None, + retry_count: 0 + } + } +} + +impl<'a> Iterator for TailingCursor<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + // Start a scope so we're free to set the cursor to None at the end. + { + if self.cursor.is_none() { + // Add the last seen id to the query if it's present. + match self.last_seen_id { + Some(id) => { + let mut gt_id = Document::new(); + gt_id.insert("$gt".to_string(), Bson::ObjectId(id)); + self.query.insert("_id".to_string(), Bson::Document(gt_id)); + }, + None => () + }; + + // Set the cursor + self.cursor = match self.collection.find(&self.query, Some(&self.find_options)) { + Ok(mut c) => { + c.tailing = true; + c.tail_wait_time_ms = self.tail_options.wait_time_ms; + Some(c) + }, + Err(e) => return Some(Err(e.into())) + }; + } + + let cursor = match self.cursor { + Some(ref mut c) => c, + None => panic!("It should be impossible to not have a cursor here") + }; + + match cursor.next() { + Some(next_result) => { + match next_result { + Ok(next) => { + // This was successfull, so reset retry count and return result. + self.retry_count = 0; + return Some(Ok(next)) + }, + Err(e) => { + // Retry if we haven't exceeded the maximum number of retries. + if self.retry_count >= self.tail_options.max_retries { + return Some(Err(e.into())) + } + } + } + }, + None => () + }; + } + + // We made it to the end, so we weren't able to get the next item from + // the cursor. We need to reconnect in the next iteration of the loop. + self.retry_count += 1; + self.cursor = None; + } + } +} + #[cfg(test)] mod tests { use std::thread; use bson; - use super::super::flags; use super::super::uri::Uri; use super::super::client::ClientPool; use super::super::Result; @@ -145,13 +246,13 @@ mod tests { let mut document = bson::Document::new(); document.insert("key".to_string(), bson::Bson::String("value".to_string())); - collection.drop().unwrap(); + collection.drop().unwrap_or(()); for _ in 0..10 { - assert!(collection.insert(&document).is_ok()); + assert!(collection.insert(&document, None).is_ok()); } let query = bson::Document::new(); - let cursor = collection.find(&query).unwrap(); + let cursor = collection.find(&query, None).unwrap(); assert!(cursor.is_alive()); @@ -178,20 +279,8 @@ mod tests { let capped_collection = database.create_collection("capped", Some(&options)).unwrap(); let normal_collection = database.create_collection("not_capped", None).unwrap(); - let mut flags = flags::Flags::new(); - flags.add(flags::QueryFlag::TailableCursor); - flags.add(flags::QueryFlag::AwaitData); - // Try to tail on a normal collection - let failing_cursor = normal_collection.find_with_options( - &flags, - 0, - 0, - 0, - &bson::Document::new(), - None, - None - ).unwrap(); + let failing_cursor = normal_collection.tail(bson::Document::new(), None, None); let failing_result = failing_cursor.into_iter().next().unwrap(); assert!(failing_result.is_err()); assert_eq!( @@ -201,32 +290,20 @@ mod tests { let mut document = bson::Document::new(); document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string())); - // Insert some documents into the collection - for _ in 0..5 { - capped_collection.insert(&document).unwrap(); - } + // Insert a first document into the collection + capped_collection.insert(&document, None).unwrap(); // Start a tailing iterator in a thread let cloned_pool = pool.clone(); let guard = thread::spawn(move || { let client = cloned_pool.pop(); let collection = client.get_collection("rust_test", "capped"); - - let cursor = collection.find_with_options( - &flags, - 0, - 0, - 0, - &bson::Document::new(), - None, - None - ).unwrap(); - + let cursor = collection.tail(bson::Document::new(), None, None); let mut counter = 0usize; for result in cursor.into_iter() { assert!(result.is_ok()); counter += 1; - if counter == 15 { + if counter == 25 { break; } } @@ -234,16 +311,16 @@ mod tests { }); // Wait for the thread to boot up - thread::sleep_ms(200); + thread::sleep_ms(250); // Insert some more documents into the collection - for _ in 0..10 { - capped_collection.insert(&document).unwrap(); + for _ in 0..25 { + capped_collection.insert(&document, None).unwrap(); } // See if they appeared while iterating the cursor // The for loop returns whenever we get more than // 15 results. - assert_eq!(15, guard.join().unwrap()); + assert_eq!(25, guard.join().unwrap()); } } diff --git a/src/flags.rs b/src/flags.rs index 8af7e2b..c1bc9a2 100644 --- a/src/flags.rs +++ b/src/flags.rs @@ -1,18 +1,20 @@ use mongo_c_driver_wrapper::bindings; +use std::collections::BTreeSet; + pub struct Flags { - flags: Vec + flags: BTreeSet } -impl Flags { +impl Flags where T: Ord { pub fn new() -> Flags { Flags { - flags: Vec::new() + flags: BTreeSet::new() } } pub fn add(&mut self, flag: T) { - self.flags.push(flag); + self.flags.insert(flag); } } @@ -22,6 +24,7 @@ pub trait FlagsValue { /// Flags for insert operations /// See: http://api.mongodb.org/c/current/mongoc_insert_flags_t.html +#[derive(Eq,PartialEq,Ord,PartialOrd)] pub enum InsertFlag { ContinueOnError, NoValidate @@ -44,6 +47,7 @@ impl FlagsValue for Flags { /// Flags for query operations /// See: http://api.mongodb.org/c/current/mongoc_query_flags_t.html +#[derive(Eq,PartialEq,Ord,PartialOrd)] pub enum QueryFlag { TailableCursor, SlaveOk, @@ -76,6 +80,7 @@ impl FlagsValue for Flags { /// Flags for deletion operations /// See: http://api.mongodb.org/c/1.1.8/mongoc_remove_flags_t.html +#[derive(Eq,PartialEq,Ord,PartialOrd)] pub enum RemoveFlag { SingleRemove } @@ -102,6 +107,7 @@ mod tests { flags.add(super::InsertFlag::ContinueOnError); assert_eq!(1, flags.flags()); + flags.add(super::InsertFlag::NoValidate); flags.add(super::InsertFlag::NoValidate); assert_eq!(31, flags.flags()); } @@ -114,7 +120,18 @@ mod tests { flags.add(super::QueryFlag::TailableCursor); assert_eq!(2, flags.flags()); + flags.add(super::QueryFlag::Partial); flags.add(super::QueryFlag::Partial); assert_eq!(130, flags.flags()); } + + #[test] + pub fn test_remove_flags() { + let mut flags = super::Flags::new(); + assert_eq!(0, flags.flags()); + + flags.add(super::RemoveFlag::SingleRemove); + flags.add(super::RemoveFlag::SingleRemove); + assert_eq!(1, flags.flags()); + } }