diff --git a/mongoc-sys/Cargo.toml b/mongoc-sys/Cargo.toml index 4772675..0376106 100644 --- a/mongoc-sys/Cargo.toml +++ b/mongoc-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mongoc-sys" -version = "1.8.2-1" +version = "1.9.2" description = "Sys package with installer and bindings for mongoc" authors = ["Thijs Cadier "] build = "build.rs" diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index cf4640c..2e68400 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -154,6 +154,7 @@ pub mod bindings { pub fn mongoc_collection_save(collection: *mut mongoc_collection_t, document: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8; pub fn mongoc_collection_update(collection: *mut mongoc_collection_t, flags: mongoc_update_flags_t, selector: *const bson_t, update: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8; pub fn mongoc_collection_destroy(collection: *mut mongoc_collection_t) -> (); + pub fn mongoc_collection_watch(collection: *mut mongoc_collection_t, pipeline: *const bson_t, opts: *const bson_t) -> *mut mongoc_change_stream_t; } // Cursor @@ -179,6 +180,13 @@ pub mod bindings { pub fn mongoc_bulk_operation_destroy(bulk: *mut mongoc_bulk_operation_t) -> (); } + // Change stream + pub enum mongoc_change_stream_t {} + extern "C" { + pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> (); + pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> (); + } + // Flags pub type mongoc_query_flags_t = ::libc::c_uint; pub const MONGOC_DELETE_NONE: ::libc::c_uint = 0; @@ -253,4 +261,5 @@ pub mod bindings { pub const MONGOC_ERROR_PROTOCOL_ERROR: ::libc::c_uint = 17; pub const MONGOC_ERROR_WRITE_CONCERN_ERROR: ::libc::c_uint = 64; pub const MONGOC_ERROR_DUPLICATE_KEY: ::libc::c_uint = 11000; + pub const MONGOC_ERROR_CHANGE_STREAM_NO_RESUME_TOKEN: ::libc::c_uint = 11001; } diff --git a/src/change_stream.rs b/src/change_stream.rs new file mode 100644 index 0000000..a19209b --- /dev/null +++ b/src/change_stream.rs @@ -0,0 +1,33 @@ +//! Access to a MongoDB change stream. +use super::collection::Collection; + +use mongoc::bindings; + +pub struct ChangeStream<'a> { + _collection: &'a Collection<'a>, + inner: *mut bindings::mongoc_change_stream_t +} + +impl<'a> ChangeStream<'a> { + #[doc(hidden)] + pub fn new( + _collection: &'a Collection<'a>, + inner: *mut bindings::mongoc_change_stream_t + ) -> Self { + Self { + _collection, + inner + } + } +} + +impl<'a> Drop for ChangeStream<'a> { + fn drop(&mut self) { + assert!(!self.inner.is_null()); + unsafe { + bindings::mongoc_change_stream_destroy(self.inner); + } + } +} + + diff --git a/src/collection.rs b/src/collection.rs index 729551f..180e7ae 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -23,6 +23,7 @@ use super::database::Database; use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag,UpdateFlag}; use super::write_concern::WriteConcern; use super::read_prefs::ReadPrefs; +use super::change_stream::ChangeStream; #[doc(hidden)] pub enum CreatedBy<'a> { @@ -713,6 +714,25 @@ impl<'a> Collection<'a> { tail_options.unwrap_or(TailOptions::default()) ) } + + /// This function returns change stream for the colletion + /// + /// Returns `Stream` struct + pub fn watch( + &'a self, + pipeline: &Document, + opts: &Document + ) -> Result> { + let inner = unsafe { + bindings::mongoc_collection_watch( + self.inner, + try!(Bsonc::from_document(&pipeline)).inner(), + try!(Bsonc::from_document(&opts)).inner() + ) + }; + Ok(ChangeStream::new(self, inner)) + } + } impl<'a> Drop for Collection<'a> { diff --git a/src/lib.rs b/src/lib.rs index 12c233a..6c7dae3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ pub mod database; pub mod flags; pub mod read_prefs; pub mod write_concern; +pub mod change_stream; mod bsonc; mod error; diff --git a/tests/change_stream.rs b/tests/change_stream.rs new file mode 100644 index 0000000..9c4e2ee --- /dev/null +++ b/tests/change_stream.rs @@ -0,0 +1,4 @@ +#[test] +fn test_change_stream() { + assert_eq!(true, true); +} diff --git a/tests/tests.rs b/tests/tests.rs index 6f83d62..94ff05e 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -13,3 +13,4 @@ mod flags; mod read_prefs; mod uri; mod write_concern; +mod change_stream;