added timeout argument to Collection::watch

pull/48/head
inv2004 7 years ago
parent cce22b84a5
commit f47fd8473b

@ -13,18 +13,21 @@ use super::Result;
pub struct ChangeStream<'a> { pub struct ChangeStream<'a> {
_collection: &'a Collection<'a>, _collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_change_stream_t inner: *mut bindings::mongoc_change_stream_t,
timeout: bool
} }
impl<'a> ChangeStream<'a> { impl<'a> ChangeStream<'a> {
#[doc(hidden)] #[doc(hidden)]
pub fn new( pub fn new(
_collection: &'a Collection<'a>, _collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_change_stream_t inner: *mut bindings::mongoc_change_stream_t,
timeout: bool
) -> Self { ) -> Self {
Self { Self {
_collection, _collection,
inner inner,
timeout
} }
} }
@ -51,6 +54,7 @@ impl<'a> Iterator for ChangeStream<'a> {
let mut bson_ptr: *const bindings::bson_t = ptr::null(); let mut bson_ptr: *const bindings::bson_t = ptr::null();
loop {
let success = unsafe { let success = unsafe {
bindings::mongoc_change_stream_next( bindings::mongoc_change_stream_next(
self.inner, self.inner,
@ -69,9 +73,14 @@ impl<'a> Iterator for ChangeStream<'a> {
} else { } else {
let error = self.error(); let error = self.error();
if error.is_empty() { if error.is_empty() {
None if self.timeout {
return None;
} else { } else {
Some(Err(error.into())) continue; // do not exit from stream
}
} else {
return Some(Err(error.into()));
}
} }
} }

@ -715,24 +715,30 @@ impl<'a> Collection<'a> {
) )
} }
/// This function returns change stream for the colletion /// This function returns change stream for the collection
/// if timeout is None, stream does not exit on timeout
/// ///
/// Returns `Stream` struct /// Returns `Stream` struct
pub fn watch( pub fn watch(
&'a self, &'a self,
pipeline: &Document, pipeline: &Document,
opts: &Document opts: &Document,
timeout: Option<u64>
) -> Result<ChangeStream<'a>> { ) -> Result<ChangeStream<'a>> {
let mut new_opts = opts.clone();
if let Some(timeout) = timeout {
new_opts.insert("maxAwaitTimeMS", timeout);
}
let inner = unsafe { let inner = unsafe {
bindings::mongoc_collection_watch( bindings::mongoc_collection_watch(
self.inner, self.inner,
try!(Bsonc::from_document(&pipeline)).inner(), try!(Bsonc::from_document(&pipeline)).inner(),
try!(Bsonc::from_document(&opts)).inner() try!(Bsonc::from_document(&new_opts)).inner()
) )
}; };
Ok(ChangeStream::new(self, inner)) Ok(ChangeStream::new(self, inner, timeout.is_some()))
} }
} }
impl<'a> Drop for Collection<'a> { impl<'a> Drop for Collection<'a> {

@ -17,7 +17,7 @@ fn test_change_stream() {
let guard = thread::spawn(move || { let guard = thread::spawn(move || {
let client = cloned_pool.pop(); let client = cloned_pool.pop();
let collection = client.get_collection("rust_driver_test", "change_stream"); let collection = client.get_collection("rust_driver_test", "change_stream");
let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{"maxAwaitTimeMS": 1_000}).unwrap(); let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{}, Some(1000)).unwrap();
let mut counter = 10; let mut counter = 10;
for x in stream { for x in stream {

Loading…
Cancel
Save