From 13273b775590ef9043f2721a68de53a7d1f232a6 Mon Sep 17 00:00:00 2001 From: Thijs Cadier Date: Sat, 4 Jun 2016 15:49:15 +0200 Subject: [PATCH] Support for aggregations --- Cargo.toml | 2 +- mongoc-sys/Cargo.toml | 2 +- mongoc-sys/build.rs | 2 +- mongoc-sys/src/lib.rs | 1 + src/collection.rs | 62 +++++++++++++++++++++++++++++++++++++++++++ tests/collection.rs | 28 +++++++++++++++++++ 6 files changed, 94 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e070b0e..b7b5ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ bson = "> 0.1.0" [dependencies.mongoc-sys] path = "mongoc-sys" -version = "1.3.5" +version = "1.3.6" [dev-dependencies] chrono = "> 0.2.0" diff --git a/mongoc-sys/Cargo.toml b/mongoc-sys/Cargo.toml index 513015c..7a8e79b 100644 --- a/mongoc-sys/Cargo.toml +++ b/mongoc-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mongoc-sys" -version = "1.3.5" +version = "1.3.6" description = "Sys package with installer and bindings for mongoc" authors = ["Thijs Cadier "] build = "build.rs" diff --git a/mongoc-sys/build.rs b/mongoc-sys/build.rs index 69f48b5..b482978 100644 --- a/mongoc-sys/build.rs +++ b/mongoc-sys/build.rs @@ -2,7 +2,7 @@ use std::env; use std::path::Path; use std::process::Command; -static VERSION: &'static str = "1.3.5"; // Should be the same as the version in the manifest +static VERSION: &'static str = "1.3.5"; // Should be the same major version as in the manifest fn main() { let out_dir_var = env::var("OUT_DIR").unwrap(); diff --git a/mongoc-sys/src/lib.rs b/mongoc-sys/src/lib.rs index 8e059cc..f6bdbcb 100644 --- a/mongoc-sys/src/lib.rs +++ b/mongoc-sys/src/lib.rs @@ -138,6 +138,7 @@ pub mod bindings { pub type mongoc_remove_flags_t = ::libc::c_uint; pub type mongoc_update_flags_t = ::libc::c_uint; extern "C" { + pub fn mongoc_collection_aggregate(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, pipeline: *const bson_t, options: *const bson_t, read_prefs: *const mongoc_read_prefs_t) -> *mut mongoc_cursor_t; pub fn mongoc_collection_command(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, skip: uint32_t, limit: uint32_t, batch_size: uint32_t, command: *const bson_t, fields: *const bson_t, read_prefs: *const mongoc_read_prefs_t) -> *mut mongoc_cursor_t; pub fn mongoc_collection_command_simple(collection: *mut mongoc_collection_t, command: *const bson_t, read_prefs: *const mongoc_read_prefs_t, reply: *mut bson_t, error: *mut bson_error_t) -> u8; pub fn mongoc_collection_count_with_opts(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, query: *const bson_t, skip: int64_t, limit: int64_t, opts: *const bson_t, read_prefs: *const mongoc_read_prefs_t, error: *mut bson_error_t) -> int64_t; diff --git a/src/collection.rs b/src/collection.rs index 3310641..f18879e 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -41,6 +41,28 @@ pub struct Collection<'a> { inner: *mut bindings::mongoc_collection_t } +/// Options to configure an aggregate operation. +pub struct AggregateOptions { + /// Flags to use + pub query_flags: Flags, + /// Options for the aggregate + pub options: Option, + /// Read prefs to use + pub read_prefs: Option +} + +impl AggregateOptions { + /// Default options that are used if no options are specified + /// when aggregating. + pub fn default() -> AggregateOptions { + AggregateOptions { + query_flags: Flags::new(), + options: None, + read_prefs: None + } + } +} + /// Options to configure a bulk operation. pub struct BulkOperationOptions { /// If the operations must be performed in order @@ -210,6 +232,46 @@ impl<'a> Collection<'a> { } } + /// Execute an aggregation query on the collection. + /// The bson 'pipeline' is not validated, simply passed along as appropriate to the server. + /// As such, compatibility and errors should be validated in the appropriate server documentation. + pub fn aggregate( + &'a self, + pipeline: &Document, + options: Option<&AggregateOptions> + ) -> Result> { + let default_options = AggregateOptions::default(); + let options = options.unwrap_or(&default_options); + + let cursor_ptr = unsafe { + bindings::mongoc_collection_aggregate( + self.inner, + options.query_flags.flags(), + try!(Bsonc::from_document(pipeline)).inner(), + match options.options { + Some(ref o) => { + try!(Bsonc::from_document(o)).inner() + }, + None => ptr::null() + }, + match options.read_prefs { + Some(ref prefs) => prefs.inner(), + None => ptr::null() + } + ) + }; + + if cursor_ptr.is_null() { + return Err(InvalidParamsError.into()) + } + + Ok(Cursor::new( + cursor::CreatedBy::Collection(self), + cursor_ptr, + None + )) + } + /// Execute a command on the collection. /// This is performed lazily and therefore requires calling `next` on the resulting cursor. pub fn command( diff --git a/tests/collection.rs b/tests/collection.rs index 2e52a0b..8225c5d 100644 --- a/tests/collection.rs +++ b/tests/collection.rs @@ -5,6 +5,34 @@ use mongo_driver::collection::{CountOptions,FindAndModifyOperation}; use mongo_driver::client::{ClientPool,Uri}; use mongo_driver::flags; +#[test] +fn test_aggregate() { + let uri = Uri::new("mongodb://localhost:27017/").unwrap(); + let pool = ClientPool::new(uri, None); + let client = pool.pop(); + let mut collection = client.get_collection("rust_driver_test", "aggregate"); + collection.drop().unwrap_or(()); + + for _ in 0..5 { + assert!(collection.insert(&doc!{"key" => 1}, None).is_ok()); + } + + let pipeline = doc!{ + "pipeline" => [ + { + "$group" => { + "_id" => "$key", + "total" => {"$sum" => "$key"} + } + } + ] + }; + + let total = collection.aggregate(&pipeline, None).unwrap().next().unwrap().unwrap(); + + assert_eq!(Ok(5), total.get_i32("total")); +} + #[test] fn test_command() { let uri = Uri::new("mongodb://localhost:27017/").unwrap();