Multiple Partition Keys in Azure Cosmos DB (Part 1) – Querying the Change Feed Directly

To begin, let’s be clear that an Azure Cosmos DB container can have only one partition key. I say this from the start in case “multiple partition keys” in the title is somehow misinterpreted to imply otherwise. You always need to come up with a single property to serve as the partition key for every container and choosing the best property for this can sometimes be difficult.

Understanding the Problem

Making the right choice requires intimate knowledge about the data access patterns of your users and applications. You also need to understand how horizontal partitioning works behind the scenes in terms of storage and throughput, how queries and stored procedures are scoped by partition key, and the performance implications of running cross-partition queries and fan-out queries. So there are many considerations to take into account before settling on the one property to partition a container on. I discuss all this at length in a previous blog post Horizontal Partitioning in Azure Cosmos DB.

But here’s the rub. What if, after all the analysis, you come to realize that you simply cannot settle on a single property that serves as an ideal partition key for all scenarios? Let’s say for example, from a write perspective, you find one property will best distribute writes uniformly across all the partitions in the container. But from a query perspective, you find that using the same partition key results in too much fanning out. Or, you might identify two categories of common queries, where it’s roughly 50/50; meaning, about half of all the queries are of one type, and half are of the other. What do you do if the two query categories would each benefit from different partition keys?

Your brain can get caught in an infinite loop over this until you wind up in that state of “analysis paralysis,” where you recognize that there’s just no single best property to choose as the partition key. To break free, you need to think outside the box. Or, let’s say, think outside the container. Because the solution here is to simply create another container that’s a complete “replica” of the first. This second container holds the exact same set of documents as the first but defines a different partition key.

I placed quotes around the word “replica” because this second container is not technically a replica in the true Cosmos DB sense of the word (where, internally, Cosmos DB automatically maintains replicas of the physical partitions in every container). Rather, it’s a manual replica that you maintain yourself. Thus, it’s your job to keep it in sync with changes when they happen in the first container, which is partitioned by a property that’s optimized for writes. As those writes occur in real time, you need to respond by updating the second collection, which is partitioned by a property that’s optimized for queries.

Enter Change Feed

Fortunately, change feed comes to the rescue here. Cosmos DB maintains a persistent record of changes for every container that can be consumed using the change feed. This gives you a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. For an introduction to Azure Change Feed, have a look at my previous blog post Change Feed – Unsung Hero of Azure Cosmos DB

In this three-part series of blog posts, I’ll dive into three ways you can use the change feed to implement a solution for synchronizing containers:

  • Querying the change feed directly
  • Using the Change Feed Processor (CFP) library
  • Writing an Azure Functions Cosmos DB trigger

Let’s get started. Assume that we’ve done our analysis and established that city is the ideal partition key for writes, as well as roughly half of the most common queries our users will be running. But we’ve also determined that state is the ideal partition key for the other (roughly half) commonly executed queries. This means we’ll want one container partitioned by city, and another partitioned by state. And we’ll want to consume the city-partitioned container’s change feed to keep the state-partitioned container in sync with changes as they occur. We’ll then be able to direct our city-based queries to the first container, and our state-based queries to the second container, which then eliminates fan-out queries in both cases.

Setting Up

If you’d like to follow along, you’ll need to be sure your environment is setup properly. First, of course, you’ll need to have a Cosmos DB account. The good news here is that you can get a free 30-day account with the “try cosmos” offering, which doesn’t even require a credit card or Azure subscription (just a free Microsoft account). Even better, there’s no limit to the number of times you can start a new 30-day trial. Create your free account at http://azure.microsoft.com/try/cosmosdb.

You’ll need your account’s endpoint URI and master key to connect to Cosmos DB from C#. To obtain them, head over to your Cosmos DB account in the Azure portal, open the Keys blade, and keep it open so that you can handily copy/paste them into the project.

You’ll also need Visual Studio. I’ll be using Visual Studio 2019, but the latest version of Visual Studio 2017 is fine as well. You can download the free community edition at https://visualstudio.microsoft.com/downloads.

Querying the Change Feed Directly

We’ll begin with the raw approach, which is to query the change feed directly using the SDK. The reality is that you’ll almost never want to go this route, except for the simplest small-scale scenarios. Still, it’s worth taking some time to examine this approach first, as I think you’ll benefit from learning how the change feed operates at a low level, and it will enhance your appreciation of the Change Feed Processor (CFP) library which I’ll cover in the next blog post of this series.

Fire up Visual Studio, create a new .NET Core console application, and name the project ChangeFeedDirect, and name the solution ChangeFeedDemos (we’ll be adding more projects to this solution in parts 2 and 3 of this blog series). Next, add the SDK to the ChangeFeedDirect project from the NuGet package Microsoft.Azure.DocumentDB.Core:

