Support for tailing queries

Add a tail convenience method that returns a tailing cursor
that implements the recommended way of tailing and reconnecting
in it's iterator implementation.

Also move optional arguments to separate type that can optionally be
added to the various methods.
pull/2/head
Thijs Cadier 10 years ago
parent c7a2b78016
commit b12196a41e

@ -4,14 +4,14 @@ use std::borrow::Cow;
use mongo_c_driver_wrapper::bindings;
use bson::Document;
use bson::{Bson,Document};
use super::Result;
use super::{BsoncError,InvalidParamsError};
use super::bsonc::Bsonc;
use super::client::Client;
use super::cursor;
use super::cursor::Cursor;
use super::cursor::{Cursor,TailingCursor};
use super::database::Database;
use super::flags::{Flags,FlagsValue,InsertFlag,QueryFlag,RemoveFlag};
use super::write_concern::WriteConcern;
@ -27,6 +27,90 @@ pub struct Collection<'a> {
inner: *mut bindings::mongoc_collection_t
}
pub struct CountOptions {
pub query_flags: Flags<QueryFlag>,
pub skip: u32,
pub limit: u32,
pub opts: Option<Document>,
pub read_prefs: Option<ReadPrefs>
}
impl CountOptions {
pub fn default() -> CountOptions {
CountOptions {
query_flags: Flags::new(),
skip: 0,
limit: 0,
opts: None,
read_prefs: None
}
}
}
pub struct FindOptions {
pub query_flags: Flags<QueryFlag>,
pub skip: u32,
pub limit: u32,
pub batch_size: u32,
pub fields: Option<Document>,
pub read_prefs: Option<ReadPrefs>
}
impl FindOptions {
pub fn default() -> FindOptions {
FindOptions {
query_flags: Flags::new(),
skip: 0,
limit: 0,
batch_size: 0,
fields: None,
read_prefs: None
}
}
}
pub struct InsertOptions {
pub insert_flags: Flags<InsertFlag>,
pub write_concern: WriteConcern
}
impl InsertOptions {
pub fn default() -> InsertOptions {
InsertOptions {
insert_flags: Flags::new(),
write_concern: WriteConcern::new()
}
}
}
pub struct RemoveOptions {
pub remove_flags: Flags<RemoveFlag>,
pub write_concern: WriteConcern
}
impl RemoveOptions {
pub fn default() -> RemoveOptions {
RemoveOptions {
remove_flags: Flags::new(),
write_concern: WriteConcern::new()
}
}
}
pub struct TailOptions {
pub wait_time_ms: u32,
pub max_retries: u32
}
impl TailOptions {
pub fn default() -> TailOptions {
TailOptions {
wait_time_ms: 500,
max_retries: 5
}
}
}
impl<'a> Collection<'a> {
pub fn new(
created_by: CreatedBy<'a>,
@ -39,31 +123,30 @@ impl<'a> Collection<'a> {
}
}
pub fn count_with_options(
pub fn count(
&self,
query_flags: &Flags<QueryFlag>,
query: &Document,
skip: u32,
limit: u32,
opts: Option<&Document>,
read_prefs: Option<&ReadPrefs>
options: Option<&CountOptions>
) -> Result<i64> {
assert!(!self.inner.is_null());
let default_options = CountOptions::default();
let options = options.unwrap_or(&default_options);
let mut error = BsoncError::empty();
let count = unsafe {
bindings::mongoc_collection_count_with_opts(
self.inner,
query_flags.flags(),
options.query_flags.flags(),
try!(Bsonc::from_document(query)).inner(),
skip as i64,
limit as i64,
match opts {
Some(o) => try!(Bsonc::from_document(o)).inner(),
options.skip as i64,
options.limit as i64,
match options.opts {
Some(ref o) => try!(Bsonc::from_document(o)).inner(),
None => ptr::null()
},
match read_prefs {
Some(prefs) => prefs.inner(),
match options.read_prefs {
Some(ref prefs) => prefs.inner(),
None => ptr::null()
},
error.mut_inner()
@ -77,20 +160,6 @@ impl<'a> Collection<'a> {
}
}
pub fn count(
&self,
query: &Document
) -> Result<i64> {
self.count_with_options(
&Flags::new(),
query,
0,
0,
None,
None
)
}
pub fn drop(&mut self) -> Result<()> {
assert!(!self.inner.is_null());
let mut error = BsoncError::empty();
@ -107,34 +176,32 @@ impl<'a> Collection<'a> {
Ok(())
}
pub fn find_with_options(
pub fn find(
&'a self,
query_flags: &Flags<QueryFlag>,
skip: u32,
limit: u32,
batch_size: u32,
query: &Document,
fields: Option<&Document>,
read_prefs: Option<&ReadPrefs>
options: Option<&FindOptions>
) -> Result<Cursor<'a>> {
assert!(!self.inner.is_null());
let default_options = FindOptions::default();
let options = options.unwrap_or(&default_options);
let inner = unsafe {
bindings::mongoc_collection_find(
self.inner,
query_flags.flags(),
skip,
limit,
batch_size,
options.query_flags.flags(),
options.skip,
options.limit,
options.batch_size,
try!(Bsonc::from_document(query)).inner(),
match fields {
Some(f) => {
match options.fields {
Some(ref f) => {
try!(Bsonc::from_document(f)).inner()
},
None => ptr::null()
},
match read_prefs {
Some(prefs) => prefs.inner(),
match options.read_prefs {
Some(ref prefs) => prefs.inner(),
None => ptr::null()
}
)
@ -147,21 +214,6 @@ impl<'a> Collection<'a> {
Ok(Cursor::new(cursor::CreatedBy::Collection(self), inner))
}
pub fn find(
&'a self,
query: &Document
) -> Result<Cursor<'a>> {
self.find_with_options(
&Flags::new(),
0,
0,
0,
&query,
None,
None
)
}
pub fn get_name(&self) -> Cow<str> {
let cstr = unsafe {
CStr::from_ptr(bindings::mongoc_collection_get_name(self.inner))
@ -169,21 +221,23 @@ impl<'a> Collection<'a> {
String::from_utf8_lossy(cstr.to_bytes())
}
pub fn insert_with_options(
pub fn insert(
&'a self,
insert_flags: &Flags<InsertFlag>,
document: &Document,
write_concern: &WriteConcern
options: Option<&InsertOptions>
) -> Result<()> {
assert!(!self.inner.is_null());
let default_options = InsertOptions::default();
let options = options.unwrap_or(&default_options);
let mut error = BsoncError::empty();
let success = unsafe {
bindings::mongoc_collection_insert(
self.inner,
insert_flags.flags(),
options.insert_flags.flags(),
try!(Bsonc::from_document(&document)).inner(),
write_concern.inner(),
options.write_concern.inner(),
error.mut_inner()
)
};
@ -195,29 +249,23 @@ impl<'a> Collection<'a> {
}
}
pub fn insert(&'a self, document: &Document) -> Result<()> {
self.insert_with_options(
&Flags::new(),
document,
&WriteConcern::new()
)
}
pub fn remove_with_options(
pub fn remove(
&self,
remove_flags: &Flags<RemoveFlag>,
selector: &Document,
write_concern: &WriteConcern
options: Option<&RemoveOptions>
) -> Result<()> {
assert!(!self.inner.is_null());
let default_options = RemoveOptions::default();
let options = options.unwrap_or(&default_options);
let mut error = BsoncError::empty();
let success = unsafe {
bindings::mongoc_collection_remove(
self.inner,
remove_flags.flags(),
options.remove_flags.flags(),
try!(Bsonc::from_document(&selector)).inner(),
write_concern.inner(),
options.write_concern.inner(),
error.mut_inner()
)
};
@ -229,24 +277,16 @@ impl<'a> Collection<'a> {
}
}
pub fn remove(
&self,
selector: &Document
) -> Result<()> {
self.remove_with_options(
&Flags::new(),
selector,
&WriteConcern::new()
)
}
pub fn save_with_options(
pub fn save(
&self,
document: &Document,
write_concern: &WriteConcern
write_concern: Option<&WriteConcern>
) -> Result<()> {
assert!(!self.inner.is_null());
let default_write_concern = WriteConcern::new();
let write_concern = write_concern.unwrap_or(&default_write_concern);
let mut error = BsoncError::empty();
let success = unsafe {
bindings::mongoc_collection_save(
@ -264,13 +304,37 @@ impl<'a> Collection<'a> {
}
}
pub fn save(
&self,
document: &Document,
) -> Result<()> {
self.save_with_options(
document,
&WriteConcern::new()
/// Tails a query
///
/// Takes ownership of query and options because they could be
/// modified and reused when the connections is disrupted and
/// we need to restart the query. The query will be placed in a
/// $query key, so the function can add configuration needed for
/// proper tailing.
///
/// The query is executed when iterating, so this function doesn't
/// return a result itself.
///
/// The necessary flags to configure a tailing query will be added
/// to the configured flags if you choose to supply options.
pub fn tail(
&'a self,
query: Document,
find_options: Option<FindOptions>,
tail_options: Option<TailOptions>
) -> TailingCursor<'a> {
let mut query_with_options = Document::new();
query_with_options.insert(
"$query".to_string(),
Bson::Document(query)
);
query_with_options.insert("$natural".to_string(), Bson::I32(1));
TailingCursor::new(
self,
query_with_options,
find_options.unwrap_or(FindOptions::default()),
tail_options.unwrap_or(TailOptions::default())
)
}
}
@ -304,23 +368,23 @@ mod tests {
let mut document = bson::Document::new();
document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string()));
document.insert("key_2".to_string(), bson::Bson::String("Value 2".to_string()));
assert!(collection.insert(&document).is_ok());
assert!(collection.insert(&document, None).is_ok());
let mut second_document = bson::Document::new();
second_document.insert("key_1".to_string(), bson::Bson::String("Value 3".to_string()));
assert!(collection.insert(&second_document).is_ok());
assert!(collection.insert(&second_document, None).is_ok());
let query = bson::Document::new();
// Count the documents in the collection
assert_eq!(2, collection.count(&query).unwrap());
assert_eq!(2, collection.count(&query, None).unwrap());
// Find the documents
assert_eq!(
collection.find(&document).unwrap().next().unwrap().unwrap().get("key_1").unwrap().to_json(),
collection.find(&document, None).unwrap().next().unwrap().unwrap().get("key_1").unwrap().to_json(),
bson::Bson::String("Value 1".to_string()).to_json()
);
let mut found_document = collection.find(&second_document).unwrap().next().unwrap().unwrap();
let mut found_document = collection.find(&second_document, None).unwrap().next().unwrap().unwrap();
assert_eq!(
found_document.get("key_1").unwrap().to_json(),
bson::Bson::String("Value 3".to_string()).to_json()
@ -328,42 +392,42 @@ mod tests {
// Update the second document
found_document.insert("key_1".to_string(), bson::Bson::String("Value 4".to_string()));
assert!(collection.save(&found_document).is_ok());
assert!(collection.save(&found_document, None).is_ok());
// Reload and check value
let found_document = collection.find(&found_document).unwrap().next().unwrap().unwrap();
let found_document = collection.find(&found_document, None).unwrap().next().unwrap().unwrap();
assert_eq!(
found_document.get("key_1").unwrap().to_json(),
bson::Bson::String("Value 4".to_string()).to_json()
);
// Remove one
assert!(collection.remove(&found_document).is_ok());
assert!(collection.remove(&found_document, None).is_ok());
// Count again
assert_eq!(1, collection.count(&query).unwrap());
assert_eq!(1, collection.count(&query, None).unwrap());
// Find the document and see if it has the keys we expect
{
let mut cursor = collection.find(&query).unwrap();
let mut cursor = collection.find(&query, None).unwrap();
let next_document = cursor.next().unwrap().unwrap();
assert!(next_document.contains_key("key_1"));
assert!(next_document.contains_key("key_2"));
}
// Find the document with fields set
{
let mut fields = bson::Document::new();
fields.insert("key_1".to_string(), bson::Bson::Boolean(true));
{
let mut cursor = collection.find_with_options(
&flags::Flags::new(),
0,
0,
0,
&query,
Some(&fields),
None
).unwrap();
let options = super::FindOptions {
query_flags: flags::Flags::new(),
skip: 0,
limit: 0,
batch_size: 0,
fields: Some(fields),
read_prefs: None
};
let mut cursor = collection.find(&query, Some(&options)).unwrap();
let next_document = cursor.next().unwrap().unwrap();
assert!(next_document.contains_key("key_1"));
assert!(!next_document.contains_key("key_2"));
@ -371,7 +435,7 @@ mod tests {
// Drop collection
collection.drop().unwrap();
assert_eq!(0, collection.count(&query).unwrap());
assert_eq!(0, collection.count(&query, None).unwrap());
}
#[test]
@ -382,7 +446,7 @@ mod tests {
let collection = client.get_collection("rust_driver_test", "items");
let document = bson::Document::new();
let result = collection.insert(&document);
let result = collection.insert(&document, None);
assert!(result.is_err());
assert_eq!(
"MongoError (BsoncError: Failed to connect to target host: localhost:27018)",

@ -3,12 +3,13 @@ use std::ptr;
use std::thread;
use mongo_c_driver_wrapper::bindings;
use bson::Document;
use bson::{Bson,Document};
use super::BsoncError;
use super::bsonc;
use super::client::Client;
use super::collection::Collection;
use super::flags::QueryFlag;
use super::collection::{Collection,FindOptions,TailOptions};
use super::Result;
@ -20,6 +21,8 @@ pub enum CreatedBy<'a> {
pub struct Cursor<'a> {
_created_by: CreatedBy<'a>,
inner: *mut bindings::mongoc_cursor_t,
tailing: bool,
tail_wait_time_ms: u32
}
impl<'a> Cursor<'a> {
@ -30,7 +33,9 @@ impl<'a> Cursor<'a> {
assert!(!inner.is_null());
Cursor {
_created_by: created_by,
inner: inner
inner: inner,
tailing: false,
tail_wait_time_ms: 0
}
}
@ -88,15 +93,14 @@ impl<'a> Iterator for Cursor<'a> {
if success == 0 {
if error.is_empty() {
if self.is_alive() {
// Since there was no error and the cursor is
// alive this must be a tailing cursor and we'll
// wait for 500ms before trying again.
thread::sleep_ms(500);
if self.tailing && self.is_alive() {
// Since there was no error, this is a tailing cursor
// and the cursor is alive we'll wait before trying again.
thread::sleep_ms(self.tail_wait_time_ms);
continue;
} else {
// No result, no error and cursor not alive anymore
// so we must be at the end.
// No result, no error and cursor not tailing so we must
// be at the end.
return None
}
} else {
@ -126,11 +130,108 @@ impl<'a> Drop for Cursor<'a> {
}
}
/// Cursor that will reconnect and resume tailing a collection
/// at the right point if the connection fails.
pub struct TailingCursor<'a> {
collection: &'a Collection<'a>,
query: Document,
find_options: FindOptions,
tail_options: TailOptions,
cursor: Option<Cursor<'a>>,
last_seen_id: Option<[u8; 12]>,
retry_count: u32
}
impl<'a> TailingCursor<'a> {
pub fn new(
collection: &'a Collection<'a>,
query: Document,
find_options: FindOptions,
tail_options: TailOptions
) -> TailingCursor<'a> {
// Add flags to make query tailable
let mut find_options = find_options;
find_options.query_flags.add(QueryFlag::TailableCursor);
find_options.query_flags.add(QueryFlag::AwaitData);
TailingCursor {
collection: collection,
query: query,
find_options: find_options,
tail_options: tail_options,
cursor: None,
last_seen_id: None,
retry_count: 0
}
}
}
impl<'a> Iterator for TailingCursor<'a> {
type Item = Result<Document>;
fn next(&mut self) -> Option<Self::Item> {
loop {
// Start a scope so we're free to set the cursor to None at the end.
{
if self.cursor.is_none() {
// Add the last seen id to the query if it's present.
match self.last_seen_id {
Some(id) => {
let mut gt_id = Document::new();
gt_id.insert("$gt".to_string(), Bson::ObjectId(id));
self.query.insert("_id".to_string(), Bson::Document(gt_id));
},
None => ()
};
// Set the cursor
self.cursor = match self.collection.find(&self.query, Some(&self.find_options)) {
Ok(mut c) => {
c.tailing = true;
c.tail_wait_time_ms = self.tail_options.wait_time_ms;
Some(c)
},
Err(e) => return Some(Err(e.into()))
};
}
let cursor = match self.cursor {
Some(ref mut c) => c,
None => panic!("It should be impossible to not have a cursor here")
};
match cursor.next() {
Some(next_result) => {
match next_result {
Ok(next) => {
// This was successfull, so reset retry count and return result.
self.retry_count = 0;
return Some(Ok(next))
},
Err(e) => {
// Retry if we haven't exceeded the maximum number of retries.
if self.retry_count >= self.tail_options.max_retries {
return Some(Err(e.into()))
}
}
}
},
None => ()
};
}
// We made it to the end, so we weren't able to get the next item from
// the cursor. We need to reconnect in the next iteration of the loop.
self.retry_count += 1;
self.cursor = None;
}
}
}
#[cfg(test)]
mod tests {
use std::thread;
use bson;
use super::super::flags;
use super::super::uri::Uri;
use super::super::client::ClientPool;
use super::super::Result;
@ -145,13 +246,13 @@ mod tests {
let mut document = bson::Document::new();
document.insert("key".to_string(), bson::Bson::String("value".to_string()));
collection.drop().unwrap();
collection.drop().unwrap_or(());
for _ in 0..10 {
assert!(collection.insert(&document).is_ok());
assert!(collection.insert(&document, None).is_ok());
}
let query = bson::Document::new();
let cursor = collection.find(&query).unwrap();
let cursor = collection.find(&query, None).unwrap();
assert!(cursor.is_alive());
@ -178,20 +279,8 @@ mod tests {
let capped_collection = database.create_collection("capped", Some(&options)).unwrap();
let normal_collection = database.create_collection("not_capped", None).unwrap();
let mut flags = flags::Flags::new();
flags.add(flags::QueryFlag::TailableCursor);
flags.add(flags::QueryFlag::AwaitData);
// Try to tail on a normal collection
let failing_cursor = normal_collection.find_with_options(
&flags,
0,
0,
0,
&bson::Document::new(),
None,
None
).unwrap();
let failing_cursor = normal_collection.tail(bson::Document::new(), None, None);
let failing_result = failing_cursor.into_iter().next().unwrap();
assert!(failing_result.is_err());
assert_eq!(
@ -201,32 +290,20 @@ mod tests {
let mut document = bson::Document::new();
document.insert("key_1".to_string(), bson::Bson::String("Value 1".to_string()));
// Insert some documents into the collection
for _ in 0..5 {
capped_collection.insert(&document).unwrap();
}
// Insert a first document into the collection
capped_collection.insert(&document, None).unwrap();
// Start a tailing iterator in a thread
let cloned_pool = pool.clone();
let guard = thread::spawn(move || {
let client = cloned_pool.pop();
let collection = client.get_collection("rust_test", "capped");
let cursor = collection.find_with_options(
&flags,
0,
0,
0,
&bson::Document::new(),
None,
None
).unwrap();
let cursor = collection.tail(bson::Document::new(), None, None);
let mut counter = 0usize;
for result in cursor.into_iter() {
assert!(result.is_ok());
counter += 1;
if counter == 15 {
if counter == 25 {
break;
}
}
@ -234,16 +311,16 @@ mod tests {
});
// Wait for the thread to boot up
thread::sleep_ms(200);
thread::sleep_ms(250);
// Insert some more documents into the collection
for _ in 0..10 {
capped_collection.insert(&document).unwrap();
for _ in 0..25 {
capped_collection.insert(&document, None).unwrap();
}
// See if they appeared while iterating the cursor
// The for loop returns whenever we get more than
// 15 results.
assert_eq!(15, guard.join().unwrap());
assert_eq!(25, guard.join().unwrap());
}
}

@ -1,18 +1,20 @@
use mongo_c_driver_wrapper::bindings;
use std::collections::BTreeSet;
pub struct Flags<T> {
flags: Vec<T>
flags: BTreeSet<T>
}
impl <T> Flags<T> {
impl <T> Flags<T> where T: Ord {
pub fn new() -> Flags<T> {
Flags {
flags: Vec::new()
flags: BTreeSet::new()
}
}
pub fn add(&mut self, flag: T) {
self.flags.push(flag);
self.flags.insert(flag);
}
}
@ -22,6 +24,7 @@ pub trait FlagsValue {
/// Flags for insert operations
/// See: http://api.mongodb.org/c/current/mongoc_insert_flags_t.html
#[derive(Eq,PartialEq,Ord,PartialOrd)]
pub enum InsertFlag {
ContinueOnError,
NoValidate
@ -44,6 +47,7 @@ impl FlagsValue for Flags<InsertFlag> {
/// Flags for query operations
/// See: http://api.mongodb.org/c/current/mongoc_query_flags_t.html
#[derive(Eq,PartialEq,Ord,PartialOrd)]
pub enum QueryFlag {
TailableCursor,
SlaveOk,
@ -76,6 +80,7 @@ impl FlagsValue for Flags<QueryFlag> {
/// Flags for deletion operations
/// See: http://api.mongodb.org/c/1.1.8/mongoc_remove_flags_t.html
#[derive(Eq,PartialEq,Ord,PartialOrd)]
pub enum RemoveFlag {
SingleRemove
}
@ -102,6 +107,7 @@ mod tests {
flags.add(super::InsertFlag::ContinueOnError);
assert_eq!(1, flags.flags());
flags.add(super::InsertFlag::NoValidate);
flags.add(super::InsertFlag::NoValidate);
assert_eq!(31, flags.flags());
}
@ -114,7 +120,18 @@ mod tests {
flags.add(super::QueryFlag::TailableCursor);
assert_eq!(2, flags.flags());
flags.add(super::QueryFlag::Partial);
flags.add(super::QueryFlag::Partial);
assert_eq!(130, flags.flags());
}
#[test]
pub fn test_remove_flags() {
let mut flags = super::Flags::new();
assert_eq!(0, flags.flags());
flags.add(super::RemoveFlag::SingleRemove);
flags.add(super::RemoveFlag::SingleRemove);
assert_eq!(1, flags.flags());
}
}

Loading…
Cancel
Save