From 4fba9e8b5a352c5e4444264a23cab822bce6f9a9 Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Fri, 23 Mar 2018 09:14:03 -0400 Subject: [PATCH 1/9] Add db.has_collection --- .gitignore | 1 + mongoc-sys/src/lib.rs | 1 + src/database.rs | 25 +++++++++++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/.gitignore b/.gitignore index 2730dc9..9e2fadf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ target Cargo.lock mongoc-sys/mongo-c-driver* ssl_env_vars +.idea/ \ No newline at end of file diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index 57cf709..38ec17b 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -102,6 +102,7 @@ pub mod bindings { pub fn mongoc_database_get_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char) -> *mut mongoc_collection_t; pub fn mongoc_database_get_name(database: *mut mongoc_database_t) -> *const ::libc::c_char; pub fn mongoc_database_destroy(database: *mut mongoc_database_t) -> (); + pub fn mongoc_database_has_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char, error: *mut bson_error_t) -> ::libc::boolean_t; } // Client diff --git a/src/database.rs b/src/database.rs index 6cb05af..18bdb1a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -189,6 +189,31 @@ impl<'a> Database<'a> { }; String::from_utf8_lossy(cstr.to_bytes()) } + + /// Create a new collection in this database. + pub fn has_collection>>( + &self, + name: S + ) -> Result { + let mut error = BsoncError::empty(); + let name_cstring = CString::new(name).unwrap(); + + let has_collection = unsafe { + bindings::mongoc_database_has_collection( + self.inner, + name_cstring.as_ptr(), + error.mut_inner()) + }; + + if error.is_empty() { + Ok(match has_collection{ + 0 => false, + _ => true + }) + } else { + Err(error.into()) + } + } } impl<'a> Drop for Database<'a> { From 5a0756ffaa90a8c55923039cb78f22dabba47301 Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Fri, 23 Mar 2018 09:53:01 -0400 Subject: [PATCH 2/9] change bool to in32 (missing bool on windows?) --- mongoc-sys/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index 38ec17b..8061994 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -102,7 +102,7 @@ pub mod bindings { pub fn mongoc_database_get_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char) -> *mut mongoc_collection_t; pub fn mongoc_database_get_name(database: *mut mongoc_database_t) -> *const ::libc::c_char; pub fn mongoc_database_destroy(database: *mut mongoc_database_t) -> (); - pub fn mongoc_database_has_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char, error: *mut bson_error_t) -> ::libc::boolean_t; + pub fn mongoc_database_has_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char, error: *mut bson_error_t) -> ::libc::int32_t; } // Client From 8e0b37bc03592588eee866c95fcb2cf5b7145a1e Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Wed, 11 Apr 2018 08:47:32 -0400 Subject: [PATCH 3/9] add DS_Store to .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 9e2fadf..f806ce4 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ target Cargo.lock mongoc-sys/mongo-c-driver* ssl_env_vars -.idea/ \ No newline at end of file +.idea/ +.DS_Store \ No newline at end of file From 0aa99ffbc5c8370ed1f1f8a2ce2526613853c4b4 Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Fri, 13 Apr 2018 10:00:32 -0400 Subject: [PATCH 4/9] add documentation not to use getMore with db.command --- src/database.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/database.rs b/src/database.rs index 18bdb1a..c037c53 100644 --- a/src/database.rs +++ b/src/database.rs @@ -48,6 +48,8 @@ impl<'a> Database<'a> { /// Execute a command on the database. /// This is performed lazily and therefore requires calling `next` on the resulting cursor. + /// Results are returned in batches as per the mongoc driver. + /// To get the next batch: https://docs.mongodb.com/manual/reference/command/getMore/ pub fn command( &'a self, command: Document, From 0a3f41bb0c33afcc0305527502920d4467e89974 Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Mon, 16 Apr 2018 17:23:11 -0400 Subject: [PATCH 5/9] Add database.command_batch which automatically calls getMore for find and append. --- Cargo.toml | 2 + src/cursor.rs | 122 +++++++++++++++++++++++++++++++++++++++++++++++- src/database.rs | 43 ++++++++++++++++- src/lib.rs | 5 ++ tests/cursor.rs | 46 ++++++++++++++++++ 5 files changed, 215 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 99fb54b..18dca7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,8 @@ name = "tests" libc = "^0.2" log = "^0.3" bson = "^0.11" +serde = "1.0" +serde_derive = "1.0" [dependencies.mongoc-sys] path = "mongoc-sys" diff --git a/src/cursor.rs b/src/cursor.rs index 6439398..30b7955 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -4,9 +4,10 @@ use std::iter::Iterator; use std::ptr; use std::thread; use std::time::Duration; +use std::collections::VecDeque; use mongoc::bindings; -use bson::{Bson,Document,oid}; +use bson::{self,Bson,Document,oid}; use super::BsoncError; use super::bsonc; @@ -15,6 +16,7 @@ use super::database::Database; use super::flags::QueryFlag; use super::collection::{Collection,TailOptions}; use super::CommandAndFindOptions; +use super::MongoError::ValueAccessError; use super::Result; @@ -251,3 +253,121 @@ impl<'a> Iterator for TailingCursor<'a> { } } } + +type DocArray = VecDeque; +type CursorId = i64; + +pub struct BatchCursor<'a> { + cursor: Cursor<'a>, + db: &'a Database<'a>, + coll_name: String, + cursor_id: Option, + documents: Option + +} + +impl<'a> BatchCursor<'a> { + pub fn new( + cursor: Cursor<'a>, + db: &'a Database<'a>, + coll_name: String + ) -> BatchCursor<'a> { + BatchCursor { + cursor, + db, + coll_name, + cursor_id: None, + documents: None + } + } + + fn get_cursor_next(&mut self) -> Option> { + let item_opt = self.cursor.next(); + if let Some(item_res) = item_opt { + if let Ok(item) = item_res { + let docs_ret = batch_to_array(item); + if let Ok(docs) = docs_ret { + self.documents = docs.0; + if docs.1.is_some() {self.cursor_id = docs.1} + let res = self.get_next_doc(); + if res.is_some() { return res; } + } else { + return Some(Err(docs_ret.err().unwrap())); + } + } + } + None + } + + fn get_next_doc(&mut self) -> Option> { + if let Some(ref mut docs) = self.documents { + if docs.len() > 0 { + let doc = docs.pop_front().unwrap(); + return Some(Ok(doc)); + } + } + None + } +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct CommandSimpleBatch { + id: CursorId, + first_batch: Option, + next_batch: Option +} +#[derive(Deserialize, Debug)] +struct CommandSimpleResult { + cursor: CommandSimpleBatch +} + +fn batch_to_array(doc: Document) -> Result<(Option,Option)> { + let doc_result: Result = + bson::from_bson(Bson::Document(doc.clone())) + .map_err(|err| + { + error!("cannot read batch from db: {}", err); + ValueAccessError(bson::ValueAccessError::NotPresent) + }); + + trace!("input: {}, result: {:?}", doc, doc_result); + + doc_result.map(|v| { + if v.cursor.first_batch.is_some() {return (v.cursor.first_batch, Some(v.cursor.id));} + if v.cursor.next_batch.is_some() {return (v.cursor.next_batch, Some(v.cursor.id));} + (None,None) + }) +} + +impl<'a> Iterator for BatchCursor<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + + // (1) try the local document buffer + let res = self.get_next_doc(); + if res.is_some() {return res;} + + // (2) try next() + let res = self.get_cursor_next(); + if res.is_some() {return res;} + + // (3) try getMore + if let Some(cid) = self.cursor_id { + let command = doc! { + "getMore": cid as i64, + "collection": self.coll_name.clone() + }; + let cur_result = self.db.command(command, None); + if let Ok(cur) = cur_result { + self.cursor = cur; + let res = self.get_cursor_next(); + if res.is_some() { return res; } + } + } + None + } + + +} \ No newline at end of file diff --git a/src/database.rs b/src/database.rs index c037c53..501f75b 100644 --- a/src/database.rs +++ b/src/database.rs @@ -16,6 +16,7 @@ use super::collection; use super::collection::Collection; use super::cursor; use super::cursor::Cursor; +use super::cursor::BatchCursor; use super::read_prefs::ReadPrefs; use flags::FlagsValue; @@ -25,6 +26,17 @@ pub enum CreatedBy<'a> { OwnedClient(Client<'a>) } +#[doc(hidden)] +fn get_coll_name_from_doc(doc: &Document) -> Result { + const VALID_COMMANDS: &'static [&'static str] = &["find", "aggregate"]; + for s in VALID_COMMANDS { + if let Ok(val) = doc.get_str(s) { + return Ok(val.to_owned()) + } + } + Err(InvalidParamsError.into()) +} + /// Provides access to a MongoDB database. /// /// A database instance can be created by calling `get_database` or `take_database` on a `Client` instance. @@ -58,8 +70,8 @@ impl<'a> Database<'a> { assert!(!self.inner.is_null()); let default_options = CommandAndFindOptions::default(); - let options = options.unwrap_or(&default_options); - let fields_bsonc = options.fields_bsonc(); + let options = options.unwrap_or(&default_options); + let fields_bsonc = options.fields_bsonc(); let cursor_ptr = unsafe { bindings::mongoc_database_command( @@ -91,6 +103,23 @@ impl<'a> Database<'a> { )) } + /// Execute a command on the database. + /// Automates the process of getting the next batch from getMore + /// and parses the batch so only the result documents are returned. + /// I am unsure of the best practices of when to use this or the CRUD function. + pub fn command_batch( + &'a self, + command: Document, + options: Option<&CommandAndFindOptions> + ) -> Result> { + let coll_name = get_coll_name_from_doc(&command)?; + Ok(BatchCursor::new( + self.command(command, options)?, + self, + coll_name + )) + } + /// Simplified version of `command` that returns the first document immediately. pub fn command_simple( &'a self, @@ -226,3 +255,13 @@ impl<'a> Drop for Database<'a> { } } } + +#[test] +fn test_get_coll_name_from_doc() { + let command = doc! {"find": "cursor_items"}; + assert_eq!("cursor_items", get_coll_name_from_doc(&command).unwrap()); + let command = doc! {"aggregate": "cursor_items"}; + assert_eq!("cursor_items", get_coll_name_from_doc(&command).unwrap()); + let command = doc! {"error": "cursor_items"}; + assert!(get_coll_name_from_doc(&command).is_err()); +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 4c1383a..3343ab3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,11 @@ extern crate bson; #[macro_use] extern crate log; +#[macro_use] +extern crate serde_derive; +extern crate serde; + + use std::ffi::CStr; use std::ptr; use std::result; diff --git a/tests/cursor.rs b/tests/cursor.rs index f7b80d0..ea837d4 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -87,3 +87,49 @@ fn test_tailing_cursor() { // 15 results. assert_eq!(25, guard.join().expect("Thread failed")); } + +#[test] +fn test_batch_cursor() { + let uri = Uri::new("mongodb://localhost:27017/").unwrap(); + let pool = Arc::new(ClientPool::new(uri, None)); + let client = pool.pop(); + let database = client.get_database("rust_test"); + + const TEST_COLLECTION_NAME: &str = "test_batch_cursor"; + const NUM_TO_TEST: i32 = 10000; + + let mut collection = database.get_collection(TEST_COLLECTION_NAME); + if database.has_collection(TEST_COLLECTION_NAME).unwrap() { + collection.drop().unwrap(); // if prev test failed the old collection may still exist + } + + // add test rows. need many to exercise the batches + { + let bulk_operation = collection.create_bulk_operation(None); + + for i in 0..NUM_TO_TEST { + bulk_operation.insert(&doc!{"key": i}).unwrap(); + } + + let result = bulk_operation.execute(); + assert!(result.is_ok()); + + assert_eq!( + result.ok().unwrap().get("nInserted").unwrap(), // why is this an i32? + &bson::Bson::I32(NUM_TO_TEST) + ); + assert_eq!(NUM_TO_TEST as i64, collection.count(&doc!{}, None).unwrap()); + } + + { + let cur = database.command_batch(doc!{"find":TEST_COLLECTION_NAME},None); + let mut count = 0; + for doc in cur.unwrap() { + count += 1; + println!("doc: {:?}", doc ); + } + assert_eq!(count,NUM_TO_TEST); + } + + collection.drop().unwrap(); +} \ No newline at end of file From f6a4cefb64825a5cba4e3805675199f507d5d7e7 Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Tue, 1 May 2018 01:16:00 -0400 Subject: [PATCH 6/9] Add listIndexes to supported batch commands --- src/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/database.rs b/src/database.rs index 501f75b..a681b8c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -28,7 +28,7 @@ pub enum CreatedBy<'a> { #[doc(hidden)] fn get_coll_name_from_doc(doc: &Document) -> Result { - const VALID_COMMANDS: &'static [&'static str] = &["find", "aggregate"]; + const VALID_COMMANDS: &'static [&'static str] = &["find", "aggregate", "listIndexes"]; for s in VALID_COMMANDS { if let Ok(val) = doc.get_str(s) { return Ok(val.to_owned()) From 77cbcf340da3752e6be6988af85fdf07e0d8e1ca Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Tue, 1 May 2018 16:39:05 -0400 Subject: [PATCH 7/9] * Improve documentation * add has_collection test * fix white spaces --- .gitignore | 1 + src/cursor.rs | 18 ++++++++++++++---- src/database.rs | 9 ++++----- src/lib.rs | 1 - tests/database.rs | 17 +++++++++++++++++ 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index f806ce4..4a95729 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,6 @@ target Cargo.lock mongoc-sys/mongo-c-driver* ssl_env_vars + .idea/ .DS_Store \ No newline at end of file diff --git a/src/cursor.rs b/src/cursor.rs index 30b7955..5f716d3 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -257,6 +257,15 @@ impl<'a> Iterator for TailingCursor<'a> { type DocArray = VecDeque; type CursorId = i64; +/// BatchCursor let's you iterate though batches of results +/// in a natural way without having to deal parsing each block +/// of 100 results from mongo. +/// +/// Specifically, this cursor hides the complexity of having to call +/// https://docs.mongodb.com/manual/reference/command/getMore/. This +/// allows you to have much cleaner user code. Only commands which +/// return batches work with this cursor. For example, find, aggregate, +/// and listIndexes all return batches. pub struct BatchCursor<'a> { cursor: Cursor<'a>, db: &'a Database<'a>, @@ -281,6 +290,8 @@ impl<'a> BatchCursor<'a> { } } + // internal function to reach the next batch of results from the mongo cursor + // and store them in the DocArray buffer fn get_cursor_next(&mut self) -> Option> { let item_opt = self.cursor.next(); if let Some(item_res) = item_opt { @@ -299,6 +310,8 @@ impl<'a> BatchCursor<'a> { None } + // internal function for pulling the next document from the documents buffer. + // this is the in memory representation of the documents we receive from each batch (DocArray) fn get_next_doc(&mut self) -> Option> { if let Some(ref mut docs) = self.documents { if docs.len() > 0 { @@ -344,7 +357,6 @@ impl<'a> Iterator for BatchCursor<'a> { type Item = Result; fn next(&mut self) -> Option { - // (1) try the local document buffer let res = self.get_next_doc(); if res.is_some() {return res;} @@ -368,6 +380,4 @@ impl<'a> Iterator for BatchCursor<'a> { } None } - - -} \ No newline at end of file +} diff --git a/src/database.rs b/src/database.rs index a681b8c..19c0f9f 100644 --- a/src/database.rs +++ b/src/database.rs @@ -26,7 +26,6 @@ pub enum CreatedBy<'a> { OwnedClient(Client<'a>) } -#[doc(hidden)] fn get_coll_name_from_doc(doc: &Document) -> Result { const VALID_COMMANDS: &'static [&'static str] = &["find", "aggregate", "listIndexes"]; for s in VALID_COMMANDS { @@ -60,8 +59,8 @@ impl<'a> Database<'a> { /// Execute a command on the database. /// This is performed lazily and therefore requires calling `next` on the resulting cursor. - /// Results are returned in batches as per the mongoc driver. - /// To get the next batch: https://docs.mongodb.com/manual/reference/command/getMore/ + /// if your are using a command like find or aggregate `command_batch` is likely + /// more convenient for you. pub fn command( &'a self, command: Document, @@ -103,7 +102,7 @@ impl<'a> Database<'a> { )) } - /// Execute a command on the database. + /// Execute a command on the database and returns a `BatchCursor` /// Automates the process of getting the next batch from getMore /// and parses the batch so only the result documents are returned. /// I am unsure of the best practices of when to use this or the CRUD function. @@ -221,7 +220,7 @@ impl<'a> Database<'a> { String::from_utf8_lossy(cstr.to_bytes()) } - /// Create a new collection in this database. + /// This function checks to see if a collection exists on the MongoDB server within database. pub fn has_collection>>( &self, name: S diff --git a/src/lib.rs b/src/lib.rs index 3343ab3..12c233a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,6 @@ extern crate log; extern crate serde_derive; extern crate serde; - use std::ffi::CStr; use std::ptr; use std::result; diff --git a/tests/database.rs b/tests/database.rs index 8769a64..492a462 100644 --- a/tests/database.rs +++ b/tests/database.rs @@ -54,3 +54,20 @@ fn test_create_collection() { assert_eq!("created_collection", collection.get_name().to_mut()); } + +#[test] +fn test_has_collection() { + let uri = Uri::new("mongodb://localhost:27017/").unwrap(); + let pool = ClientPool::new(uri, None); + let client = pool.pop(); + let database = client.get_database("rust_test"); + database.get_collection("created_collection").drop().unwrap_or(()); + + let collection = database.create_collection( + "created_collection", + None + ).unwrap(); + + assert_eq!("created_collection", collection.get_name().to_mut()); + assert!(database.has_collection("created_collection").unwrap()); +} \ No newline at end of file From d5a3c53b0e364aed23fb48e5c89aef7efac7f6d8 Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Tue, 15 May 2018 16:46:49 -0400 Subject: [PATCH 8/9] drop test collection after test --- tests/database.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/database.rs b/tests/database.rs index 492a462..91941ab 100644 --- a/tests/database.rs +++ b/tests/database.rs @@ -63,11 +63,15 @@ fn test_has_collection() { let database = client.get_database("rust_test"); database.get_collection("created_collection").drop().unwrap_or(()); + const COLL_NAME: &'static str = "created_collection"; + let collection = database.create_collection( - "created_collection", + COLL_NAME, None ).unwrap(); - assert_eq!("created_collection", collection.get_name().to_mut()); - assert!(database.has_collection("created_collection").unwrap()); + assert_eq!(COLL_NAME, collection.get_name().to_mut()); + assert!(database.has_collection(COLL_NAME).unwrap()); + + database.command_simple(doc!{ "drop": COLL_NAME}, None).unwrap(); } \ No newline at end of file From e79047a18683ee8cd91f54bd7885bc9fe8c5569f Mon Sep 17 00:00:00 2001 From: Michael Jansen Date: Tue, 15 May 2018 17:00:18 -0400 Subject: [PATCH 9/9] avoid conflict with test_create_collection --- tests/database.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/database.rs b/tests/database.rs index 91941ab..50625ec 100644 --- a/tests/database.rs +++ b/tests/database.rs @@ -61,9 +61,10 @@ fn test_has_collection() { let pool = ClientPool::new(uri, None); let client = pool.pop(); let database = client.get_database("rust_test"); - database.get_collection("created_collection").drop().unwrap_or(()); - const COLL_NAME: &'static str = "created_collection"; + const COLL_NAME: &'static str = "created_collection2"; + + database.get_collection(COLL_NAME).drop().unwrap_or(()); let collection = database.create_collection( COLL_NAME, @@ -72,6 +73,4 @@ fn test_has_collection() { assert_eq!(COLL_NAME, collection.get_name().to_mut()); assert!(database.has_collection(COLL_NAME).unwrap()); - - database.command_simple(doc!{ "drop": COLL_NAME}, None).unwrap(); } \ No newline at end of file