ChangeStream + bindings

pull/48/head
inv2004 7 years ago
parent 95430f2997
commit 2e424b43df

@ -1,6 +1,6 @@
[package]
name = "mongoc-sys"
version = "1.8.2-1"
version = "1.9.2"
description = "Sys package with installer and bindings for mongoc"
authors = ["Thijs Cadier <thijs@appsignal.com>"]
build = "build.rs"

@ -154,6 +154,7 @@ pub mod bindings {
pub fn mongoc_collection_save(collection: *mut mongoc_collection_t, document: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8;
pub fn mongoc_collection_update(collection: *mut mongoc_collection_t, flags: mongoc_update_flags_t, selector: *const bson_t, update: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8;
pub fn mongoc_collection_destroy(collection: *mut mongoc_collection_t) -> ();
pub fn mongoc_collection_watch(collection: *mut mongoc_collection_t, pipeline: *const bson_t, opts: *const bson_t) -> *mut mongoc_change_stream_t;
}
// Cursor
@ -179,6 +180,13 @@ pub mod bindings {
pub fn mongoc_bulk_operation_destroy(bulk: *mut mongoc_bulk_operation_t) -> ();
}
// Change stream
pub enum mongoc_change_stream_t {}
extern "C" {
pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> ();
pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> ();
}
// Flags
pub type mongoc_query_flags_t = ::libc::c_uint;
pub const MONGOC_DELETE_NONE: ::libc::c_uint = 0;
@ -253,4 +261,5 @@ pub mod bindings {
pub const MONGOC_ERROR_PROTOCOL_ERROR: ::libc::c_uint = 17;
pub const MONGOC_ERROR_WRITE_CONCERN_ERROR: ::libc::c_uint = 64;
pub const MONGOC_ERROR_DUPLICATE_KEY: ::libc::c_uint = 11000;
pub const MONGOC_ERROR_CHANGE_STREAM_NO_RESUME_TOKEN: ::libc::c_uint = 11001;
}

@ -0,0 +1,33 @@
//! Access to a MongoDB change stream.
use super::collection::Collection;
use mongoc::bindings;
pub struct ChangeStream<'a> {
_collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_change_stream_t
}
impl<'a> ChangeStream<'a> {
#[doc(hidden)]
pub fn new(
_collection: &'a Collection<'a>,
inner: *mut bindings::mongoc_change_stream_t
) -> Self {
Self {
_collection,
inner
}
}
}
impl<'a> Drop for ChangeStream<'a> {
fn drop(&mut self) {
assert!(!self.inner.is_null());
unsafe {
bindings::mongoc_change_stream_destroy(self.inner);
}
}
}

@ -23,6 +23,7 @@ use super::database::Database;
use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag,UpdateFlag};
use super::write_concern::WriteConcern;
use super::read_prefs::ReadPrefs;
use super::change_stream::ChangeStream;
#[doc(hidden)]
pub enum CreatedBy<'a> {
@ -713,6 +714,25 @@ impl<'a> Collection<'a> {
tail_options.unwrap_or(TailOptions::default())
)
}
/// This function returns change stream for the colletion
///
/// Returns `Stream` struct
pub fn watch(
&'a self,
pipeline: &Document,
opts: &Document
) -> Result<ChangeStream<'a>> {
let inner = unsafe {
bindings::mongoc_collection_watch(
self.inner,
try!(Bsonc::from_document(&pipeline)).inner(),
try!(Bsonc::from_document(&opts)).inner()
)
};
Ok(ChangeStream::new(self, inner))
}
}
impl<'a> Drop for Collection<'a> {

@ -49,6 +49,7 @@ pub mod database;
pub mod flags;
pub mod read_prefs;
pub mod write_concern;
pub mod change_stream;
mod bsonc;
mod error;

@ -0,0 +1,4 @@
#[test]
fn test_change_stream() {
assert_eq!(true, true);
}

@ -13,3 +13,4 @@ mod flags;
mod read_prefs;
mod uri;
mod write_concern;
mod change_stream;

Loading…
Cancel
Save