We’ll write some basic code to create a database with the two containers, with additional methods to create, update, and delete documents in the first container (partitioned by city). Then we’ll write our “sync” method that directly queries the change feed on the first container, in order to update the second container (partitioned by state) and reflect all the changes made.

Note: Our code (and the SDK) refers to containers as collections.

We’ll write all our code inside the Program.cs file. First, update the using statements at the very top of the file to get the right namespaces imported:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

Next, at the very top of the Program class, set two private string constants for your Cosmos DB account’s endpoint and master key. You can simply copy them from the Keys blade in the Azure portal, and paste them right into the code:

private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
private const string CosmosMasterKey = "<account-key>";

Now edit the Main method with one line to call the Run method:

static void Main(string[] args)
{
  Task.Run(Run).Wait();
}

This one-liner invokes the Run method and waits for it to complete. Add the Run method next, which displays a menu and calls the various methods defined for each menu option:

private static async Task Run()
{
  while (true)
  {
    Console.WriteLine("Menu");
    Console.WriteLine();
    Console.WriteLine("DB - Create database");
    Console.WriteLine("CD - Create documents");
    Console.WriteLine("UD - Update document");
    Console.WriteLine("DD - Delete document");
    Console.WriteLine("SC - Sync collections");
    Console.WriteLine("Q - Quit");
    Console.WriteLine();
    Console.Write("Selection: ");

    var input = Console.ReadLine().ToUpper().Trim();
    if (input == "Q") break;
    else if (input == "DB") await CreateDatabase();
    else if (input == "CD") await CreateDocuments();
    else if (input == "UD") await UpdateDocument();
    else if (input == "DD") await DeleteDocument();
    else if (input == "SC") await SyncCollections();
    else Console.WriteLine("\nInvalid selection; try again\n");
  }
}

We’ll add the menu option methods one at a time, starting with CreateDatabase:

private static async Task CreateDatabase()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Create the database

    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    try { await client.DeleteDatabaseAsync(dbUri); } catch { }

    await client.CreateDatabaseAsync(new Database { Id = "multipk" });

    // Create the two collections

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/city");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byCity",
      PartitionKey = partitionKeyDefinition,
      DefaultTimeToLive = -1
    }, new RequestOptions { OfferThroughput = 400 });

    partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/state");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byState",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });

    // Load the sync document into the first collection

    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
    var syncDocDef = new
    {
      city = "sync",
      state = "sync",
      lastSync = default(DateTime?)
    };
    await client.CreateDocumentAsync(collUri, syncDocDef);
  }

  Console.WriteLine("Created database for change feed demo");
}

Most of this code is intuitive, even if you’ve never done any Cosmos DB programming before. We first use our endpoint and master key to create a DocumentClient, and then we use the client to create a database named multipk with the two containers in it.

This works by first calling DeleteDatabaseAsync wrapped in a try block with an empty catch block. This effectively results in “delete if exists” behavior to ensure that the multipk database does not exist when we call CreateDatabaseAsync to create it.

Next, we call CreateDocumentCollection twice to create the two containers (again, a collection is a container). We name the first container byCity and assign it a partition key of /city, and we name the second container byState and assign /state as the partition key. Both containers reserve 400 request units (RUs) per second, which is the lowest throughput you can provision.

Notice the DefaultTimeToLive = -1 option applied to the first container. At the time of this writing, change feed does not support deletes. That is, if you delete a document from a container, it does not get picked up by the change feed. This may be supported in the future, but for now, the TTL (time to live) feature provides a very simple way to cope with deletions. Rather than physically deleting documents from the first container, we’ll just update them with a TTL of 60 seconds. That gives us 60 seconds to detect the update in the change feed, so that we can physically delete the corresponding document in the second container. Then, 60 seconds later, Cosmos DB will automatically physically delete the document from the first container by virtue of its TTL setting. You’ll see all this work in a moment when we run the code.

The other point to call out is the creation of our sync document, which is a special metadata document that won’t get copied over to the second container. Instead, we’ll use it to persist a timestamp to keep track of the last time we synchronized the containers. This way, each time we sync, we can request the correct point in time from which to consume changes that have occurred since the previous sync. The document is initialized with a lastSync value of null so that our first sync will consume the change feed from the beginning of time. Then lastSync is updated so that the next sync picks up precisely where the first one left off.

Now let’s implement CreateDocuments. This method simply populates three documents in the first container:

private static async Task CreateDocuments()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    var dataDocDef = new
    {
      city = "Brooklyn",
      state = "NY",
      slogan = "Kings County"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Los Angeles",
      state = "CA",
      slogan = "Golden"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Orlando",
      state = "FL",
      slogan = "Sunshine"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);
  }

  Console.WriteLine("Created 3 documents in city collection");
}

