Merge pull request #39 from MiesJansen/master

Added the ability to automatically manage getMore calls
pull/42/head
Thijs Cadier 7 years ago committed by GitHub
commit 1c927bdde4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

3
.gitignore vendored

@ -2,3 +2,6 @@ target
Cargo.lock Cargo.lock
mongoc-sys/mongo-c-driver* mongoc-sys/mongo-c-driver*
ssl_env_vars ssl_env_vars
.idea/
.DS_Store

@ -22,6 +22,8 @@ name = "tests"
libc = "^0.2" libc = "^0.2"
log = "^0.3" log = "^0.3"
bson = "^0.11" bson = "^0.11"
serde = "1.0"
serde_derive = "1.0"
[dependencies.mongoc-sys] [dependencies.mongoc-sys]
path = "mongoc-sys" path = "mongoc-sys"

@ -102,6 +102,7 @@ pub mod bindings {
pub fn mongoc_database_get_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char) -> *mut mongoc_collection_t; pub fn mongoc_database_get_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char) -> *mut mongoc_collection_t;
pub fn mongoc_database_get_name(database: *mut mongoc_database_t) -> *const ::libc::c_char; pub fn mongoc_database_get_name(database: *mut mongoc_database_t) -> *const ::libc::c_char;
pub fn mongoc_database_destroy(database: *mut mongoc_database_t) -> (); pub fn mongoc_database_destroy(database: *mut mongoc_database_t) -> ();
pub fn mongoc_database_has_collection(database: *mut mongoc_database_t, name: *const ::libc::c_char, error: *mut bson_error_t) -> ::libc::int32_t;
} }
// Client // Client

@ -4,9 +4,10 @@ use std::iter::Iterator;
use std::ptr; use std::ptr;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::collections::VecDeque;
use mongoc::bindings; use mongoc::bindings;
use bson::{Bson,Document,oid}; use bson::{self,Bson,Document,oid};
use super::BsoncError; use super::BsoncError;
use super::bsonc; use super::bsonc;
@ -15,6 +16,7 @@ use super::database::Database;
use super::flags::QueryFlag; use super::flags::QueryFlag;
use super::collection::{Collection,TailOptions}; use super::collection::{Collection,TailOptions};
use super::CommandAndFindOptions; use super::CommandAndFindOptions;
use super::MongoError::ValueAccessError;
use super::Result; use super::Result;
@ -251,3 +253,131 @@ impl<'a> Iterator for TailingCursor<'a> {
} }
} }
} }
type DocArray = VecDeque<Document>;
type CursorId = i64;
/// BatchCursor let's you iterate though batches of results
/// in a natural way without having to deal parsing each block
/// of 100 results from mongo.
///
/// Specifically, this cursor hides the complexity of having to call
/// https://docs.mongodb.com/manual/reference/command/getMore/. This
/// allows you to have much cleaner user code. Only commands which
/// return batches work with this cursor. For example, find, aggregate,
/// and listIndexes all return batches.
pub struct BatchCursor<'a> {
cursor: Cursor<'a>,
db: &'a Database<'a>,
coll_name: String,
cursor_id: Option<CursorId>,
documents: Option<DocArray>
}
impl<'a> BatchCursor<'a> {
pub fn new(
cursor: Cursor<'a>,
db: &'a Database<'a>,
coll_name: String
) -> BatchCursor<'a> {
BatchCursor {
cursor,
db,
coll_name,
cursor_id: None,
documents: None
}
}
// internal function to reach the next batch of results from the mongo cursor
// and store them in the DocArray buffer
fn get_cursor_next(&mut self) -> Option<Result<Document>> {
let item_opt = self.cursor.next();
if let Some(item_res) = item_opt {
if let Ok(item) = item_res {
let docs_ret = batch_to_array(item);
if let Ok(docs) = docs_ret {
self.documents = docs.0;
if docs.1.is_some() {self.cursor_id = docs.1}
let res = self.get_next_doc();
if res.is_some() { return res; }
} else {
return Some(Err(docs_ret.err().unwrap()));
}
}
}
None
}
// internal function for pulling the next document from the documents buffer.
// this is the in memory representation of the documents we receive from each batch (DocArray)
fn get_next_doc(&mut self) -> Option<Result<Document>> {
if let Some(ref mut docs) = self.documents {
if docs.len() > 0 {
let doc = docs.pop_front().unwrap();
return Some(Ok(doc));
}
}
None
}
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct CommandSimpleBatch {
id: CursorId,
first_batch: Option<DocArray>,
next_batch: Option<DocArray>
}
#[derive(Deserialize, Debug)]
struct CommandSimpleResult {
cursor: CommandSimpleBatch
}
fn batch_to_array(doc: Document) -> Result<(Option<DocArray>,Option<CursorId>)> {
let doc_result: Result<CommandSimpleResult> =
bson::from_bson(Bson::Document(doc.clone()))
.map_err(|err|
{
error!("cannot read batch from db: {}", err);
ValueAccessError(bson::ValueAccessError::NotPresent)
});
trace!("input: {}, result: {:?}", doc, doc_result);
doc_result.map(|v| {
if v.cursor.first_batch.is_some() {return (v.cursor.first_batch, Some(v.cursor.id));}
if v.cursor.next_batch.is_some() {return (v.cursor.next_batch, Some(v.cursor.id));}
(None,None)
})
}
impl<'a> Iterator for BatchCursor<'a> {
type Item = Result<Document>;
fn next(&mut self) -> Option<Self::Item> {
// (1) try the local document buffer
let res = self.get_next_doc();
if res.is_some() {return res;}
// (2) try next()
let res = self.get_cursor_next();
if res.is_some() {return res;}
// (3) try getMore
if let Some(cid) = self.cursor_id {
let command = doc! {
"getMore": cid as i64,
"collection": self.coll_name.clone()
};
let cur_result = self.db.command(command, None);
if let Ok(cur) = cur_result {
self.cursor = cur;
let res = self.get_cursor_next();
if res.is_some() { return res; }
}
}
None
}
}

