Bulk Write Operations
This tutorial explains how to take advantage of MongoDB C driver bulk write operation features. Executing write operations in batches reduces the number of network round trips, increasing write throughput.
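To get a feel for the saving, the sketch below counts round trips under a deliberately simple model in which up to batch_size operations share one request. The function round_trips and its batch_size parameter are illustrative assumptions, not part of the driver, and real batch limits depend on the server.

```c
#include <assert.h>
#include <stddef.h>

/* Round trips needed to send n_ops writes when up to batch_size
 * operations share one request. batch_size is an illustrative
 * parameter of this model, not a value taken from the driver. */
static size_t
round_trips (size_t n_ops, size_t batch_size)
{
   assert (batch_size > 0);

   /* ceiling division: a partly filled batch still costs a round trip */
   return (n_ops + batch_size - 1) / batch_size;
}
```

Under this model, 10000 inserts sent one at a time cost 10000 round trips, while a batch size of 1000 would cost 10.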
Bulk Insert
New in MongoDB C driver 0.94.2.
First we need to fetch a bulk operation handle from the mongoc_collection_t. The handle can be created in either ordered or unordered mode, selected by the second argument (true for ordered, false for unordered). Unordered mode allows for greater parallelization when working with sharded clusters.
mongoc_bulk_operation_t *bulk = mongoc_collection_create_bulk_operation (collection, true, write_concern);
We can now start adding documents to the bulk operation. They are buffered until we execute the operation.
The bulk operation will coalesce insertions as a single batch for each consecutive call to mongoc_bulk_operation_insert(). This creates a pipelined effect when possible.
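The coalescing rule can be pictured with a small standalone model that counts how many insert batches a sequence of calls would produce if every run of consecutive inserts were merged into one batch. The op_kind_t enum and both function names are hypothetical names for this sketch. It does not call the driver and says nothing about how other operation types are grouped.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical operation kinds for this model; not driver types. */
typedef enum { OP_INSERT, OP_UPDATE, OP_REMOVE } op_kind_t;

/* Count insert batches when each run of consecutive inserts
 * is merged into a single batch. */
static size_t
count_insert_batches (const op_kind_t *ops, size_t n_ops)
{
   size_t batches = 0;
   size_t i;

   assert (ops != NULL || n_ops == 0);

   for (i = 0; i < n_ops; i++) {
      /* a batch starts at an insert that does not follow an insert */
      if (ops[i] == OP_INSERT && (i == 0 || ops[i - 1] != OP_INSERT)) {
         batches++;
      }
   }

   return batches;
}

/* Example sequence: two inserts, a remove, three inserts,
 * an update, and one final insert. */
static size_t
example_insert_batches (void)
{
   static const op_kind_t ops[] = {OP_INSERT, OP_INSERT, OP_REMOVE,
                                   OP_INSERT, OP_INSERT, OP_INSERT,
                                   OP_UPDATE, OP_INSERT};

   return count_insert_batches (ops, sizeof ops / sizeof ops[0]);
}
```

In the example sequence the six inserts fall into three runs, so the model reports three insert batches rather than six separate insert requests.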
The bulk operation API automatically handles MongoDB servers older than 2.6 by speaking the old wire protocol. However, some performance degradation may occur.
To execute the bulk operation and receive the result we call mongoc_bulk_operation_execute().
#include <assert.h>
#include <bcon.h>
#include <mongoc.h>
#include <stdio.h>

static void
bulk1 (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *bulk;
   bson_error_t error;
   bson_t *doc;
   bson_t reply;
   char *str;
   bool ret;
   int i;

   bulk = mongoc_collection_create_bulk_operation (collection, true, NULL);

   for (i = 0; i < 10000; i++) {
      doc = BCON_NEW ("i", BCON_INT32 (i));
      mongoc_bulk_operation_insert (bulk, doc);
      bson_destroy (doc);
   }

   ret = mongoc_bulk_operation_execute (bulk, &reply, &error);

   str = bson_as_json (&reply, NULL);
   printf ("%s\n", str);
   bson_free (str);

   if (!ret) {
      fprintf (stderr, "Error: %s\n", error.message);
   }

   bson_destroy (&reply);
   mongoc_bulk_operation_destroy (bulk);
}

int
main (int argc, char *argv[])
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;

   mongoc_init ();

   client = mongoc_client_new ("mongodb://localhost/");
   collection = mongoc_client_get_collection (client, "test", "test");

   bulk1 (collection);

   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);

   mongoc_cleanup ();

   return 0;
}
Example reply document:
{
   "nInserted" : 10000,
   "nMatched" : 0,
   "nModified" : 0,
   "nRemoved" : 0,
   "nUpserted" : 0,
   "writeErrors" : [],
   "writeConcernErrors" : []
}
Mixed Bulk Write Operations
New in MongoDB C driver 0.94.2.
MongoDB C driver also supports executing mixed bulk write operations. A batch of insert, update, and remove operations can be executed together using the bulk write operations API.
Although the following API works with all versions of MongoDB, it is designed for MongoDB 2.6 and later. With older versions of MongoDB, much better bulk insert performance can be achieved through the deprecated mongoc_collection_insert_bulk() method.
Ordered Bulk Write Operations
Ordered bulk write operations are batched and sent to the server in the order provided for serial execution. The reply document describes the type and count of operations performed.
#include <assert.h>
#include <bcon.h>
#include <mongoc.h>
#include <stdio.h>

static void
bulk2 (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *bulk;
   bson_error_t error;
   bson_t *query;
   bson_t *doc;
   bson_t reply;
   char *str;
   bool ret;
   int i;

   bulk = mongoc_collection_create_bulk_operation (collection, true, NULL);

   /* Remove everything */
   query = bson_new ();
   mongoc_bulk_operation_remove (bulk, query);
   bson_destroy (query);

   /* Add a few documents */
   for (i = 1; i < 4; i++) {
      doc = BCON_NEW ("_id", BCON_INT32 (i));
      mongoc_bulk_operation_insert (bulk, doc);
      bson_destroy (doc);
   }

   /* {_id: 1} => {$set: {foo: "bar"}} */
   query = BCON_NEW ("_id", BCON_INT32 (1));
   doc = BCON_NEW ("$set", "{", "foo", BCON_UTF8 ("bar"), "}");
   mongoc_bulk_operation_update (bulk, query, doc, false);
   bson_destroy (query);
   bson_destroy (doc);

   /* {_id: 4} => {'$inc': {'j': 1}} (upsert) */
   query = BCON_NEW ("_id", BCON_INT32 (4));
   doc = BCON_NEW ("$inc", "{", "j", BCON_INT32 (1), "}");
   mongoc_bulk_operation_update (bulk, query, doc, true);
   bson_destroy (query);
   bson_destroy (doc);

   /* replace {j:1} with {j:2} */
   query = BCON_NEW ("j", BCON_INT32 (1));
   doc = BCON_NEW ("j", BCON_INT32 (2));
   mongoc_bulk_operation_replace_one (bulk, query, doc, false);
   bson_destroy (query);
   bson_destroy (doc);

   ret = mongoc_bulk_operation_execute (bulk, &reply, &error);

   str = bson_as_json (&reply, NULL);
   printf ("%s\n", str);
   bson_free (str);

   if (!ret) {
      printf ("Error: %s\n", error.message);
   }

   bson_destroy (&reply);
   mongoc_bulk_operation_destroy (bulk);
}

int
main (int argc, char *argv[])
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;

   mongoc_init ();

   client = mongoc_client_new ("mongodb://localhost/");
   collection = mongoc_client_get_collection (client, "test", "test");

   bulk2 (collection);

   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);

   mongoc_cleanup ();

   return 0;
}
Example reply document:
{
   "nInserted" : 3,
   "nMatched" : 2,
   "nModified" : 2,
   "nRemoved" : 10000,
   "nUpserted" : 1,
   "upserted" : [{"index" : 5, "_id" : 4}],
   "writeErrors" : [],
   "writeConcernErrors" : []
}
The index field in the upserted array is the 0-based index of the upsert operation; in this example, the sixth operation of the overall bulk operation was an upsert, so its index is 5.
nModified is only reported when using MongoDB 2.6 and later, otherwise the field is omitted.
Unordered Bulk Write Operations
Unordered bulk write operations are batched and sent to the server in arbitrary order where they may be executed in parallel. Any errors that occur are reported after all operations are attempted.
In the next example, the first and third operations fail due to the unique constraint on _id. Because execution is unordered, the second and fourth operations still succeed.
#include <assert.h>
#include <bcon.h>
#include <mongoc.h>
#include <stdio.h>

static void
bulk3 (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *bulk;
   bson_error_t error;
   bson_t *query;
   bson_t *doc;
   bson_t reply;
   char *str;
   bool ret;

   /* false indicates unordered */
   bulk = mongoc_collection_create_bulk_operation (collection, false, NULL);

   /* Add a document */
   doc = BCON_NEW ("_id", BCON_INT32 (1));
   mongoc_bulk_operation_insert (bulk, doc);
   bson_destroy (doc);

   /* remove {_id: 2} */
   query = BCON_NEW ("_id", BCON_INT32 (2));
   mongoc_bulk_operation_remove_one (bulk, query);
   bson_destroy (query);

   /* insert {_id: 3} */
   doc = BCON_NEW ("_id", BCON_INT32 (3));
   mongoc_bulk_operation_insert (bulk, doc);
   bson_destroy (doc);

   /* replace {_id:4} {'i': 1} */
   query = BCON_NEW ("_id", BCON_INT32 (4));
   doc = BCON_NEW ("i", BCON_INT32 (1));
   mongoc_bulk_operation_replace_one (bulk, query, doc, false);
   bson_destroy (query);
   bson_destroy (doc);

   ret = mongoc_bulk_operation_execute (bulk, &reply, &error);

   str = bson_as_json (&reply, NULL);
   printf ("%s\n", str);
   bson_free (str);

   if (!ret) {
      printf ("Error: %s\n", error.message);
   }

   bson_destroy (&reply);
   mongoc_bulk_operation_destroy (bulk);
}

int
main (int argc, char *argv[])
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;

   mongoc_init ();

   client = mongoc_client_new ("mongodb://localhost/");
   collection = mongoc_client_get_collection (client, "test", "test");

   bulk3 (collection);

   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);

   mongoc_cleanup ();

   return 0;
}
Example reply document:
{
   "nInserted" : 0,
   "nMatched" : 1,
   "nModified" : 1,
   "nRemoved" : 1,
   "nUpserted" : 0,
   "writeErrors" : [
      { "index" : 0, "code" : 11000, "errmsg" : "E11000 duplicate key error index: test.test.$_id_ dup key: { : 1 }" },
      { "index" : 2, "code" : 11000, "errmsg" : "E11000 duplicate key error index: test.test.$_id_ dup key: { : 3 }" }
   ],
   "writeConcernErrors" : []
}
Error: E11000 duplicate key error index: test.test.$_id_ dup key: { : 1 }
The bson_error_t domain is MONGOC_ERROR_COMMAND and its code is 11000.
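The difference between the two execution modes can be sketched with a toy in-memory model. It does not use the driver: the collection is just an array of flags indexed by _id, operations run one after another (the real server may run unordered operations in parallel), and every type and function name here is hypothetical. The sketch assumes that an ordered bulk stops at the first write error, while an unordered bulk attempts every operation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_ID 16 /* toy collection holds _id values 0..MAX_ID-1 */

typedef enum { OP_INSERT, OP_REMOVE_ONE, OP_REPLACE_ONE } op_kind_t;

typedef struct {
   op_kind_t kind;
   int id;
} op_t;

typedef struct {
   size_t n_inserted;
   size_t n_matched;
   size_t n_removed;
   size_t n_errors;
} result_t;

/* Apply ops to the toy collection and tally the outcome. */
static result_t
run_bulk (bool present[MAX_ID], const op_t *ops, size_t n_ops, bool ordered)
{
   result_t r = {0, 0, 0, 0};
   size_t i;

   for (i = 0; i < n_ops; i++) {
      int id = ops[i].id;

      assert (id >= 0 && id < MAX_ID);

      if (ops[i].kind == OP_INSERT) {
         if (present[id]) {
            r.n_errors++; /* duplicate _id is a write error */
            if (ordered) {
               break; /* ordered: stop at the first error */
            }
         } else {
            present[id] = true;
            r.n_inserted++;
         }
      } else if (ops[i].kind == OP_REMOVE_ONE) {
         if (present[id]) {
            present[id] = false;
            r.n_removed++;
         }
      } else if (present[id]) {
         r.n_matched++; /* OP_REPLACE_ONE found its document */
      }
   }

   return r;
}

/* The four operations of the unordered example, run against a
 * collection that already contains _id values 1 through 4. */
static result_t
run_example (bool ordered)
{
   bool present[MAX_ID] = {false};
   static const op_t ops[] = {
      {OP_INSERT, 1},      /* fails: duplicate _id */
      {OP_REMOVE_ONE, 2},  /* succeeds */
      {OP_INSERT, 3},      /* fails: duplicate _id */
      {OP_REPLACE_ONE, 4}, /* succeeds */
   };
   int id;

   for (id = 1; id <= 4; id++) {
      present[id] = true;
   }

   return run_bulk (present, ops, sizeof ops / sizeof ops[0], ordered);
}
```

In this model, run_example (false) tallies two errors, one removal, and one match, in line with the reply document above. run_example (true) stops after the first duplicate key error, so the remove and replace are never attempted.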
Bulk Operation Write Concerns
By default bulk operations are executed with the write_concern of the collection they are executed against. A custom write concern can be passed to the mongoc_collection_create_bulk_operation() method. Write concern errors (e.g. wtimeout) will be reported after all operations are attempted, regardless of execution order.
#include <assert.h>
#include <bcon.h>
#include <mongoc.h>
#include <stdio.h>

static void
bulk4 (mongoc_collection_t *collection)
{
   mongoc_write_concern_t *wc;
   mongoc_bulk_operation_t *bulk;
   bson_error_t error;
   bson_t *doc;
   bson_t reply;
   char *str;
   bool ret;

   wc = mongoc_write_concern_new ();
   mongoc_write_concern_set_w (wc, 4);
   mongoc_write_concern_set_wtimeout (wc, 100); /* milliseconds */

   bulk = mongoc_collection_create_bulk_operation (collection, true, wc);

   /* Two inserts */
   doc = BCON_NEW ("_id", BCON_INT32 (10));
   mongoc_bulk_operation_insert (bulk, doc);
   bson_destroy (doc);

   doc = BCON_NEW ("_id", BCON_INT32 (11));
   mongoc_bulk_operation_insert (bulk, doc);
   bson_destroy (doc);

   ret = mongoc_bulk_operation_execute (bulk, &reply, &error);

   str = bson_as_json (&reply, NULL);
   printf ("%s\n", str);
   bson_free (str);

   if (!ret) {
      printf ("Error: %s\n", error.message);
   }

   bson_destroy (&reply);
   mongoc_bulk_operation_destroy (bulk);
   mongoc_write_concern_destroy (wc);
}

int
main (int argc, char *argv[])
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;

   mongoc_init ();

   client = mongoc_client_new ("mongodb://localhost/");
   collection = mongoc_client_get_collection (client, "test", "test");

   bulk4 (collection);

   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);

   mongoc_cleanup ();

   return 0;
}
Example reply document and error message:
{
   "nInserted" : 2,
   "nMatched" : 0,
   "nModified" : 0,
   "nRemoved" : 0,
   "nUpserted" : 0,
   "writeErrors" : [],
   "writeConcernErrors" : [
      { "code" : 64, "errmsg" : "waiting for replication timed out" }
   ]
}
Error: waiting for replication timed out
The bson_error_t domain is MONGOC_ERROR_WRITE_CONCERN if there are write concern errors and no write errors. Write errors indicate failed operations, so they take precedence over write concern errors, which mean merely that the write concern is not satisfied yet.
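The precedence rule can be written down as a small decision function. This is only a sketch of the rule as stated in this tutorial: the error_domain_t enum is a stand-in defined here for illustration, not the driver's own constants, and reported_domain is a hypothetical helper rather than a driver function.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-ins for the error domains named in this tutorial. */
typedef enum {
   DOMAIN_NONE,         /* no error reported */
   DOMAIN_COMMAND,      /* stands in for MONGOC_ERROR_COMMAND */
   DOMAIN_WRITE_CONCERN /* stands in for MONGOC_ERROR_WRITE_CONCERN */
} error_domain_t;

/* Pick the reported domain: write errors win over
 * write concern errors. */
static error_domain_t
reported_domain (size_t n_write_errors, size_t n_write_concern_errors)
{
   if (n_write_errors > 0) {
      return DOMAIN_COMMAND;
   }
   if (n_write_concern_errors > 0) {
      return DOMAIN_WRITE_CONCERN;
   }
   return DOMAIN_NONE;
}

/* The two failing replies shown in this tutorial, as checks. */
static void
check_examples (void)
{
   /* unordered example: two write errors, no write concern errors */
   assert (reported_domain (2, 0) == DOMAIN_COMMAND);
   /* write concern example: no write errors, one wtimeout */
   assert (reported_domain (0, 1) == DOMAIN_WRITE_CONCERN);
}
```

When both kinds of error are present, for example reported_domain (1, 1), the sketch reports the write error domain, matching the precedence described above.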
Further Reading
See the Driver Bulk API Spec, which describes bulk write operations for all MongoDB drivers.