Notice that all three documents have city and state properties, where the city property is the partition key for the container that we’re creating these documents in. The state property is the partition key for the second container, where our sync method will create copies of these documents as it picks them up from the change feed. The slogan property is just an ordinary document property. And although we aren’t explicitly supplying an id property, the SDK will automatically generate one for each document with a GUID as the id value.

We’ll also have an UpdateDocument method to perform a change on one of the documents:

private static async Task UpdateDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Update a document
    var brooklynDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Brooklyn'")
      .AsEnumerable()
      .FirstOrDefault();

    brooklynDoc.slogan = "Fuhgettaboutit! " + Guid.NewGuid().ToString();
    await client.ReplaceDocumentAsync(brooklynDoc._self, brooklynDoc);
  }

  Console.WriteLine("Updated Brooklyn document in city collection");
}

This code retrieves the Brooklyn document, updates the slogan property, and calls ReplaceDocumentAsync to persist the change back to the container.

Next comes the DeleteDocument method:

private static async Task DeleteDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Delete a document (set time-to-live at 60 seconds)
    var orlandoDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Orlando'")
      .AsEnumerable()
      .FirstOrDefault();

    orlandoDoc.ttl = 60;
    await client.ReplaceDocumentAsync(orlandoDoc._self, orlandoDoc);
  }

  Console.WriteLine("Deleted Olrando document in city collection");
}

Remember that (currently) the change feed doesn’t capture deleted documents, so we’re using the TTL (time to live) technique to keep our deletions in sync. Rather than calling DeleteDocumentAsync to physically delete the Orlando document, we’re simply updating it with a ttl property set to 60 and saving it back to the container with ReplaceDocumentAsync. To the change feed, this is just another update, so our sync method will pick it up normally as you’ll see in a moment. Meanwhile, Cosmos DB will physically delete the Orlando document from the first container in 60 seconds, giving our sync method up to one minute to pick it up from the change feed and delete it from the second container.

And finally, the sync method, which is what this whole discussion is all about. Here’s the code for SyncCollections:

private static async Task SyncCollections()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var cityCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    // Obtain last sync time

    var syncDoc = client
      .CreateDocumentQuery(cityCollUri, "SELECT * FROM c WHERE c.city = 'sync'")
      .AsEnumerable()
      .FirstOrDefault();

    var lastSync = (DateTime?)syncDoc.lastSync;

    // Step 1: Gather all the partition key ranges (physical partitions)

    var continuationToken = default(string);
    var partitionKeyRanges = new List<PartitionKeyRange>();
    var loop = true;

    while (loop)
    {
      var partitionKeyRange = await client
        .ReadPartitionKeyRangeFeedAsync(
          cityCollUri,
          new FeedOptions { RequestContinuation = continuationToken });

      partitionKeyRanges.AddRange(partitionKeyRange);
      continuationToken = partitionKeyRange.ResponseContinuation;
      loop = continuationToken != null;
    }

    // Step 2: Consume the change feed for each partition key range

    // (simple demo doesn't scale when continuation tokens are needed
    foreach (var partitionKeyRange in partitionKeyRanges)
    {
      var options = new ChangeFeedOptions
      {
        PartitionKeyRangeId = partitionKeyRange.Id,
        StartFromBeginning = (lastSync == null),
        StartTime = (lastSync == null ? null : lastSync),
      };

      var query = client.CreateDocumentChangeFeedQuery(cityCollUri, options);

      while (query.HasMoreResults)
      {
        var readChangesResponse = await query.ExecuteNextAsync();
        foreach (var changedDocument in readChangesResponse)
        {
          if (changedDocument.city != "sync")
          {
            if (JsonConvert.DeserializeObject(changedDocument.ToString())["ttl"] == null)
            {
              var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
              await client.UpsertDocumentAsync(stateCollUri, changedDocument);
              Console.WriteLine($"Upserted document id {changedDocument.id} in byState collection");
            }
            else
            {
              var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", changedDocument.id);
              await client.DeleteDocumentAsync(stateDocUri, new RequestOptions
              {
                PartitionKey = new PartitionKey(changedDocument.state)
              });
              Console.WriteLine($"Deleted document id {changedDocument.id} in byState collection");
            }
          }
        }
      }
    }

    // Update last sync time
    syncDoc.lastSync = DateTime.UtcNow;
    await client.ReplaceDocumentAsync(syncDoc._self, syncDoc);
  }

}

Let’s break this down. First, we grab the last sync time from the sync document in the first container. Remember, this will be null the very first time we run this method. Then, we’re ready to query the change feed, which is a two-step process.

For step 1, we need to discover all the partition key ranges in the container. A partition key range is essentially a set of partition keys. In our small demo, where we have only one document each across three distinct partition keys (cities), Cosmos DB will host all three of these documents inside a single partition key range.

Although there is conceptually only one change feed per container, there is actually one change feed for each partition key range in the container. So step 1 calls ReadPartitionKeyRangeFeedAsync to discover the partition key ranges, with a loop that utilizes a continuation token from the response so that we retrieve all of the partition key ranges into a list.

