ChangeStream test in_progress

pull/48/head
inv2004 7 years ago
parent bdca011234
commit bdc045eeb2

@ -183,7 +183,8 @@ pub mod bindings {
// Change stream
pub enum mongoc_change_stream_t {}
extern "C" {
pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> ();
pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> u8;
pub fn mongoc_change_stream_error_document(stream: *mut mongoc_change_stream_t, error: *mut bson_error_t, reply: *mut bson_t) -> u8;
pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> ();
}

@ -1,7 +1,15 @@
//! Access to a MongoDB change stream.
use std::ptr;
use std::iter::Iterator;
use super::collection::Collection;
use mongoc::bindings;
use bson::{Bson,Document};
use super::bsonc::Bsonc;
use super::BsoncError;
use super::Result;
pub struct ChangeStream<'a> {
_collection: &'a Collection<'a>,
@ -19,8 +27,61 @@ impl<'a> ChangeStream<'a> {
inner
}
}
fn error(&self) -> BsoncError {
assert!(!self.inner.is_null());
let mut error = BsoncError::empty();
let mut reply = Bsonc::new();
unsafe {
bindings::mongoc_change_stream_error_document(
self.inner,
error.mut_inner(),
reply.mut_inner()
)
};
error
}
}
impl<'a> Iterator for ChangeStream<'a> {
type Item = Result<Document>;
fn next(&mut self) -> Option<Self::Item> {
let mut bson_ptr: *const bindings::bson_t = ptr::null();
let success = unsafe {
bindings::mongoc_change_stream_next(
self.inner,
&mut bson_ptr
)
};
if success == 1 {
assert!(!bson_ptr.is_null());
println!("DEBUG0: {}", success);
let bsonc = Bsonc::from_ptr(bson_ptr);
match bsonc.as_document() {
Ok(document) => return Some(Ok(document)),
Err(error) => return Some(Err(error.into()))
}
} else {
println!("DEBUG1: {}", success);
let error = self.error();
println!("DEBUG11: {}", error);
if error.is_empty() {
None
} else {
Some(Err(error.into()))
}
}
}
}
impl<'a> Drop for ChangeStream<'a> {
fn drop(&mut self) {
assert!(!self.inner.is_null());

@ -1,4 +1,20 @@
//use bson;
use std::sync::Arc;
use std::thread;
use mongo_driver::client::{ClientPool,Uri};
use mongo_driver::Result;
#[test]
fn test_change_stream() {
assert_eq!(true, true);
let uri = Uri::new("mongodb://localhost:27017/").unwrap();
let pool = Arc::new(ClientPool::new(uri, None));
let client = pool.pop();
let mut collection = client.get_collection("rust_driver_test", "change_stream");
let stream = collection.watch(&doc!{}, &doc!{"maxAwaitTimeMS": 10_000}).unwrap();
let next = stream.into_iter().next().unwrap().unwrap();
assert_eq!(true, false);
}

Loading…
Cancel
Save