From 2e424b43df2da27cce19b48c478993d619a77013 Mon Sep 17 00:00:00 2001
From: inv2004 <gPoiuhjkl1>
Date: Fri, 9 Nov 2018 01:08:37 -0500
Subject: [PATCH] ChangeStream + bindings

---
 mongoc-sys/Cargo.toml  |  2 +-
 mongoc-sys/src/lib.rs  |  9 +++++++++
 src/change_stream.rs   | 33 +++++++++++++++++++++++++++++++++
 src/collection.rs      | 20 ++++++++++++++++++++
 src/lib.rs             |  1 +
 tests/change_stream.rs |  4 ++++
 tests/tests.rs         |  1 +
 7 files changed, 69 insertions(+), 1 deletion(-)
 create mode 100644 src/change_stream.rs
 create mode 100644 tests/change_stream.rs

diff --git a/mongoc-sys/Cargo.toml b/mongoc-sys/Cargo.toml
index 4772675..0376106 100644
--- a/mongoc-sys/Cargo.toml
+++ b/mongoc-sys/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name        = "mongoc-sys"
-version     = "1.8.2-1"
+version     = "1.9.2"
 description = "Sys package with installer and bindings for mongoc"
 authors     = ["Thijs Cadier <thijs@appsignal.com>"]
 build       = "build.rs"
diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs
index cf4640c..2e68400 100644
--- a/mongoc-sys/src/lib.rs
+++ b/mongoc-sys/src/lib.rs
@@ -154,6 +154,7 @@ pub mod bindings {
         pub fn mongoc_collection_save(collection: *mut mongoc_collection_t, document: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8;
         pub fn mongoc_collection_update(collection: *mut mongoc_collection_t, flags: mongoc_update_flags_t, selector: *const bson_t, update: *const bson_t, write_concern: *const mongoc_write_concern_t, error: *mut bson_error_t) -> u8;
         pub fn mongoc_collection_destroy(collection: *mut mongoc_collection_t) -> ();
+        pub fn mongoc_collection_watch(collection: *mut mongoc_collection_t, pipeline: *const bson_t, opts: *const bson_t) -> *mut mongoc_change_stream_t;
     }
 
     // Cursor
@@ -179,6 +180,13 @@ pub mod bindings {
         pub fn mongoc_bulk_operation_destroy(bulk: *mut mongoc_bulk_operation_t) -> ();
     }
 
+    // Change stream
+    pub enum mongoc_change_stream_t {}
+    extern "C" {
+        pub fn mongoc_change_stream_next(stream: *mut mongoc_change_stream_t, bson: *mut *const bson_t) -> ();
+        pub fn mongoc_change_stream_destroy(stream: *mut mongoc_change_stream_t) -> ();
+    }
+
     // Flags
     pub type mongoc_query_flags_t = ::libc::c_uint;
     pub const MONGOC_DELETE_NONE: ::libc::c_uint = 0;
@@ -253,4 +261,5 @@ pub mod bindings {
     pub const MONGOC_ERROR_PROTOCOL_ERROR: ::libc::c_uint = 17;
     pub const MONGOC_ERROR_WRITE_CONCERN_ERROR: ::libc::c_uint = 64;
     pub const MONGOC_ERROR_DUPLICATE_KEY: ::libc::c_uint = 11000;
+    pub const MONGOC_ERROR_CHANGE_STREAM_NO_RESUME_TOKEN: ::libc::c_uint = 11001;
 }
diff --git a/src/change_stream.rs b/src/change_stream.rs
new file mode 100644
index 0000000..a19209b
--- /dev/null
+++ b/src/change_stream.rs
@@ -0,0 +1,33 @@
+//! Access to a MongoDB change stream.
+use super::collection::Collection;
+
+use mongoc::bindings;
+
+pub struct ChangeStream<'a> {
+    _collection: &'a Collection<'a>,
+    inner:       *mut bindings::mongoc_change_stream_t
+}
+
+impl<'a> ChangeStream<'a> {
+    #[doc(hidden)]
+    pub fn new(
+        _collection: &'a Collection<'a>,
+        inner:      *mut bindings::mongoc_change_stream_t
+    ) -> Self {
+        Self {
+            _collection,
+            inner
+        }
+    }
+}
+
+impl<'a> Drop for ChangeStream<'a> {
+    fn drop(&mut self) {
+        assert!(!self.inner.is_null());
+        unsafe {
+            bindings::mongoc_change_stream_destroy(self.inner);
+        }
+    }
+}
+
+
diff --git a/src/collection.rs b/src/collection.rs
index 729551f..180e7ae 100644
--- a/src/collection.rs
+++ b/src/collection.rs
@@ -23,6 +23,7 @@ use super::database::Database;
 use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag,UpdateFlag};
 use super::write_concern::WriteConcern;
 use super::read_prefs::ReadPrefs;
+use super::change_stream::ChangeStream;
 
 #[doc(hidden)]
 pub enum CreatedBy<'a> {
@@ -713,6 +714,25 @@ impl<'a> Collection<'a> {
             tail_options.unwrap_or(TailOptions::default())
         )
     }
+
+    /// This function returns change stream for the colletion
+    ///
+    /// Returns `Stream` struct
+    pub fn watch(
+        &'a self,
+        pipeline: &Document,
+        opts: &Document
+    ) -> Result<ChangeStream<'a>> {
+        let inner = unsafe {
+            bindings::mongoc_collection_watch(
+                self.inner,
+                try!(Bsonc::from_document(&pipeline)).inner(),
+                try!(Bsonc::from_document(&opts)).inner()
+            )
+        };
+        Ok(ChangeStream::new(self, inner))
+    }
+
 }
 
 impl<'a> Drop for Collection<'a> {
diff --git a/src/lib.rs b/src/lib.rs
index 12c233a..6c7dae3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -49,6 +49,7 @@ pub mod database;
 pub mod flags;
 pub mod read_prefs;
 pub mod write_concern;
+pub mod change_stream;
 
 mod bsonc;
 mod error;
diff --git a/tests/change_stream.rs b/tests/change_stream.rs
new file mode 100644
index 0000000..9c4e2ee
--- /dev/null
+++ b/tests/change_stream.rs
@@ -0,0 +1,4 @@
+#[test]
+fn test_change_stream() {
+    assert_eq!(true, true);
+}
diff --git a/tests/tests.rs b/tests/tests.rs
index 6f83d62..94ff05e 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -13,3 +13,4 @@ mod flags;
 mod read_prefs;
 mod uri;
 mod write_concern;
+mod change_stream;