Then, in step 2, we iterate the list to consume the change feed on each partition key range. Notice the ChangeFeedOptions object that we set on each iteration, which identifies the partition key range in PartitionKeyRangeId, and then sets either StartFromBeginning or StartTime, depending on whether lastSync is null or not. If it’s null (which will be true only on the very first sync), then StartFromBeginning will be set to true and StartTime will be set to null. Otherwise, StartFromBeginning gets set to false, and StartTime gets set to the timestamp from the last sync.

After preparing the options, we call CreateDocumentChangeFeedQuery that returns an iterator. As long as the iterator’s HasMoreResults property is true, we call ExecuteNextAsync on it to fetch the next set of results from the change feed. And here, ultimately, is where we plug in our sync logic.

Each result is a changed document. We know this will always include the sync document, because we’ll be updating it after every sync. This is metadata that we don’t need copied over to the second container each time, so we filter out the sync document by testing the city property for “sync.”

For all other changed documents, it now becomes a matter of performing the appropriate create, update, or delete operation on the second container. First, we check to see if there is a ttl property on the document. Remember that this is our indication of whether this is a delete or not. If the ttl property isn’t present, then it’s either a create or an update. In either case, we handle the change by calling UpsertDocumentAsync on the second container (upsert means “update or insert”).

Otherwise, if we detect the ttl property, then we call DeleteDocumentAsync to delete the document from the second container, knowing that Cosmos DB will delete its counterpart from the first container when the ttl expires.

Let’s test it out. Start the console app and run the DB (create database) and CD (create documents) commands. Then navigate to the Data Explorer in the Azure portal to verify that the database exists with the two containers, and that the byCity container has three documents in it, plus the sync document with a lastSync value of null indicating that no sync has yet occurred:

The byState container should be empty at this point, because we haven’t run our first sync yet:

Back in the console app, run the SC command to sync the containers. This copies all three documents from the first container’s change feed over to the second container, skipping the sync document which we excluded in our code:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection
Upserted document id fb8d41ae-3aae-4892-bfe9-8c34bc8138d2 in byState collection
Upserted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

Returning to the portal, refresh the data explorer to confirm that the second container now has the same three documents as the first, although here they are partitioned by state rather than city:

Both containers are now in sync. Refresh the first container view, and you’ll see that the lastSync property has been changed from null to a timestamp from when the previous sync ran.

Run the UD command in the console app the update the first container with a change to the slogan property of the Brooklyn, NY document. Now run SC to sync the containers again:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection

Refresh the second container view, and you’ll see that the slogan property has now been updated there as well.

Finally, run the DD command in the console app the delete the Orlando, FL document from the first container. Remember that this doesn’t actually delete the document, but rather updates it with a ttl property set to 60 seconds. Then run SC to sync the containers again:

Deleted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

You can now confirm that the Orlando, FL document is deleted from the second container, and within a minute (upon ttl expiration), you’ll see that it gets deleted from the first container as well.

However, don’t wait longer than a minute after setting the ttl before running the sync or you will run out of time. Cosmos DB will delete the document from the first container when the ttl expires, at which point it will disappear from the change feed and you will lose your chance to delete it from the second container.

What’s Next

It didn’t take that much effort to consume the change feed, but that’s only because we have a tiny container with just a handful of changes, and we’re manually invoking each sync. To consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. It was certainly beneficial to start by querying the change feed directly, since exploring that option first is a great way to learn how change feed works internally. However, unless you have very custom requirements, the CFP library is the way to go.

So stay tuned for part 2, and we’ll see how much easier it is to implement our multiple partition key solution much more robustly using the CFP library.

Advertisements

Demystifying the Multi-Model Capabilities in Azure Cosmos DB

When someone casually asks me, “Hey, what is Cosmos DB?,” I casually respond, “Well, that’s Microsoft’s globally distributed, massively scalable, horizontally partitioned, low latency, fully indexed, multi-model NoSQL database, of course.” One of two things happen then. I’ll often get a long weird look, after which the other person politely excuses themselves and moves along. But every now and again, I’ll hear “wow, that sounds awesome, tell me more!” If you’re still reading this, then I’m guessing you’re in the latter category.

If you start to elaborate on each of the bullet points in my soundbite response, there’s a lot to discuss before you get to “multi-model NoSQL” at the tail end. Starting with “globally distributed,” Cosmos DB is – first and foremost – a database designed for modern web and mobile applications, which are (typically) global applications in nature. Simply by clicking the mouse on a map in the portal, your Cosmos DB database is instantly replicated anywhere and everywhere Microsoft hosts a data center (there are nearly 50 of them worldwide, to date). This delivers high availability and low latency to users wherever they’re located.

