diff --git a/src/change_stream.rs b/src/change_stream.rs index 2b40276..2f822c5 100644 --- a/src/change_stream.rs +++ b/src/change_stream.rs @@ -13,18 +13,21 @@ use super::Result; pub struct ChangeStream<'a> { _collection: &'a Collection<'a>, - inner: *mut bindings::mongoc_change_stream_t + inner: *mut bindings::mongoc_change_stream_t, + timeout: bool } impl<'a> ChangeStream<'a> { #[doc(hidden)] pub fn new( _collection: &'a Collection<'a>, - inner: *mut bindings::mongoc_change_stream_t + inner: *mut bindings::mongoc_change_stream_t, + timeout: bool ) -> Self { Self { _collection, - inner + inner, + timeout } } @@ -51,27 +54,33 @@ impl<'a> Iterator for ChangeStream<'a> { let mut bson_ptr: *const bindings::bson_t = ptr::null(); - let success = unsafe { - bindings::mongoc_change_stream_next( - self.inner, - &mut bson_ptr - ) - }; - - if success == 1 { - assert!(!bson_ptr.is_null()); - - let bsonc = Bsonc::from_ptr(bson_ptr); - match bsonc.as_document() { - Ok(document) => return Some(Ok(document)), - Err(error) => return Some(Err(error.into())) - } - } else { - let error = self.error(); - if error.is_empty() { - None + loop { + let success = unsafe { + bindings::mongoc_change_stream_next( + self.inner, + &mut bson_ptr + ) + }; + + if success == 1 { + assert!(!bson_ptr.is_null()); + + let bsonc = Bsonc::from_ptr(bson_ptr); + match bsonc.as_document() { + Ok(document) => return Some(Ok(document)), + Err(error) => return Some(Err(error.into())) + } } else { - Some(Err(error.into())) + let error = self.error(); + if error.is_empty() { + if self.timeout { + return None; + } else { + continue; // do not exit from stream + } + } else { + return Some(Err(error.into())); + } } } diff --git a/src/collection.rs b/src/collection.rs index 180e7ae..218ad81 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -715,24 +715,30 @@ impl<'a> Collection<'a> { ) } - /// This function returns change stream for the colletion + /// This function returns change stream for the collection + /// if timeout is None, stream does not exit on timeout /// /// Returns `Stream` struct pub fn watch( &'a self, pipeline: &Document, - opts: &Document + opts: &Document, + timeout: Option ) -> Result> { + let mut new_opts = opts.clone(); + if let Some(timeout) = timeout { + new_opts.insert("maxAwaitTimeMS", timeout); + } + let inner = unsafe { bindings::mongoc_collection_watch( self.inner, try!(Bsonc::from_document(&pipeline)).inner(), - try!(Bsonc::from_document(&opts)).inner() + try!(Bsonc::from_document(&new_opts)).inner() ) }; - Ok(ChangeStream::new(self, inner)) + Ok(ChangeStream::new(self, inner, timeout.is_some())) } - } impl<'a> Drop for Collection<'a> { diff --git a/tests/change_stream.rs b/tests/change_stream.rs index d2ac0b7..86fdd65 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -17,7 +17,7 @@ fn test_change_stream() { let guard = thread::spawn(move || { let client = cloned_pool.pop(); let collection = client.get_collection("rust_driver_test", "change_stream"); - let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{"maxAwaitTimeMS": 1_000}).unwrap(); + let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{}, Some(1000)).unwrap(); let mut counter = 10; for x in stream {