From bdc045eeb2dd0e16d40103e75ca2d8e1521c42a0 Mon Sep 17 00:00:00 2001 From: inv2004 Date: Fri, 9 Nov 2018 02:32:47 -0500 Subject: [PATCH] ChangeStream test in_progress --- mongoc-sys/src/lib.rs | 3 ++- src/change_stream.rs | 61 ++++++++++++++++++++++++++++++++++++++++++ tests/change_stream.rs | 18 ++++++++++++- 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index 2e68400..40811d5 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -183,7 +183,8 @@ pub mod bindings { // Change stream pub enum mongoc_change_stream_t {} extern "C" { - pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> (); + pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> u8; + pub fn mongoc_change_stream_error_document(stream: *mut mongoc_change_stream_t, error: *mut bson_error_t, reply: *mut bson_t) -> u8; pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> (); } diff --git a/src/change_stream.rs b/src/change_stream.rs index a19209b..72e89e0 100644 --- a/src/change_stream.rs +++ b/src/change_stream.rs @@ -1,7 +1,15 @@ //! Access to a MongoDB change stream. + +use std::ptr; +use std::iter::Iterator; use super::collection::Collection; use mongoc::bindings; +use bson::{Bson,Document}; +use super::bsonc::Bsonc; +use super::BsoncError; +use super::Result; + pub struct ChangeStream<'a> { _collection: &'a Collection<'a>, @@ -19,8 +27,61 @@ impl<'a> ChangeStream<'a> { inner } } + + fn error(&self) -> BsoncError { + assert!(!self.inner.is_null()); + let mut error = BsoncError::empty(); + let mut reply = Bsonc::new(); + + unsafe { + bindings::mongoc_change_stream_error_document( + self.inner, + error.mut_inner(), + reply.mut_inner() + ) + }; + error + } +} + +impl<'a> Iterator for ChangeStream<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + + let mut bson_ptr: *const bindings::bson_t = ptr::null(); + + let success = unsafe { + bindings::mongoc_change_stream_next( + self.inner, + &mut bson_ptr + ) + }; + + if success == 1 { + assert!(!bson_ptr.is_null()); + println!("DEBUG0: {}", success); + + let bsonc = Bsonc::from_ptr(bson_ptr); + match bsonc.as_document() { + Ok(document) => return Some(Ok(document)), + Err(error) => return Some(Err(error.into())) + } + } else { + println!("DEBUG1: {}", success); + let error = self.error(); + println!("DEBUG11: {}", error); + if error.is_empty() { + None + } else { + Some(Err(error.into())) + } + } + + } } + impl<'a> Drop for ChangeStream<'a> { fn drop(&mut self) { assert!(!self.inner.is_null()); diff --git a/tests/change_stream.rs b/tests/change_stream.rs index 9c4e2ee..9700995 100644 --- a/tests/change_stream.rs +++ b/tests/change_stream.rs @@ -1,4 +1,20 @@ +//use bson; + +use std::sync::Arc; +use std::thread; + +use mongo_driver::client::{ClientPool,Uri}; +use mongo_driver::Result; + #[test] fn test_change_stream() { - assert_eq!(true, true); + let uri = Uri::new("mongodb://localhost:27017/").unwrap(); + let pool = Arc::new(ClientPool::new(uri, None)); + let client = pool.pop(); + let mut collection = client.get_collection("rust_driver_test", "change_stream"); + + let stream = collection.watch(&doc!{}, &doc!{"maxAwaitTimeMS": 10_000}).unwrap(); + let next = stream.into_iter().next().unwrap().unwrap(); + assert_eq!(true, false); } +