Cosmos DB also delivers virtually unlimited scale, both in terms of storage – via server-side horizontal partitioning, and throughput – by provisioning a prescribed number of request units (RUs) per second. This ensures low latency, and is backed by comprehensive SLAs to yield predictable database performance for your applications. And it’s unique “inverted indexing” scheme enables automatic indexing of every property that you store, with minimal overhead.

Whew. That’s quite a lot to digest before we even start pondering Cosmos DB’s multi-model support, mentioned there at the end of my lengthy description. In fact, it’s very deliberately placed at the end. Because regardless of which data model you choose, it actually makes no difference to Cosmos DB. The capabilities around global distribution, horizontal partitioning, provisioned throughput, and automatic indexing apply – these are durable concepts that transcend whatever data model you choose. So you get to pick and choose among any of the supported data models, without compromising any of the core features of the Cosmos DB engine.

Which segues right into the topic of this blog post. What exactly is “multi-model”? And specifically, what does it mean for a database platform like Cosmos DB to support multiple data models?

It all boils down to how you’d like to treat your data – and this is where the developer comes in. Because, while massive scale is clearly important (if not critical), developers don’t really care about such details as long as it all “just works.” And it’s Cosmos DB’s job to make sure that this all works. When it comes to actually building applications – well, that’s the developer’s job, and this is where the decision of which data model to choose comes into play.

Depending on the type of application being built, it could be more appropriate to use one data model and not another. For example, if the application focuses more on relationships between entities than the entities themselves, then a graph data model may work better than a document model. In other cases, a developer may want to migrate an existing NoSQL application to Cosmos DB; for example, an existing Mongo DB or Cassandra application. In these scenarios, the Cosmos DB data model will be pre-determined; depending on the back-end database dependency of the application being ported, the developer would choose either the Mongo DB-compatible or Cassandra-compatible data model. Such a migration would require minimal (to no) changes to the existing application code. And yet, in other “green field” situations, developers that are very opinionated about how data should be modeled are free to choose whichever data model they prefer.

Each data model has an API for developers to work with in Cosmos DB. Put another way, the developer chooses an API for their database, and that determines the data model that is used. So, let’s break it down:

MM1

Document Data Model (SQL & MongoDB APIs)

The first thing to point out is that the SQL API is, essentially, the original DocumentDB programming model from the days when Cosmos DB was called DocumentDB. This is arguably the most robust and capable of all the APIs, because it is the only one that exposes a server-side programming model that lets you build fully transactional stored procedures, triggers, and user-defined functions.

So both the SQL and MongoDB APIs give you a document data model, but the two APIs themselves are radically different. Yes, they are similar from a data modeling perspective; you store complete denormalized entities as a hierarchical key-value document model; pure JSON in the case of the SQL API, or BSON in the case of the MongoDB API (BSON is MongoDB’s special binary-encoded version of JSON that extends JSON with additional data types and multi-language support).

The critical difference between the two APIs is the programming interface itself. The SQL API uses Microsoft’s innovative variant of structured query language (SQL) that is tailored for searching across hierarchical JSON documents. It also supports the server-side programming model (for example, stored procedures), which none of the other APIs do.

In contrast, the MongoDB API actually provides wire-level support, which is a compatibility layer that understands the protocol used by the MongoDB driver for sending packets over the network. MongoDB has a built-in find method used for querying documents (unlike the SQL support found in the SQL API). So the MongoDB API appeals to existing MongoDB developers, because they now enjoy the scale-out, throughput, and global distribution capabilities of Cosmos DB, without deserting the MongoDB ecosystem. Because the MongoDB API in Cosmos DB gives you full compatibility with existing MongoDB application code, and lets you continue working with familiar MongoDB tools.

Key-Value Data Model (Table API)

You can also model your data as a key-value store, using the Table API. This API is actually the evolution of Azure Table Storage – one of the very first NoSQL databases available on Azure. In fact, all existing Azure Table Storage customers will eventually be migrated over to Cosmos DB and the Table API.

With this data model, each entity consists of a key and a value pair, but the value itself is a set of key-value pairs. So this is nothing like a table in a relational database, where each row has the same columns; with the Table API in Cosmo DB, each entity’s value can have a different set of key-value pairs.

The Table API appeals primarily to existing Azure Table Storage customers, because it emulates the Azure Table Storage API. Using this API, existing Azure Table Storage apps can be migrated quickly and easily to Cosmos DB. For a new project though, there would be little reason to ever consider using the Table API, when you consider the fact that the SQL API is far more capable than the Table API.

So then, when would you actually choose to use the Table API? Well, again, the primary use case is to migrate an existing Azure Table Storage account over to Cosmos DB, without having to change any code in your applications. Remember that Microsoft is planning to do this for every customer over a long term migration, but there’s no reason to wait for them to do that, if you don’t want to wait. You can migrate the data yourself now, and then immediately start enjoying the benefits of Cosmos DB as a back-end, and you don’t have to make any changes whatsoever to your existing Azure Table Storage applications. You just change the connection string to point to Cosmos DB, and the application continues to work seamlessly against the Cosmos DB Table API.

