From bdca01123497a05b3a9f921b630ec44c3c35af74 Mon Sep 17 00:00:00 2001 From: inv2004 Date: Fri, 9 Nov 2018 01:08:37 -0500 Subject: [PATCH 1/6] ChangeStream + bindings --- mongoc-sys/Cargo.toml | 2 +- mongoc-sys/src/lib.rs | 9 +++++++++ src/change_stream.rs | 33 +++++++++++++++++++++++++++++++++ src/collection.rs | 20 ++++++++++++++++++++ src/lib.rs | 1 + tests/change_stream.rs | 4 ++++ tests/tests.rs | 1 + 7 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 src/change_stream.rs create mode 100644 tests/change_stream.rs diff --git a/mongoc-sys/Cargo.toml b/mongoc-sys/Cargo.toml index 4772675..0376106 100644 --- a/mongoc-sys/Cargo.toml +++ b/mongoc-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mongoc-sys" -version = "1.8.2-1" +version = "1.9.2" description = "Sys package with installer and bindings for mongoc" authors = ["Thijs Cadier "] build = "build.rs" diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index cf4640c..2e68400 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -154,6 +154,7 @@ pub mod bindings { pub fn mongoc_collection_save(collection: *mut mongoc_collection_t, document: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8; pub fn mongoc_collection_update(collection: *mut mongoc_collection_t, flags: mongoc_update_flags_t, selector: *const bson_t, update: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8; pub fn mongoc_collection_destroy(collection: *mut mongoc_collection_t) -> (); + pub fn mongoc_collection_watch(collection: *mut mongoc_collection_t, pipeline: *const bson_t, opts: *const bson_t) -> *mut mongoc_change_stream_t; } // Cursor @@ -179,6 +180,13 @@ pub mod bindings { pub fn mongoc_bulk_operation_destroy(bulk: *mut mongoc_bulk_operation_t) -> (); } + // Change stream + pub enum mongoc_change_stream_t {} + extern "C" { + pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> (); + pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> (); + } + // Flags pub type mongoc_query_flags_t = ::libc::c_uint; pub const MONGOC_DELETE_NONE: ::libc::c_uint = 0; @@ -253,4 +261,5 @@ pub mod bindings { pub const MONGOC_ERROR_PROTOCOL_ERROR: ::libc::c_uint = 17; pub const MONGOC_ERROR_WRITE_CONCERN_ERROR: ::libc::c_uint = 64; pub const MONGOC_ERROR_DUPLICATE_KEY: ::libc::c_uint = 11000; + pub const MONGOC_ERROR_CHANGE_STREAM_NO_RESUME_TOKEN: ::libc::c_uint = 11001; } diff --git a/src/change_stream.rs b/src/change_stream.rs new file mode 100644 index 0000000..a19209b --- /dev/null +++ b/src/change_stream.rs @@ -0,0 +1,33 @@ +//! Access to a MongoDB change stream. +use super::collection::Collection; + +use mongoc::bindings; + +pub struct ChangeStream<'a> { + _collection: &'a Collection<'a>, + inner: *mut bindings::mongoc_change_stream_t +} + +impl<'a> ChangeStream<'a> { + #[doc(hidden)] + pub fn new( + _collection: &'a Collection<'a>, + inner: *mut bindings::mongoc_change_stream_t + ) -> Self { + Self { + _collection, + inner + } + } +} + +impl<'a> Drop for ChangeStream<'a> { + fn drop(&mut self) { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_change_stream_destroy(self.inner); + } + } +} + + diff --git a/src/collection.rs b/src/collection.rs index 729551f..180e7ae 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -23,6 +23,7 @@ use super::database::Database; use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag,UpdateFlag}; use super::write_concern::WriteConcern; use super::read_prefs::ReadPrefs; +use super::change_stream::ChangeStream; #[doc(hidden)] pub enum CreatedBy<'a> { @@ -713,6 +714,25 @@ impl<'a> Collection<'a> { tail_options.unwrap_or(TailOptions::default()) ) } + + /// This function returns change stream for the colletion + /// + /// Returns `Stream` struct + pub fn watch( + &'a self, + pipeline: &Document, + opts: &Document + ) -> Result> { + let inner = unsafe { + bindings::mongoc_collection_watch( + self.inner, + try!(Bsonc::from_document(&pipeline)).inner(), + try!(Bsonc::from_document(&opts)).inner() + ) + }; + Ok(ChangeStream::new(self, inner)) + } + } impl<'a> Drop for Collection<'a> { diff --git a/src/lib.rs b/src/lib.rs index 12c233a..6c7dae3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ pub mod database; pub mod flags; pub mod read_prefs; pub mod write_concern; +pub mod change_stream; mod bsonc; mod error; diff --git a/tests/change_stream.rs b/tests/change_stream.rs new file mode 100644 index 0000000..9c4e2ee --- /dev/null +++ b/tests/change_stream.rs @@ -0,0 +1,4 @@ +#[test] +fn test_change_stream() { + assert_eq!(true, true); +} diff --git a/tests/tests.rs b/tests/tests.rs index 6f83d62..94ff05e 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -13,3 +13,4 @@ mod flags; mod read_prefs; mod uri; mod write_concern; +mod change_stream; From bdc045eeb2dd0e16d40103e75ca2d8e1521c42a0 Mon Sep 17 00:00:00 2001 From: inv2004 Date: Fri, 9 Nov 2018 02:32:47 -0500 Subject: [PATCH 2/6] ChangeStream test in_progress --- mongoc-sys/src/lib.rs | 3 ++- src/change_stream.rs | 61 ++++++++++++++++++++++++++++++++++++++++++ tests/change_stream.rs | 18 ++++++++++++- 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index 2e68400..40811d5 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -183,7 +183,8 @@ pub mod bindings { // Change stream pub enum mongoc_change_stream_t {} extern "C" { - pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> (); + pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> u8; + pub fn mongoc_change_stream_error_document(stream: *mut mongoc_change_stream_t, error: *mut bson_error_t, reply: *mut bson_t) -> u8; pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> (); } diff --git a/src/change_stream.rs b/src/change_stream.rs index a19209b..72e89e0 100644 --- a/src/change_stream.rs +++ b/src/change_stream.rs @@ -1,7 +1,15 @@ //! Access to a MongoDB change stream. + +use std::ptr; +use std::iter::Iterator; use super::collection::Collection; use mongoc::bindings; +use bson::{Bson,Document}; +use super::bsonc::Bsonc; +use super::BsoncError; +use super::Result; + pub struct ChangeStream<'a> { _collection: &'a Collection<'a>, @@ -19,8 +27,61 @@ impl<'a> ChangeStream<'a> { inner } } + + fn error(&self) -> BsoncError { + assert!(!self.inner.is_null()); + let mut error = BsoncError::empty(); + let mut reply = Bsonc::new(); + + unsafe { + bindings::mongoc_change_stream_error_document( + self.inner, + error.mut_inner(), + reply.mut_inner() + ) + }; + error + } +} + +impl<'a> Iterator for ChangeStream<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + + let mut bson_ptr: *const bindings::bson_t = ptr::null(); + + let success = unsafe { + bindings::mongoc_change_stream_next( + self.inner, + &mut bson_ptr + ) + }; + + if success == 1 { + assert!(!bson_ptr.is_null()); + println!("DEBUG0: {}", success); + + let bsonc = Bsonc::from_ptr(bson_ptr); + match bsonc.as_document() { + Ok(document) => return Some(Ok(document)), + Err(error) => return Some(Err(error.into())) + } + } else { + println!("DEBUG1: {}", success); + let error = self.error(); + println!("DEBUG11: {}", error); + if error.is_empty() { + None + } else { + Some(Err(error.into())) + } + } + + } } + impl<'a> Drop for ChangeStream<'a> { fn drop(&mut self) { assert!(!self.inner.is_null()); diff --git a/tests/change_stream.rs b/tests/change_stream.rs index 9c4e2ee..9700995 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -1,4 +1,20 @@ +//use bson; + +use std::sync::Arc; +use std::thread; + +use mongo_driver::client::{ClientPool,Uri}; +use mongo_driver::Result; + #[test] fn test_change_stream() { - assert_eq!(true, true); + let uri = Uri::new("mongodb://localhost:27017/").unwrap(); + let pool = Arc::new(ClientPool::new(uri, None)); + let client = pool.pop(); + let mut collection = client.get_collection("rust_driver_test", "change_stream"); + + let stream = collection.watch(&doc!{}, &doc!{"maxAwaitTimeMS": 10_000}).unwrap(); + let next = stream.into_iter().next().unwrap().unwrap(); + assert_eq!(true, false); } + From ba4979e9cbe07e80d42073d766b9f3b3ff30c4ec Mon Sep 17 00:00:00 2001 From: inv2004 Date: Fri, 9 Nov 2018 03:06:22 -0500 Subject: [PATCH 3/6] ChangeStream test done --- src/change_stream.rs | 3 --- tests/change_stream.rs | 33 ++++++++++++++++++++++++++++----- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/change_stream.rs b/src/change_stream.rs index 72e89e0..2b40276 100644 --- a/src/change_stream.rs +++ b/src/change_stream.rs @@ -60,7 +60,6 @@ impl<'a> Iterator for ChangeStream<'a> { if success == 1 { assert!(!bson_ptr.is_null()); - println!("DEBUG0: {}", success); let bsonc = Bsonc::from_ptr(bson_ptr); match bsonc.as_document() { @@ -68,9 +67,7 @@ impl<'a> Iterator for ChangeStream<'a> { Err(error) => return Some(Err(error.into())) } } else { - println!("DEBUG1: {}", success); let error = self.error(); - println!("DEBUG11: {}", error); if error.is_empty() { None } else { diff --git a/tests/change_stream.rs b/tests/change_stream.rs index 9700995..188e55b 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -2,19 +2,42 @@ use std::sync::Arc; use std::thread; +use std::time::Duration; use mongo_driver::client::{ClientPool,Uri}; -use mongo_driver::Result; #[test] fn test_change_stream() { let uri = Uri::new("mongodb://localhost:27017/").unwrap(); let pool = Arc::new(ClientPool::new(uri, None)); let client = pool.pop(); - let mut collection = client.get_collection("rust_driver_test", "change_stream"); + let collection = client.get_collection("rust_driver_test", "change_stream"); - let stream = collection.watch(&doc!{}, &doc!{"maxAwaitTimeMS": 10_000}).unwrap(); - let next = stream.into_iter().next().unwrap().unwrap(); - assert_eq!(true, false); + let cloned_pool = pool.clone(); + let guard = thread::spawn(move || { + let client = cloned_pool.pop(); + let collection = client.get_collection("rust_driver_test", "change_stream"); + let stream = collection.watch(&doc!{}, &doc!{"maxAwaitTimeMS": 1_000}).unwrap(); + let mut counter = 0; + for x in stream { + let c = x.unwrap().get_document("fullDocument").unwrap().get_i32("c").unwrap(); + if c == counter { + counter += 1; + } + if counter == 15 { + break; + } + }; + counter + }); + + thread::sleep(Duration::from_millis(1)); + + for i in 0..15 { + collection.insert(&doc! {"c": i}, None).unwrap(); + } + + + assert_eq!(15, guard.join().unwrap()); } From 64d2501b54e6298d2dbcf80601ead36dabd684d5 Mon Sep 17 00:00:00 2001 From: inv2004 Date: Fri, 9 Nov 2018 03:11:38 -0500 Subject: [PATCH 4/6] timeout after thread::spawn is increased (more stable result). --- tests/change_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/change_stream.rs b/tests/change_stream.rs index 188e55b..0dc84dd 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -31,7 +31,7 @@ fn test_change_stream() { counter }); - thread::sleep(Duration::from_millis(1)); + thread::sleep(Duration::from_millis(100)); for i in 0..15 { collection.insert(&doc! {"c": i}, None).unwrap(); From cce22b84a5ef5b2ca4fbcf6b467a2d96842f1fe2 Mon Sep 17 00:00:00 2001 From: inv2004 Date: Fri, 9 Nov 2018 03:19:12 -0500 Subject: [PATCH 5/6] added $match into test --- tests/change_stream.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/change_stream.rs b/tests/change_stream.rs index 0dc84dd..d2ac0b7 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -17,14 +17,15 @@ fn test_change_stream() { let guard = thread::spawn(move || { let client = cloned_pool.pop(); let collection = client.get_collection("rust_driver_test", "change_stream"); - let stream = collection.watch(&doc!{}, &doc!{"maxAwaitTimeMS": 1_000}).unwrap(); - let mut counter = 0; + let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{"maxAwaitTimeMS": 1_000}).unwrap(); + + let mut counter = 10; for x in stream { let c = x.unwrap().get_document("fullDocument").unwrap().get_i32("c").unwrap(); if c == counter { counter += 1; } - if counter == 15 { + if counter == 25 { break; } }; @@ -33,11 +34,11 @@ fn test_change_stream() { thread::sleep(Duration::from_millis(100)); - for i in 0..15 { + for i in 0..25 { collection.insert(&doc! {"c": i}, None).unwrap(); } - assert_eq!(15, guard.join().unwrap()); + assert_eq!(25, guard.join().unwrap()); } From f47fd8473b88c0f6ed74f8585da8c95948a83b2b Mon Sep 17 00:00:00 2001 From: inv2004 Date: Sat, 10 Nov 2018 00:14:56 -0500 Subject: [PATCH 6/6] added timeout argument to Collection::watch --- src/change_stream.rs | 55 ++++++++++++++++++++++++------------------ src/collection.rs | 16 ++++++++---- tests/change_stream.rs | 2 +- 3 files changed, 44 insertions(+), 29 deletions(-) diff --git a/src/change_stream.rs b/src/change_stream.rs index 2b40276..2f822c5 100644 --- a/src/change_stream.rs +++ b/src/change_stream.rs @@ -13,18 +13,21 @@ use super::Result; pub struct ChangeStream<'a> { _collection: &'a Collection<'a>, - inner: *mut bindings::mongoc_change_stream_t + inner: *mut bindings::mongoc_change_stream_t, + timeout: bool } impl<'a> ChangeStream<'a> { #[doc(hidden)] pub fn new( _collection: &'a Collection<'a>, - inner: *mut bindings::mongoc_change_stream_t + inner: *mut bindings::mongoc_change_stream_t, + timeout: bool ) -> Self { Self { _collection, - inner + inner, + timeout } } @@ -51,27 +54,33 @@ impl<'a> Iterator for ChangeStream<'a> { let mut bson_ptr: *const bindings::bson_t = ptr::null(); - let success = unsafe { - bindings::mongoc_change_stream_next( - self.inner, - &mut bson_ptr - ) - }; - - if success == 1 { - assert!(!bson_ptr.is_null()); - - let bsonc = Bsonc::from_ptr(bson_ptr); - match bsonc.as_document() { - Ok(document) => return Some(Ok(document)), - Err(error) => return Some(Err(error.into())) - } - } else { - let error = self.error(); - if error.is_empty() { - None + loop { + let success = unsafe { + bindings::mongoc_change_stream_next( + self.inner, + &mut bson_ptr + ) + }; + + if success == 1 { + assert!(!bson_ptr.is_null()); + + let bsonc = Bsonc::from_ptr(bson_ptr); + match bsonc.as_document() { + Ok(document) => return Some(Ok(document)), + Err(error) => return Some(Err(error.into())) + } } else { - Some(Err(error.into())) + let error = self.error(); + if error.is_empty() { + if self.timeout { + return None; + } else { + continue; // do not exit from stream + } + } else { + return Some(Err(error.into())); + } } } diff --git a/src/collection.rs b/src/collection.rs index 180e7ae..218ad81 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -715,24 +715,30 @@ impl<'a> Collection<'a> { ) } - /// This function returns change stream for the colletion + /// This function returns change stream for the collection + /// if timeout is None, stream does not exit on timeout /// /// Returns `Stream` struct pub fn watch( &'a self, pipeline: &Document, - opts: &Document + opts: &Document, + timeout: Option ) -> Result> { + let mut new_opts = opts.clone(); + if let Some(timeout) = timeout { + new_opts.insert("maxAwaitTimeMS", timeout); + } + let inner = unsafe { bindings::mongoc_collection_watch( self.inner, try!(Bsonc::from_document(&pipeline)).inner(), - try!(Bsonc::from_document(&opts)).inner() + try!(Bsonc::from_document(&new_opts)).inner() ) }; - Ok(ChangeStream::new(self, inner)) + Ok(ChangeStream::new(self, inner, timeout.is_some())) } - } impl<'a> Drop for Collection<'a> { diff --git a/tests/change_stream.rs b/tests/change_stream.rs index d2ac0b7..86fdd65 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -17,7 +17,7 @@ fn test_change_stream() { let guard = thread::spawn(move || { let client = cloned_pool.pop(); let collection = client.get_collection("rust_driver_test", "change_stream"); - let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{"maxAwaitTimeMS": 1_000}).unwrap(); + let stream = collection.watch(&doc!{"$match": {"$gte": 10}}, &doc!{}, Some(1000)).unwrap(); let mut counter = 10; for x in stream {