@ -16,6 +16,7 @@ use super::collection;
use super::collection::Collection; use super::collection::Collection;
use super::cursor; use super::cursor;
use super::cursor::Cursor; use super::cursor::Cursor;
use super::cursor::BatchCursor;
use super::read_prefs::ReadPrefs; use super::read_prefs::ReadPrefs;
use flags::FlagsValue; use flags::FlagsValue;
@ -25,6 +26,16 @@ pub enum CreatedBy<'a> {
OwnedClient(Client<'a>) OwnedClient(Client<'a>)
} }
fn get_coll_name_from_doc(doc: &Document) -> Result<String> {
const VALID_COMMANDS: &'static [&'static str] = &["find", "aggregate", "listIndexes"];
for s in VALID_COMMANDS {
if let Ok(val) = doc.get_str(s) {
return Ok(val.to_owned())
}
}
Err(InvalidParamsError.into())
}
/// Provides access to a MongoDB database. /// Provides access to a MongoDB database.
/// ///
/// A database instance can be created by calling `get_database` or `take_database` on a `Client` instance. /// A database instance can be created by calling `get_database` or `take_database` on a `Client` instance.
@ -48,6 +59,8 @@ impl<'a> Database<'a> {
/// Execute a command on the database. /// Execute a command on the database.
/// This is performed lazily and therefore requires calling `next` on the resulting cursor. /// This is performed lazily and therefore requires calling `next` on the resulting cursor.
/// if your are using a command like find or aggregate `command_batch` is likely
/// more convenient for you.
pub fn command( pub fn command(
&'a self, &'a self,
command: Document, command: Document,
@ -56,8 +69,8 @@ impl<'a> Database<'a> {
assert!(!self.inner.is_null()); assert!(!self.inner.is_null());
let default_options = CommandAndFindOptions::default(); let default_options = CommandAndFindOptions::default();
let options = options.unwrap_or(&default_options); let options = options.unwrap_or(&default_options);
let fields_bsonc = options.fields_bsonc(); let fields_bsonc = options.fields_bsonc();
let cursor_ptr = unsafe { let cursor_ptr = unsafe {
bindings::mongoc_database_command( bindings::mongoc_database_command(
@ -89,6 +102,23 @@ impl<'a> Database<'a> {
)) ))
} }
/// Execute a command on the database and returns a `BatchCursor`
/// Automates the process of getting the next batch from getMore
/// and parses the batch so only the result documents are returned.
/// I am unsure of the best practices of when to use this or the CRUD function.
pub fn command_batch(
&'a self,
command: Document,
options: Option<&CommandAndFindOptions>
) -> Result<BatchCursor<'a>> {
let coll_name = get_coll_name_from_doc(&command)?;
Ok(BatchCursor::new(
self.command(command, options)?,
self,
coll_name
))
}
/// Simplified version of `command` that returns the first document immediately. /// Simplified version of `command` that returns the first document immediately.
pub fn command_simple( pub fn command_simple(
&'a self, &'a self,
@ -189,6 +219,31 @@ impl<'a> Database<'a> {
}; };
String::from_utf8_lossy(cstr.to_bytes()) String::from_utf8_lossy(cstr.to_bytes())
} }
/// This function checks to see if a collection exists on the MongoDB server within database.
pub fn has_collection<S: Into<Vec<u8>>>(
&self,
name: S
) -> Result<bool> {
let mut error = BsoncError::empty();
let name_cstring = CString::new(name).unwrap();
let has_collection = unsafe {
bindings::mongoc_database_has_collection(
self.inner,
name_cstring.as_ptr(),
error.mut_inner())
};
if error.is_empty() {
Ok(match has_collection{
0 => false,
_ => true
})
} else {
Err(error.into())
}
}
} }
impl<'a> Drop for Database<'a> { impl<'a> Drop for Database<'a> {
@ -199,3 +254,13 @@ impl<'a> Drop for Database<'a> {
} }
} }
} }
#[test]
fn test_get_coll_name_from_doc() {
let command = doc! {"find": "cursor_items"};
assert_eq!("cursor_items", get_coll_name_from_doc(&command).unwrap());
let command = doc! {"aggregate": "cursor_items"};
assert_eq!("cursor_items", get_coll_name_from_doc(&command).unwrap());
let command = doc! {"error": "cursor_items"};
assert!(get_coll_name_from_doc(&command).is_err());
}