Graph Data Model (Gremlin API)

You can also choose the Gremlin API, which gives you a graph database based on the Apache Tinkerpop open source project. Graph databases are becoming increasingly popular in the NoSQL world.

What do you put in a graph? One of two things; either a vertex or an edge. Now don’t let these terms intimidate you. They’re just fancy words for entities and relationships, respectively. So a vertex is an entity, and an edge is a one-way relationship between any two vertices. And that’s it – nothing more and nothing less. These are the building blocks of any graph database. And whether you’re storing a vertex or an edge, you can attach any number of arbitrary properties to it; much like the arbitrary key-value pairs you can define for a row using the Table API, or a flat JSON document using the SQL API.

The Gremlin API provides a succinct graph traversal language that enables you to efficiently query across the many relationships that exist in a graph database. For example, in a social networking application, you could easily find a user, then look for all of that user’s posts where the location is NY, and of those, find all the relationships where some other user has commented on or liked those posts.

Columnar (Cassandra API)

There’s a fourth option for choosing a data model, and that’s columnar, using the Cassandra API. Columnar is yet another way of modeling your data, where – in a departure from the typical way of dealing with schema-free data in the NoSQL world – you actually do define the schema of your data up-front. However, data is still stored physically in a column-oriented fashion, so it’s still OK to have sparse columns, and it has good support for aggregations. Columnar is somewhat similar to the Key-Value data model with the Table API, except that every item in the container is required to adhere to the schema defined for the container. And in that sense, columnar is really most similar to columnstore in SQL Server, except of course that it is implemented using a NoSQL architecture, so it’s distributed and partitioned to massively scale out big data.

Atom Record Sequence (ARS)

The fact of the matter is, these APIs merely project your data as different data models; whereas internally, your data is always stored as ARS – or Atom Record Sequence – a Microsoft creation that defines the persistence layer for key-value pairs. Now you don’t need to know anything about ARS; you don’t even need to know that it’s there. But it is there, under the covers, storing all your data as key-value pairs in a manner that’s agnostic to the data-model you’ve chosen to work with.

Because at the end of the day, it’s all just keys and values – not just the key-value data model, but all these data models. They’re all some form of keys and values. A JSON or BSON document is a collection of keys and values, where values can either be simple values, or they can contain nested key-value pairs. The key-value model is clearly based on keys and values, but so are graph and columnar. The relationships you define in a graph database are expressed as annotations that are, themselves key-value pairs, and certainly all the columns defined for a columnar data model can be viewed as key-value pairs as well.

So, these API’s are here to broaden your choices in terms of how you get to treat your data; they bear no consequence on the ability to scale your database. For example, if you want to be able to write SQL queries, you would choose the SQL API, and not the Table API. But if you want MongoDB or Azure Table Storage compatibility, then you’d go with the MongoDB or Table API respectively.

Switching Between Data Models

As you’ve seen, when you choose an API, you are also choosing a data model. And today (since the release of Cosmos DB in May 2017), you choose an API when you create a Cosmos DB account. This means that today, a Cosmos DB account is tied to one API, which ties it to one data model:

MM2

But again, each data model is merely a projection of the same underlying ARS format, and so eventually you will be able to create a single account, and then switch freely between different APIs within the account. So that then, you’ll be able to access one database as graph, key-value, document, or columnar, all at once, if you wish:

MM3

You can also expect to see additional APIs in the future, as Cosmos DB broadens its compatibility support for other database systems. This will enable a an even wider range of developers to stick with their database of choice, while leveraging Cosmos DB as a back end for horizontal partitioning, provisioned throughput, global distribution, and automatic indexing.

Summary

Azure Cosmos DB has multiple APIs and supports multiple data models. In this blog post, we explored the multi-API, multi-model capabilities of Cosmos DB, including the document data model with either the SQL or MongoDB APIs, key-value with the Table API, graph with the Gremlin API, and columnar with the Cassandra API.

Regardless of which data model you choose, however, Cosmos DB stores everything in ARS, and merely projects different data models, based on the different APIs. This provides developers with a wide range of choices for how they’d like to model their data, without making any compromises in scale, partitioning, throughput, indexing, or global distribution.

Horizontal Partitioning in Azure Cosmos DB

In Azure Cosmos DB, partitioning is what allows you to massively scale your database, not just in terms of storage but also throughput. You simply create a container in your database, and let Cosmos DB partition the data you store in that container, to manage its growth.

This means that you just work with the one container as a single logical resource where you store data. And you can just let the container grow and grow, without worrying about scale, because Cosmos DB creates as many partitions as needed behind the scenes to accommodate your data.

Automated Scale-Out

