Support for aggregations

pull/18/head
Thijs Cadier 9 years ago
parent 480c7cc170
commit 13273b7755

@ -19,7 +19,7 @@ bson = "> 0.1.0"
[dependencies.mongoc-sys] [dependencies.mongoc-sys]
path = "mongoc-sys" path = "mongoc-sys"
version = "1.3.5" version = "1.3.6"
[dev-dependencies] [dev-dependencies]
chrono = "> 0.2.0" chrono = "> 0.2.0"

@ -1,6 +1,6 @@
[package] [package]
name = "mongoc-sys" name = "mongoc-sys"
version = "1.3.5" version = "1.3.6"
description = "Sys package with installer and bindings for mongoc" description = "Sys package with installer and bindings for mongoc"
authors = ["Thijs Cadier <thijs@appsignal.com>"] authors = ["Thijs Cadier <thijs@appsignal.com>"]
build = "build.rs" build = "build.rs"

@ -2,7 +2,7 @@ use std::env;
use std::path::Path; use std::path::Path;
use std::process::Command; use std::process::Command;
static VERSION: &'static str = "1.3.5"; // Should be the same as the version in the manifest static VERSION: &'static str = "1.3.5"; // Should be the same major version as in the manifest
fn main() { fn main() {
let out_dir_var = env::var("OUT_DIR").unwrap(); let out_dir_var = env::var("OUT_DIR").unwrap();

@ -138,6 +138,7 @@ pub mod bindings {
pub type mongoc_remove_flags_t = ::libc::c_uint; pub type mongoc_remove_flags_t = ::libc::c_uint;
pub type mongoc_update_flags_t = ::libc::c_uint; pub type mongoc_update_flags_t = ::libc::c_uint;
extern "C" { extern "C" {
pub fn mongoc_collection_aggregate(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, pipeline: *const bson_t, options: *const bson_t, read_prefs: *const mongoc_read_prefs_t) -> *mut mongoc_cursor_t;
pub fn mongoc_collection_command(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, skip: uint32_t, limit: uint32_t, batch_size: uint32_t, command: *const bson_t, fields: *const bson_t, read_prefs: *const mongoc_read_prefs_t) -> *mut mongoc_cursor_t; pub fn mongoc_collection_command(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, skip: uint32_t, limit: uint32_t, batch_size: uint32_t, command: *const bson_t, fields: *const bson_t, read_prefs: *const mongoc_read_prefs_t) -> *mut mongoc_cursor_t;
pub fn mongoc_collection_command_simple(collection: *mut mongoc_collection_t, command: *const bson_t, read_prefs: *const mongoc_read_prefs_t, reply: *mut bson_t, error: *mut bson_error_t) -> u8; pub fn mongoc_collection_command_simple(collection: *mut mongoc_collection_t, command: *const bson_t, read_prefs: *const mongoc_read_prefs_t, reply: *mut bson_t, error: *mut bson_error_t) -> u8;
pub fn mongoc_collection_count_with_opts(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, query: *const bson_t, skip: int64_t, limit: int64_t, opts: *const bson_t, read_prefs: *const mongoc_read_prefs_t, error: *mut bson_error_t) -> int64_t; pub fn mongoc_collection_count_with_opts(collection: *mut mongoc_collection_t, flags: mongoc_query_flags_t, query: *const bson_t, skip: int64_t, limit: int64_t, opts: *const bson_t, read_prefs: *const mongoc_read_prefs_t, error: *mut bson_error_t) -> int64_t;

@ -41,6 +41,28 @@ pub struct Collection<'a> {
inner: *mut bindings::mongoc_collection_t inner: *mut bindings::mongoc_collection_t
} }
/// Options to configure an aggregate operation.
pub struct AggregateOptions {
/// Flags to use
pub query_flags: Flags<QueryFlag>,
/// Options for the aggregate
pub options: Option<Document>,
/// Read prefs to use
pub read_prefs: Option<ReadPrefs>
}
impl AggregateOptions {
/// Default options that are used if no options are specified
/// when aggregating.
pub fn default() -> AggregateOptions {
AggregateOptions {
query_flags: Flags::new(),
options: None,
read_prefs: None
}
}
}
/// Options to configure a bulk operation. /// Options to configure a bulk operation.
pub struct BulkOperationOptions { pub struct BulkOperationOptions {
/// If the operations must be performed in order /// If the operations must be performed in order
@ -210,6 +232,46 @@ impl<'a> Collection<'a> {
} }
} }
/// Execute an aggregation query on the collection.
/// The bson 'pipeline' is not validated, simply passed along as appropriate to the server.
/// As such, compatibility and errors should be validated in the appropriate server documentation.
pub fn aggregate(
&'a self,
pipeline: &Document,
options: Option<&AggregateOptions>
) -> Result<Cursor<'a>> {
let default_options = AggregateOptions::default();
let options = options.unwrap_or(&default_options);
let cursor_ptr = unsafe {
bindings::mongoc_collection_aggregate(
self.inner,
options.query_flags.flags(),
try!(Bsonc::from_document(pipeline)).inner(),
match options.options {
Some(ref o) => {
try!(Bsonc::from_document(o)).inner()
},
None => ptr::null()
},
match options.read_prefs {
Some(ref prefs) => prefs.inner(),
None => ptr::null()
}
)
};
if cursor_ptr.is_null() {
return Err(InvalidParamsError.into())
}
Ok(Cursor::new(
cursor::CreatedBy::Collection(self),
cursor_ptr,
None
))
}
/// Execute a command on the collection. /// Execute a command on the collection.
/// This is performed lazily and therefore requires calling `next` on the resulting cursor. /// This is performed lazily and therefore requires calling `next` on the resulting cursor.
pub fn command( pub fn command(

@ -5,6 +5,34 @@ use mongo_driver::collection::{CountOptions,FindAndModifyOperation};
use mongo_driver::client::{ClientPool,Uri}; use mongo_driver::client::{ClientPool,Uri};
use mongo_driver::flags; use mongo_driver::flags;
#[test]
fn test_aggregate() {
let uri = Uri::new("mongodb://localhost:27017/").unwrap();
let pool = ClientPool::new(uri, None);
let client = pool.pop();
let mut collection = client.get_collection("rust_driver_test", "aggregate");
collection.drop().unwrap_or(());
for _ in 0..5 {
assert!(collection.insert(&doc!{"key" => 1}, None).is_ok());
}
let pipeline = doc!{
"pipeline" => [
{
"$group" => {
"_id" => "$key",
"total" => {"$sum" => "$key"}
}
}
]
};
let total = collection.aggregate(&pipeline, None).unwrap().next().unwrap().unwrap();
assert_eq!(Ok(5), total.get_i32("total"));
}
#[test] #[test]
fn test_command() { fn test_command() {
let uri = Uri::new("mongodb://localhost:27017/").unwrap(); let uri = Uri::new("mongodb://localhost:27017/").unwrap();

Loading…
Cancel
Save