@ -31,6 +31,10 @@ extern crate bson;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate serde;
use std::ffi::CStr; use std::ffi::CStr;
use std::ptr; use std::ptr;
use std::result; use std::result;

@ -87,3 +87,49 @@ fn test_tailing_cursor() {
// 15 results. // 15 results.
assert_eq!(25, guard.join().expect("Thread failed")); assert_eq!(25, guard.join().expect("Thread failed"));
} }
#[test]
fn test_batch_cursor() {
let uri = Uri::new("mongodb://localhost:27017/").unwrap();
let pool = Arc::new(ClientPool::new(uri, None));
let client = pool.pop();
let database = client.get_database("rust_test");
const TEST_COLLECTION_NAME: &str = "test_batch_cursor";
const NUM_TO_TEST: i32 = 10000;
let mut collection = database.get_collection(TEST_COLLECTION_NAME);
if database.has_collection(TEST_COLLECTION_NAME).unwrap() {
collection.drop().unwrap(); // if prev test failed the old collection may still exist
}
// add test rows. need many to exercise the batches
{
let bulk_operation = collection.create_bulk_operation(None);
for i in 0..NUM_TO_TEST {
bulk_operation.insert(&doc!{"key": i}).unwrap();
}
let result = bulk_operation.execute();
assert!(result.is_ok());
assert_eq!(
result.ok().unwrap().get("nInserted").unwrap(), // why is this an i32?
&bson::Bson::I32(NUM_TO_TEST)
);
assert_eq!(NUM_TO_TEST as i64, collection.count(&doc!{}, None).unwrap());
}
{
let cur = database.command_batch(doc!{"find":TEST_COLLECTION_NAME},None);
let mut count = 0;
for doc in cur.unwrap() {
count += 1;
println!("doc: {:?}", doc );
}
assert_eq!(count,NUM_TO_TEST);
}
collection.drop().unwrap();
}

@ -54,3 +54,23 @@ fn test_create_collection() {
assert_eq!("created_collection", collection.get_name().to_mut()); assert_eq!("created_collection", collection.get_name().to_mut());
} }
#[test]
fn test_has_collection() {
let uri = Uri::new("mongodb://localhost:27017/").unwrap();
let pool = ClientPool::new(uri, None);
let client = pool.pop();
let database = client.get_database("rust_test");
const COLL_NAME: &'static str = "created_collection2";
database.get_collection(COLL_NAME).drop().unwrap_or(());
let collection = database.create_collection(
COLL_NAME,
None
).unwrap();
assert_eq!(COLL_NAME, collection.get_name().to_mut());
assert!(database.has_collection(COLL_NAME).unwrap());
}
Loading…
Cancel
Save