These partitions themselves are the physical storage for the data in your container. Think of partitions as individual buckets of data that, collectively, is the container. But you don’t need to think about it too much, because the whole idea is that it all just works automatically and transparently. So when one partition gets too full, Cosmos DB automatically creates a new partition, and frees up space in the growing partition by splitting its data, and moving some of it over into the new partition.

This technique scales the storage of your container, but the partitions themselves are also replicated internally, and so this also ensures availability of your container. Furthermore, Cosmos DB will split partitions even well before they grow too full, which also scales throughput because there are now more partitions available to satisfy a larger workload.

Life for a container begins with a single partition, and when you start storing data to the container, it gets written to that partition. Then you write some more data, which continues to fill the partition, until at some point, a new partition gets created to store the additional data. And this continues ad infinitum, resulting in horizontal scale-out that gives you a virtually unlimited container for your database:

But the question then becomes, how does Cosmos DB know which data is stored in which partitions? I mean, if your container grows to hundreds or thousands of partitions, how does Cosmos DB know where to look for data when you run a query? Having a piece of data stored arbitrarily in any given partition within the collection means that Cosmos DB needs to check each individual partition when you query. This doesn’t seem very efficient, which is why Cosmos DB can be much smarter about this, and that’s where you come in. Because your job in all of this – and really your only job – is to define a partition key for the container.

Selecting a Partition Key

The single most important thing you need to do when you create a container is to define its partition key. This could be any property in your data, but choosing the best property will scale your data for storage and throughput in the most efficient way. And conversely, some properties will be a poor choice, and will thus impede your ability to scale. So even though you don’t need to handle partitioning yourself, understanding how Cosmos DB internally partitions your data, combined with an understanding of how your data will be most typically accessed, will help you make the right choice.

Partition key values are hashed

For each item you write to the container, Cosmos DB calculates a hash on the property value that you’ve designated as the partition key, and it uses that hash to determine which physical partition the item should be stored in. This means that all the items in your container with the same value for the property you’ve chosen for the partition key will always be stored physically together in the same partition, guaranteed. They will never, ever, be spread across multiple partitions.

Physical partitions host multiple logical partitions

However, this certainly does not mean that Cosmos DB creates one partition for each unique partition key value. Your container can grow elastically, but the partitions themselves have a fixed storage capacity. So one physical partition per partition key (i.e., logical partition) is far from ideal, because you’d potentially wind up having Cosmos DB create far more partitions than are actually necessary, each with a large amount of free space that will never be used.

But as I said, Cosmos DB is much smarter than that, because it also uses a range algorithm to co-locate multiple partition keys within a single partition. This is totally transparent to you however – your data gets partitioned by the partition key that you choose, but then which physical partition is used to store an item, and what other partition keys (logical partitions) happen to be co-located on that same partition – well that’s all insignificant and irrelevant as far as we’re concerned.

Two primary considerations

So now you understand that all the items with the same partition key will always live together in the same partition, which makes it easier to figure out what property in your data might or might not be a good candidate to use for the partition key.

First, queries will be most efficient if they can be serviced by looking at one partition, as opposed to spreading the query to work across multiple partitions. That’s why, typically, you always want to supply the partition key value with any query you want to run, so that Cosmos DB can go directly to the partition where it knows all the data that could possibly be returned by your query lives. You can certainly override this, and tell Cosmos DB to query across all the partitions in your collection, but you would definitely want to pick a partition key where such a query would be the exception, rather than the general rule, which would be that most typical queries in your application (say 80% or more) could be scoped to a single partition key value.

The same is true of updates, because – as I’ll blog about soon – you can write server-side stored procedures that update multiple documents in a single transaction, so that all the updates succeed or fail together as a whole, and this is possible only across multiple documents that have the same partition key value, which is another thing to think about when defining the partition key for a container.

You also want to choose a value that won’t introduce bottlenecks for storage or throughput. Something that can be somewhat uniformly distributed across partitions, so that storage and throughput can be somewhat evenly spread within the container.

Let’s dig a little deeper with a few different scenarios. Here I’ve got a container that uses city as the partition key:

Like I’ve been explaining, all the items for any city are stored together within the same partition, and each partition is co-locating multiple partition keys. It’s not a perfect distribution – and it never will be – but at least at this point, it seems we can somewhat evenly distribute our data within the container using city as the partition key. Furthermore, any query within a given city can be serviced by a single partition, and multiple items in the same city can be updated in a single transaction using a stored procedure.

Now, what happens when we need to grow this data? Say we start getting more data for Chicago, but can already see the partition hosting Chicago is starting to run low on headroom. So when you start adding more Chicago data, Cosmos DB will at some point – automatically and transparently – split the partition. That is, it will create a new partition, and move roughly half the data from the Chicago partition into the new partition. In this case, Berlin got moved out as well as Chicago, and now there is sufficient room for Chicago to continue growing:

But is city still the best choice to use here? If you have a few cities that are significantly larger than others, or that are queried significantly more often than others, then perhaps a finer granularity might be better, like zip code, for example. That will yield many more distinct values than city, and ideally, you want a partition key with many distinct values – like hundreds or thousands, at a minimum.

Choosing the Right Partition Key

Ultimately, only you will be able to make the best choice, because the best choice will be driven by the typical data access patterns in your particular application. I have already explained that the choice you make determines if a query can be serviced by a single partition – which is preferred, and which should be the most common case – or if multiple partitions need to be looked at. And it also determines which items can be updated together in a single transaction, using a stored procedure – and which cannot.

So with that in mind, let’s see what would make a good partition key for the different kinds of applications you might be building.

User Profile Data

Typical social networking applications store user profile data, and very often the user ID is a good partition key choice for this type of application. It means that you can query across all the posts of a given user, and Cosmos DB can service the query by looking at just the one partition where all of that user’s posts are stored. And, for example, you could write a stored procedure that updates the location property of multiple posts belonging to any user, and those updates would occur inside of a transaction – they would all commit, or they would all rollback, together.

IOT

IoT applications deal with the so-called Internet of Things… where you’re storing telemetry data from all sorts of devices – it could be refrigerators, or automobiles – whatever. The device ID is typically used for IoT apps, so for example, if you’re storing information about different cars, you would go with the VIN number, the vehicle identification number that uniquely identifies each distinct automobile. Queries against any particular car, or device, can then be serviced by a single partition, and multiple items for the same car or device can then be updated together in a transaction by writing a stored procedure.

Multitenant

In a multi-tenant architecture, you provide storage for different customers, or tenants. Each tenant needs to be isolated, so the tenant ID seems like a natural candidate to choose for the partition key. Because then each tenant can run their own queries against a single partition, and transactionally update their own data in that partition using a stored procedure, completely isolated from other tenants.

Writes distributed

You also need to think about how your application writes to the container, because you want to make sure that they get spread somewhat uniformly across all the partition within the container. Say you’re building a social networking app. I mentioned that user ID might be a good choice, but what about creation date? Well, creation date is a bad choice – for any type of application, really – because of the way the application is going to write data.

So from a storage perspective, we do get nice uniformity by using the creation date for the partition key, and, as time marches on, Cosmos DB will automatically add new partitions for new data, as you’ve already seen:

But the problem is that – when the application writes new data, the writes will always be directed to the same partition, based on whatever day it is. This results in what’s called a hot partition, where we have a bottleneck that’s going to quickly consume a great deal more of the reserved throughput you’ve provisioned for the container. Specifically, Cosmos DB evenly distributes your provisioned throughput across all the physical partitions in the container. So in this case where there are four partitions, if you are provisioning (and thus paying for) 4,000 RU/sec (request units per second), you’ll only get the performance of about 1,000 RU/sec:

That’s why user ID is a much better choice. Because then, writes get directed to different partitions, depending on the user. So throughout the day, as user profile data gets written to the container, those writes are being much more evenly distributed, with user ID as the partition key.

Multiple containers

And then you need to consider that, in some scenarios, you can’t settle on one good partition key for a single container. In these cases, you’ll create multiple containers in the database, each with different partition keys, and then store different data in each container, based on usage.

The multi-tenant scenario makes a perfect example:

We’re partitioning by tenant ID, but clearly tenant number 10 is much bigger than all the other tenants. That could be because all the other tenants are, let’s say, small mom-and-pop shops, while tenant number 10 is some huge Fortune 500 company like Toyota. Compared to tenant number 10, all the other tenants have modest storage and throughput requirements.

This too creates a hot partition. First, look at storage. Partitions have a fixed size limit, which happens to be 10 gigabytes today, though that will likely change to 100 gigabytes in the future. But that limit is almost irrelevant, because independent of that, there is very uneven distribution of throughput. Tenant number 10 is much busier than the other tenants, so most writes are going to that one partition. And this is to the detriment of all the other tenants, because the reserved throughput is provisioned for the entire container. Furthermore, given the significantly larger volume of data for tenant number 10, it’s likely that large tenant will benefit from partitioning on some fine-grained property within their own data, and not just on the entire tenant ID, which is the partition key for the container.

And that’s a perfect example of when you’ll create a second container for your database. In this case, we can dedicate the entire second container to tenant number 10, and – let’s say it’s Toyota – that container can be partitioned by VIN number:

As a result, we have good uniformity across the entire database. One container handles small-sized tenants. This container is partitioned by tenant ID, and might be provisioned for relatively low throughput that gets shared by all the tenants in the container. And the other individual container handles the large tenant, Toyota. This container is partitioned by VIN number, and might even reserve more throughput for all of Toyota than the first container reserves for all smaller clients combined.