Multiple Partition Keys in Azure Cosmos DB (Part 1) – Querying the Change Feed Directly

To begin, let’s be clear that an Azure Cosmos DB container can have only one partition key. I say this from the start in case “multiple partition keys” in the title is somehow misinterpreted to imply otherwise. You always need to come up with a single property to serve as the partition key for every container and choosing the best property for this can sometimes be difficult.

Understanding the Problem

Making the right choice requires intimate knowledge about the data access patterns of your users and applications. You also need to understand how horizontal partitioning works behind the scenes in terms of storage and throughput, how queries and stored procedures are scoped by partition key, and the performance implications of running cross-partition queries and fan-out queries. So there are many considerations to take into account before settling on the one property to partition a container on. I discuss all this at length in a previous blog post, Horizontal Partitioning in Azure Cosmos DB.

But here’s the rub. What if, after all the analysis, you come to realize that you simply cannot settle on a single property that serves as an ideal partition key for all scenarios? Let’s say for example, from a write perspective, you find one property will best distribute writes uniformly across all the partitions in the container. But from a query perspective, you find that using the same partition key results in too much fanning out. Or, you might identify two categories of common queries, where it’s roughly 50/50; meaning, about half of all the queries are of one type, and half are of the other. What do you do if the two query categories would each benefit from different partition keys?

Your brain can get caught in an infinite loop over this until you wind up in that state of “analysis paralysis,” where you recognize that there’s just no single best property to choose as the partition key. To break free, you need to think outside the box. Or, let’s say, think outside the container. Because the solution here is to simply create another container that’s a complete “replica” of the first. This second container holds the exact same set of documents as the first but defines a different partition key.

I placed quotes around the word “replica” because this second container is not technically a replica in the true Cosmos DB sense of the word (where, internally, Cosmos DB automatically maintains replicas of the physical partitions in every container). Rather, it’s a manual replica that you maintain yourself. Thus, it’s your job to keep it in sync with changes when they happen in the first container, which is partitioned by a property that’s optimized for writes. As those writes occur in real time, you need to respond by updating the second container, which is partitioned by a property that’s optimized for queries.

Enter Change Feed

Fortunately, change feed comes to the rescue here. Cosmos DB maintains a persistent record of changes for every container that can be consumed using the change feed. This gives you a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. For an introduction to the Cosmos DB change feed, have a look at my previous blog post, Change Feed – Unsung Hero of Azure Cosmos DB.

In this three-part series of blog posts, I’ll dive into three ways you can use the change feed to implement a solution for synchronizing containers:

  • Querying the change feed directly
  • Using the Change Feed Processor (CFP) library
  • Writing an Azure Functions Cosmos DB trigger

Let’s get started. Assume that we’ve done our analysis and established that city is the ideal partition key for writes, as well as roughly half of the most common queries our users will be running. But we’ve also determined that state is the ideal partition key for the other (roughly half) commonly executed queries. This means we’ll want one container partitioned by city, and another partitioned by state. And we’ll want to consume the city-partitioned container’s change feed to keep the state-partitioned container in sync with changes as they occur. We’ll then be able to direct our city-based queries to the first container, and our state-based queries to the second container, which then eliminates fan-out queries in both cases.
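To make the fan-out argument concrete, here is a minimal in-memory sketch (plain C#, no Cosmos DB involved) that groups the same documents by two different keys and counts how many logical partitions hold matches for a query. The `CityDoc` type and the `PartitionsTouched` helper are hypothetical names invented for this illustration, and counting only the partitions that contain matches is a simplification of how a real cross-partition query is dispatched.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical document shape used only for this illustration.
public sealed class CityDoc
{
    public string City { get; set; }
    public string State { get; set; }
}

public static class FanOutSketch
{
    // Counts how many logical partitions hold at least one document matching
    // the filter when the documents are grouped by the given partition key.
    public static int PartitionsTouched(
        IEnumerable<CityDoc> docs,
        Func<CityDoc, string> partitionKey,
        Func<CityDoc, bool> filter)
    {
        return docs
            .GroupBy(partitionKey)
            .Count(partition => partition.Any(filter));
    }
}
```

With three New York cities and one Florida city, a query for state "NY" finds matches in three partitions when the data is grouped by city, but in only one when it is grouped by state. That difference is what the second container buys us.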

Setting Up

If you’d like to follow along, you’ll need to be sure your environment is set up properly. First, of course, you’ll need to have a Cosmos DB account. The good news here is that you can get a free 30-day account with the “try cosmos” offering, which doesn’t even require a credit card or Azure subscription (just a free Microsoft account). Even better, there’s no limit to the number of times you can start a new 30-day trial. Create your free account at http://azure.microsoft.com/try/cosmosdb.

You’ll need your account’s endpoint URI and master key to connect to Cosmos DB from C#. To obtain them, head over to your Cosmos DB account in the Azure portal, open the Keys blade, and keep it open so that you can handily copy/paste them into the project.

You’ll also need Visual Studio. I’ll be using Visual Studio 2019, but the latest version of Visual Studio 2017 is fine as well. You can download the free community edition at https://visualstudio.microsoft.com/downloads.

Querying the Change Feed Directly

We’ll begin with the raw approach, which is to query the change feed directly using the SDK. The reality is that you’ll almost never want to go this route, except for the simplest small-scale scenarios. Still, it’s worth taking some time to examine this approach first, as I think you’ll benefit from learning how the change feed operates at a low level, and it will enhance your appreciation of the Change Feed Processor (CFP) library, which I’ll cover in the next blog post of this series.

Fire up Visual Studio and create a new .NET Core console application. Name the project ChangeFeedDirect and the solution ChangeFeedDemos (we’ll be adding more projects to this solution in parts 2 and 3 of this blog series). Next, add the SDK to the ChangeFeedDirect project by installing the Microsoft.Azure.DocumentDB.Core NuGet package.

We’ll write some basic code to create a database with the two containers, with additional methods to create, update, and delete documents in the first container (partitioned by city). Then we’ll write our “sync” method that directly queries the change feed on the first container, in order to update the second container (partitioned by state) and reflect all the changes made.

Note: Our code (and the SDK) refers to containers as collections.

We’ll write all our code inside the Program.cs file. First, update the using statements at the very top of the file to get the right namespaces imported:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

Next, at the very top of the Program class, set two private string constants for your Cosmos DB account’s endpoint and master key. You can simply copy them from the Keys blade in the Azure portal, and paste them right into the code:

private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
private const string CosmosMasterKey = "<account-key>";

Now edit the Main method with one line to call the Run method:

static void Main(string[] args)
{
  Task.Run(Run).Wait();
}

This one-liner invokes the Run method and waits for it to complete. Add the Run method next, which displays a menu and calls the various methods defined for each menu option:

private static async Task Run()
{
  while (true)
  {
    Console.WriteLine("Menu");
    Console.WriteLine();
    Console.WriteLine("DB - Create database");
    Console.WriteLine("CD - Create documents");
    Console.WriteLine("UD - Update document");
    Console.WriteLine("DD - Delete document");
    Console.WriteLine("SC - Sync collections");
    Console.WriteLine("Q - Quit");
    Console.WriteLine();
    Console.Write("Selection: ");

    var input = Console.ReadLine().ToUpper().Trim();
    if (input == "Q") break;
    else if (input == "DB") await CreateDatabase();
    else if (input == "CD") await CreateDocuments();
    else if (input == "UD") await UpdateDocument();
    else if (input == "DD") await DeleteDocument();
    else if (input == "SC") await SyncCollections();
    else Console.WriteLine("\nInvalid selection; try again\n");
  }
}

We’ll add the menu option methods one at a time, starting with CreateDatabase:

private static async Task CreateDatabase()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Create the database

    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    try { await client.DeleteDatabaseAsync(dbUri); } catch { }

    await client.CreateDatabaseAsync(new Database { Id = "multipk" });

    // Create the two collections

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/city");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byCity",
      PartitionKey = partitionKeyDefinition,
      DefaultTimeToLive = -1
    }, new RequestOptions { OfferThroughput = 400 });

    partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/state");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byState",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });

    // Load the sync document into the first collection

    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
    var syncDocDef = new
    {
      city = "sync",
      state = "sync",
      lastSync = default(DateTime?)
    };
    await client.CreateDocumentAsync(collUri, syncDocDef);
  }

  Console.WriteLine("Created database for change feed demo");
}

Most of this code is intuitive, even if you’ve never done any Cosmos DB programming before. We first use our endpoint and master key to create a DocumentClient, and then we use the client to create a database named multipk with the two containers in it.

This works by first calling DeleteDatabaseAsync wrapped in a try block with an empty catch block. This effectively results in “delete if exists” behavior to ensure that the multipk database does not exist when we call CreateDatabaseAsync to create it.
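One caveat with the empty catch block is that it swallows every failure, including a bad master key or a network error. As a hedged alternative, the sketch below catches only the "not found" case and lets everything else surface. It assumes the same SDK package used in this post (Microsoft.Azure.DocumentDB.Core); the `DatabaseSetup` class and `DeleteDatabaseIfExistsAsync` method are hypothetical names of my own, not part of the SDK.

```csharp
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

public static class DatabaseSetup
{
    // Deletes the database if it exists; returns false when it was not there.
    // Any other failure (bad key, throttling, network) still reaches the caller.
    public static async Task<bool> DeleteDatabaseIfExistsAsync(DocumentClient client, string databaseId)
    {
        try
        {
            await client.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri(databaseId));
            return true;
        }
        catch (DocumentClientException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
        {
            // The database did not exist, which is exactly the state we wanted.
            return false;
        }
    }
}
```

Inside CreateDatabase, the one-line try/catch could then be swapped for `await DatabaseSetup.DeleteDatabaseIfExistsAsync(client, "multipk");`. For a throwaway demo the empty catch is fine; the narrower filter mainly helps when a misconfigured endpoint or key would otherwise fail silently.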

Next, we call CreateDocumentCollectionAsync twice to create the two containers (again, a collection is a container). We name the first container byCity and assign it a partition key of /city, and we name the second container byState and assign /state as the partition key. Both containers reserve 400 request units (RUs) per second, which is the lowest throughput you can provision.

Notice the DefaultTimeToLive = -1 option applied to the first container. At the time of this writing, change feed does not support deletes. That is, if you delete a document from a container, it does not get picked up by the change feed. This may be supported in the future, but for now, the TTL (time to live) feature provides a very simple way to cope with deletions. Rather than physically deleting documents from the first container, we’ll just update them with a TTL of 60 seconds. That gives us 60 seconds to detect the update in the change feed, so that we can physically delete the corresponding document in the second container. Then, 60 seconds later, Cosmos DB will automatically physically delete the document from the first container by virtue of its TTL setting. You’ll see all this work in a moment when we run the code.

The other point to call out is the creation of our sync document, which is a special metadata document that won’t get copied over to the second container. Instead, we’ll use it to persist a timestamp to keep track of the last time we synchronized the containers. This way, each time we sync, we can request the correct point in time from which to consume changes that have occurred since the previous sync. The document is initialized with a lastSync value of null so that our first sync will consume the change feed from the beginning of time. Then lastSync is updated so that the next sync picks up precisely where the first one left off.
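The checkpoint idea behind the sync document can be sketched without any Cosmos DB code at all. The in-memory example below is only an illustration under my own assumptions: `Change` and `CheckpointedReader` are hypothetical types, and the caller passes in the current time so that the new checkpoint is fixed before the feed is read. This differs slightly from the demo later in this post, which stamps lastSync after processing. Choosing the checkpoint first means a change that arrives while a sync is running gets picked up again next time rather than skipped; since the demo upserts, seeing a change twice is harmless.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one change feed entry: a document id plus its modification time.
public sealed class Change
{
    public string Id { get; set; }
    public DateTime ModifiedUtc { get; set; }
}

public sealed class CheckpointedReader
{
    // Plays the role of the sync document's lastSync value; null means "never synced".
    public DateTime? LastSync { get; private set; }

    // Returns the changes at or after the stored checkpoint (everything on the
    // first call), then advances the checkpoint to the time supplied by the caller.
    public IReadOnlyList<Change> ReadNewChanges(IEnumerable<Change> feed, DateTime nowUtc)
    {
        var batch = feed
            .Where(c => LastSync == null || c.ModifiedUtc >= LastSync.Value)
            .OrderBy(c => c.ModifiedUtc)
            .ToList();

        LastSync = nowUtc;
        return batch;
    }
}
```

The first call returns the whole feed because LastSync is still null, mirroring the "from the beginning of time" behavior of the first sync. Each later call returns only what changed at or after the previous checkpoint.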

Now let’s implement CreateDocuments. This method simply populates three documents in the first container:

private static async Task CreateDocuments()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    var dataDocDef = new
    {
      city = "Brooklyn",
      state = "NY",
      slogan = "Kings County"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Los Angeles",
      state = "CA",
      slogan = "Golden"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Orlando",
      state = "FL",
      slogan = "Sunshine"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);
  }

  Console.WriteLine("Created 3 documents in city collection");
}

Notice that all three documents have city and state properties, where the city property is the partition key for the container that we’re creating these documents in. The state property is the partition key for the second container, where our sync method will create copies of these documents as it picks them up from the change feed. The slogan property is just an ordinary document property. And although we aren’t explicitly supplying an id property, the SDK will automatically generate one for each document with a GUID as the id value.

We’ll also have an UpdateDocument method to perform a change on one of the documents:

private static async Task UpdateDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Update a document
    var brooklynDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Brooklyn'")
      .AsEnumerable()
      .FirstOrDefault();

    brooklynDoc.slogan = "Fuhgettaboutit! " + Guid.NewGuid().ToString();
    await client.ReplaceDocumentAsync(brooklynDoc._self, brooklynDoc);
  }

  Console.WriteLine("Updated Brooklyn document in city collection");
}

This code retrieves the Brooklyn document, updates the slogan property, and calls ReplaceDocumentAsync to persist the change back to the container.

Next comes the DeleteDocument method:

private static async Task DeleteDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Delete a document (set time-to-live at 60 seconds)
    var orlandoDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Orlando'")
      .AsEnumerable()
      .FirstOrDefault();

    orlandoDoc.ttl = 60;
    await client.ReplaceDocumentAsync(orlandoDoc._self, orlandoDoc);
  }

  Console.WriteLine("Deleted Orlando document in city collection");
}

Remember that (currently) the change feed doesn’t capture deleted documents, so we’re using the TTL (time to live) technique to keep our deletions in sync. Rather than calling DeleteDocumentAsync to physically delete the Orlando document, we’re simply updating it with a ttl property set to 60 and saving it back to the container with ReplaceDocumentAsync. To the change feed, this is just another update, so our sync method will pick it up normally as you’ll see in a moment. Meanwhile, Cosmos DB will physically delete the Orlando document from the first container in 60 seconds, giving our sync method up to one minute to pick it up from the change feed and delete it from the second container.

And finally, the sync method, which is what this whole discussion is all about. Here’s the code for SyncCollections:

private static async Task SyncCollections()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var cityCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    // Obtain last sync time

    var syncDoc = client
      .CreateDocumentQuery(cityCollUri, "SELECT * FROM c WHERE c.city = 'sync'")
      .AsEnumerable()
      .FirstOrDefault();

    var lastSync = (DateTime?)syncDoc.lastSync;

    // Step 1: Gather all the partition key ranges (physical partitions)

    var continuationToken = default(string);
    var partitionKeyRanges = new List<PartitionKeyRange>();
    var loop = true;

    while (loop)
    {
      var partitionKeyRange = await client
        .ReadPartitionKeyRangeFeedAsync(
          cityCollUri,
          new FeedOptions { RequestContinuation = continuationToken });

      partitionKeyRanges.AddRange(partitionKeyRange);
      continuationToken = partitionKeyRange.ResponseContinuation;
      loop = continuationToken != null;
    }

    // Step 2: Consume the change feed for each partition key range

    // (simple demo doesn't scale when continuation tokens are needed)
    foreach (var partitionKeyRange in partitionKeyRanges)
    {
      var options = new ChangeFeedOptions
      {
        PartitionKeyRangeId = partitionKeyRange.Id,
        StartFromBeginning = (lastSync == null),
        StartTime = lastSync,
      };

      var query = client.CreateDocumentChangeFeedQuery(cityCollUri, options);

      while (query.HasMoreResults)
      {
        var readChangesResponse = await query.ExecuteNextAsync();
        foreach (var changedDocument in readChangesResponse)
        {
          if (changedDocument.city != "sync")
          {
            if (JsonConvert.DeserializeObject(changedDocument.ToString())["ttl"] == null)
            {
              var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
              await client.UpsertDocumentAsync(stateCollUri, changedDocument);
              Console.WriteLine($"Upserted document id {changedDocument.id} in byState collection");
            }
            else
            {
              var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", changedDocument.id);
              await client.DeleteDocumentAsync(stateDocUri, new RequestOptions
              {
                PartitionKey = new PartitionKey(changedDocument.state)
              });
              Console.WriteLine($"Deleted document id {changedDocument.id} in byState collection");
            }
          }
        }
      }
    }

    // Update last sync time
    syncDoc.lastSync = DateTime.UtcNow;
    await client.ReplaceDocumentAsync(syncDoc._self, syncDoc);
  }

}

Let’s break this down. First, we grab the last sync time from the sync document in the first container. Remember, this will be null the very first time we run this method. Then, we’re ready to query the change feed, which is a two-step process.

For step 1, we need to discover all the partition key ranges in the container. A partition key range is essentially a set of partition keys. In our small demo, where we have only one document each across three distinct partition keys (cities), Cosmos DB will host all three of these documents inside a single partition key range.

Although there is conceptually only one change feed per container, there is actually one change feed for each partition key range in the container. So step 1 calls ReadPartitionKeyRangeFeedAsync to discover the partition key ranges, with a loop that utilizes a continuation token from the response so that we retrieve all of the partition key ranges into a list.

Then, in step 2, we iterate the list to consume the change feed on each partition key range. Notice the ChangeFeedOptions object that we set on each iteration, which identifies the partition key range in PartitionKeyRangeId, and then sets either StartFromBeginning or StartTime, depending on whether lastSync is null or not. If it’s null (which will be true only on the very first sync), then StartFromBeginning will be set to true and StartTime will be set to null. Otherwise, StartFromBeginning gets set to false, and StartTime gets set to the timestamp from the last sync.

After preparing the options, we call CreateDocumentChangeFeedQuery that returns an iterator. As long as the iterator’s HasMoreResults property is true, we call ExecuteNextAsync on it to fetch the next set of results from the change feed. And here, ultimately, is where we plug in our sync logic.

Each result is a changed document. We know this will always include the sync document, because we’ll be updating it after every sync. This is metadata that we don’t need copied over to the second container each time, so we filter out the sync document by testing the city property for “sync.”

For all other changed documents, it now becomes a matter of performing the appropriate create, update, or delete operation on the second container. First, we check to see if there is a ttl property on the document. Remember that this is our indication of whether this is a delete or not. If the ttl property isn’t present, then it’s either a create or an update. In either case, we handle the change by calling UpsertDocumentAsync on the second container (upsert means “update or insert”).

Otherwise, if we detect the ttl property, then we call DeleteDocumentAsync to delete the document from the second container, knowing that Cosmos DB will delete its counterpart from the first container when the ttl expires.
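The upsert-versus-delete decision is easy to isolate and test on its own. Here is a minimal sketch of that check, written against System.Text.Json purely so that it has no NuGet dependency; that is my assumption (it requires .NET Core 3.0 or later), whereas the code in this post uses Newtonsoft.Json. The `SyncAction` enum and `ChangeClassifier` class are hypothetical names.

```csharp
using System.Text.Json;

public enum SyncAction { Upsert, Delete }

public static class ChangeClassifier
{
    // A document carrying a "ttl" property was soft-deleted in the source container.
    // Anything else is a create or an update, which an upsert handles either way.
    public static SyncAction Classify(string documentJson)
    {
        using (var doc = JsonDocument.Parse(documentJson))
        {
            return doc.RootElement.TryGetProperty("ttl", out _)
                ? SyncAction.Delete
                : SyncAction.Upsert;
        }
    }
}
```

For example, `ChangeClassifier.Classify("{\"id\":\"1\",\"city\":\"Orlando\",\"ttl\":60}")` yields `SyncAction.Delete`, while the same document without ttl yields `SyncAction.Upsert`. Note that this sketch treats any ttl property as a delete marker, even one whose value is JSON null.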

Let’s test it out. Start the console app and run the DB (create database) and CD (create documents) commands. Then navigate to the Data Explorer in the Azure portal to verify that the database exists with the two containers, and that the byCity container has three documents in it, plus the sync document with a lastSync value of null indicating that no sync has yet occurred.

The byState container should be empty at this point, because we haven’t run our first sync yet.

Back in the console app, run the SC command to sync the containers. This copies all three documents from the first container’s change feed over to the second container, skipping the sync document which we excluded in our code:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection
Upserted document id fb8d41ae-3aae-4892-bfe9-8c34bc8138d2 in byState collection
Upserted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

Returning to the portal, refresh the data explorer to confirm that the second container now has the same three documents as the first, although here they are partitioned by state rather than city.

Both containers are now in sync. Refresh the first container view, and you’ll see that the lastSync property has been changed from null to a timestamp from when the previous sync ran.

Run the UD command in the console app to update the first container with a change to the slogan property of the Brooklyn, NY document. Now run SC to sync the containers again:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection

Refresh the second container view, and you’ll see that the slogan property has now been updated there as well.

Finally, run the DD command in the console app to delete the Orlando, FL document from the first container. Remember that this doesn’t actually delete the document, but rather updates it with a ttl property set to 60 seconds. Then run SC to sync the containers again:

Deleted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

You can now confirm that the Orlando, FL document is deleted from the second container, and within a minute (upon ttl expiration), you’ll see that it gets deleted from the first container as well.

However, don’t wait longer than a minute after setting the ttl before running the sync or you will run out of time. Cosmos DB will delete the document from the first container when the ttl expires, at which point it will disappear from the change feed and you will lose your chance to delete it from the second container.

What’s Next

It didn’t take that much effort to consume the change feed, but that’s only because we have a tiny container with just a handful of changes, and we’re manually invoking each sync. To consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.
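To give a feel for the parallelization mentioned above, here is a small sketch that runs one reader per partition key range and waits for all of them to finish. It is deliberately SDK-free: `readRangeAsync` is a placeholder delegate standing in for the per-range change feed loop (the inner `while (query.HasMoreResults)` loop from SyncCollections), and `ParallelFeedReader` is a hypothetical name. It does not address leases or persisted continuation tokens.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelFeedReader
{
    // Starts one reader task per partition key range and waits for all of them.
    // Each reader reports how many changes it processed.
    public static async Task<int> ReadAllRangesAsync(
        IEnumerable<string> partitionKeyRangeIds,
        Func<string, Task<int>> readRangeAsync)
    {
        // ToList() forces every task to start before we begin waiting.
        var tasks = partitionKeyRangeIds.Select(readRangeAsync).ToList();
        var counts = await Task.WhenAll(tasks);
        return counts.Sum(); // total number of changes processed across all ranges
    }
}
```

Even this small step raises questions the sequential demo never had to answer, such as what happens when one range fails while the others succeed.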

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. It was certainly beneficial to start by querying the change feed directly, since exploring that option first is a great way to learn how change feed works internally. However, unless you have very custom requirements, the CFP library is the way to go.

So stay tuned for part 2, and we’ll see how much easier it is to implement our multiple partition key solution much more robustly using the CFP library.

Change Feed – Unsung Hero of Azure Cosmos DB

Introduction

Azure Cosmos DB is rapidly growing in popularity, and for good reason. Microsoft’s globally distributed, multi-model database service has massively scalable storage and throughput, provides a sliding scale for consistency, is fully and automatically indexed, and it exposes multiple APIs. Throw in a server-side programming model for ACID transactions with stored procedures, triggers, and user-defined functions, along with SLAs that cover availability (up to 99.999%), throughput, latency, and consistency, and it’s easy to see why Cosmos DB is fast winning the hearts of developers and solution architects alike.

Yet still today, one of the most overlooked capabilities in Cosmos DB is its change feed. This little gem sits quietly behind every container in your database, watches for changes, and maintains a persistent record of them in the order they occur. This provides you with a reliable mechanism to consume a continuous and incremental feed of changes, as documents are actively written or modified in any container.

There are numerous use cases for this, and I’ll call out a few of the most common ones in a moment. But all of them share a need to respond to changes made to a Cosmos DB container. And the first thought that comes to the mind of a relational database developer is to use a trigger for this. Cosmos DB supports triggers as part of its server-side programming model, so it could be natural to think of using this feature to consume changes in real time when you need to.

Unfortunately, though, triggers in Cosmos DB do not fire automatically as they do in the relational world. They need to be explicitly referenced with each change in order to run, so they cannot be relied upon for capturing all changes made to a container. Furthermore, triggers are JavaScript-only, and they run in a bounded execution environment within Cosmos DB that is scoped to a single partition key. These characteristics further limit what triggers can practically accomplish in response to a change.
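To illustrate what “explicitly referenced” means in practice, here is a minimal sketch using the .NET SDK’s RequestOptions type (from the Microsoft.Azure.DocumentDB.Core package, which I am assuming is installed). The `TriggerOptions` class, the `WithPreTrigger` helper, and the trigger name used below are hypothetical.

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents.Client;

public static class TriggerOptions
{
    // Builds request options that ask Cosmos DB to run the named pre-trigger
    // for this one request. A request sent without these options skips the trigger.
    public static RequestOptions WithPreTrigger(string triggerId)
    {
        return new RequestOptions
        {
            PreTriggerInclude = new List<string> { triggerId }
        };
    }
}
```

A write that should fire the trigger has to opt in, for example `await client.CreateDocumentAsync(collUri, doc, TriggerOptions.WithPreTrigger("validateDocument"));`. Any client that forgets to pass the options writes the document without the trigger ever running, which is why triggers can’t be relied upon to observe every change.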

But with change feed, you’ve got a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. You can write code (in your preferred language) that consumes the change feed to process it as needed and deploy that code to run on Azure. This paves an easy path for you to build many different solutions for many different scenarios.

Scenarios for Change Feed

Some of the more common use cases for change feed include:

  • Replicating containers for multiple partition keys
  • Denormalizing a document data model across containers
  • Triggering API calls for an event-driven architecture
  • Real time stream processing and materialized view patterns
  • Moving or archiving data to secondary data stores

Each of these deserves its own focused blog post (and will hopefully get one). For the broader context of this overview post, however, I’ll discuss each of them at a high level.

Replicating containers for multiple partition keys

One of the most (perhaps the most) important things you need to do when creating a container is to decide on an appropriate partition key – a single property in your data that the container will be partitioned by.

Now sometimes this is easy, and sometimes it is not. In order to settle on the correct choice, you need a clear understanding of how your data is used (written to and queried), and how horizontal partitioning works. You can read all about this in my previous blog post, Horizontal Partitioning in Azure Cosmos DB.

But what do you do when you can’t decide? What if there are two properties that make good choices, one for write performance, and another that’s better for query performance? This can lead to “analysis paralysis,” a scary condition that is fortunately and easily remedied using change feed.

All you do is create two containers, each partitioned by a different partition key. The first container uses a partition key that’s optimized for writes (it may also be appropriate for certain types of queries as well), while the second one uses a partition key optimized for most typical queries. Simply use change feed to monitor changes made to the first container as the writes occur and replicate the changes out to the second container.

Your application then writes to the first container and queries from the second container, simple as that! I’ll show you a detailed walkthrough of exactly how to implement this using C# and the Change Feed Processor Library with Azure Functions in my next post.

Denormalizing a document data model across containers

Developers with a background in relational database design often struggle initially with the denormalized approach to data modeling in the NoSQL world of JSON documents. I personally empathize; from my own experience, I know that it can be difficult at first to embrace concepts that run contrary to deeply engrained practices that span decades of experience in the field.

Data duplication is a case in point: it is considered a big no-no in the normalized world of relational databases with operational workloads. But with NoSQL, we often deliberately duplicate data in order to avoid expensive additional lookups. Most NoSQL database engines have no concept of a JOIN across documents, and we can avoid having to perform our own “manual” joins if we simply duplicate the same information across documents in different containers.

This is a somewhat finer-grained version of the previous scenario, which replicates entire documents between two containers. In this case, we have different documents in each container, with data fragments from changed documents in one container being replicated into other (related) documents in another container.

But how do you ensure that the duplicated data remains in sync as changes occur in the source container? Why, change feed of course! Just monitor the source container and update the target container. I’ll show you exactly how to do this in a future post.
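As a rough preview of the idea, here is an in-memory sketch of the propagation step. It assumes a hypothetical model in which order documents carry a duplicated copy of the customer’s name; `Customer`, `Order`, and `DenormalizationSync` are names invented for this illustration, and a real implementation would query and replace documents in the target container instead of looping over a list.

```csharp
using System.Collections.Generic;

// Hypothetical shapes: each order duplicates the name of the customer who placed it.
public sealed class Customer
{
    public string Id { get; set; }
    public string Name { get; set; }
}

public sealed class Order
{
    public string Id { get; set; }
    public string CustomerId { get; set; }
    public string CustomerName { get; set; }
}

public static class DenormalizationSync
{
    // Applies one changed customer (as it would arrive from the source container's
    // change feed) to every order that duplicates that customer's name.
    // Returns the number of orders that were rewritten.
    public static int Propagate(Customer changed, IEnumerable<Order> orders)
    {
        var updated = 0;
        foreach (var order in orders)
        {
            if (order.CustomerId == changed.Id && order.CustomerName != changed.Name)
            {
                order.CustomerName = changed.Name;
                updated++;
            }
        }
        return updated;
    }
}
```

Skipping orders whose copy is already current keeps the operation idempotent, so replaying the same change from the feed does no extra work.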

Triggering API calls for an event-driven architecture

In this scenario, you source events to a set of microservices, each with a single responsibility. For instance, consider an ecommerce website with a large-scale order processing pipeline. The pipeline is broken up into a set of smaller microservices, each of which can be scaled out independently. Each microservice is responsible for a single task in the pipeline, such as calculating tax on each order, generating tax audit records, processing each order payment, sending orders off to a fulfillment center, and generating shipping notifications.

Thus, you potentially have N microservices communicating with up to N-1 other microservices, which adds significant complexity to the larger architecture. The design can be greatly simplified if, instead, all these microservices communicate through a bus; that is, a persistent event store. And Cosmos DB serves as an excellent persistent event store, because the change feed makes it easy to broker and source these events to each microservice. Furthermore, because the events themselves are persisted, the order processing pipeline itself is very robust and incredibly resilient to failures. You can also query and navigate the individual events, so that this data can be surfaced out through a customer care API.

Check out this link for a real-world case study of how Jet.com implemented an event-sourced architecture using the Cosmos DB change feed with hundreds of microservices: https://customers.microsoft.com/en-us/story/jet-com-powers-innovative-e-commerce-engine-on-azure-in-less-than-12-months. There is also a video on this from Microsoft Build 2018 here: https://channel9.msdn.com/Events/Build/2018/BRK3602

Real-time stream processing and materialized view patterns

The change feed can also be used for performing real-time stream processing and analytics. In the order processing pipeline scenario, for example, this would enable you to take all the events and materialize a single view for tracking the order status. You could then easily and efficiently present the order status through a user-facing API.
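As a rough illustration of the materialized view idea, the sketch below folds a stream of order events into one status per order. The event shape (orderId and status) and the status names are assumptions made up for this example, not something Cosmos DB prescribes.

```python
from typing import Dict, Iterable

def materialize_order_status(events: Iterable[dict]) -> Dict[str, str]:
    """Reduce events (in the order they occurred) to the latest status per order."""
    view: Dict[str, str] = {}
    for event in events:
        # A later event for the same order overwrites the earlier status.
        view[event["orderId"]] = event["status"]
    return view

events = [
    {"orderId": "o1", "status": "TaxCalculated"},
    {"orderId": "o2", "status": "TaxCalculated"},
    {"orderId": "o1", "status": "PaymentProcessed"},
    {"orderId": "o1", "status": "Shipped"},
]
view = materialize_order_status(events)
```

A user-facing API would then read the small view rather than replaying every event for an order.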

Other examples of so-called “lambda architectures” include performing real-time analytics on IoT telemetry or building a scoring leader board for a massively multiplayer online video game.

Moving or archiving data to secondary data stores

Another common scenario for using the change feed involves replicating data from Cosmos DB as your primary (hot) store to some other secondary (cold) data store. Cosmos DB is a superb hot store because it can sustain heavy write ingestion, and then immediately serve the ingested records back out to a user-facing API.

Over time as the volume of data mounts, however, you may want to offload older data to cold storage for archival. Once again, change feed is a wonderful mechanism to implement a replication strategy that does just that.

Consuming the Change Feed

So how do you actually work with the change feed? There are several different ways, and I’ll conclude this blog post by briefly explaining three of them. (Don’t worry, I’ll drill deeper into all three in the next post!)

Direct Access

First, you can query the change feed directly using the SDK. This raw approach works but is the hardest to implement at large scale. Essentially, you first need to discover all the container’s partitions, and then you query each of them for their changes. You also need to persist state metadata; for example, a timestamp for when the change feed was last queried, and a continuation token for each partition. Plus, you’ll want to optimize performance by spawning multiple tasks across different partitions so that they get processed in parallel.
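To make that bookkeeping concrete, here is a simplified simulation in plain Python. It does not call the Cosmos DB SDK; the PARTITIONS dictionary, the integer "continuation tokens", and the function names are all stand-ins I made up to show the shape of the work: read every partition in parallel, then persist a token per partition so the next poll picks up where this one left off.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

# Hypothetical stand-in for a container: partition id -> ordered list of changes.
PARTITIONS: Dict[str, List[str]] = {
    "0": ["a1", "a2"],
    "1": ["b1"],
    "2": [],
}

def read_partition(pid: str, token: int) -> Tuple[str, List[str], int]:
    """Return the changes after `token`, plus the token to resume from next time."""
    changes = PARTITIONS[pid][token:]
    return pid, changes, token + len(changes)

def poll_changes(tokens: Dict[str, int]) -> List[str]:
    """Read all partitions in parallel, then store each partition's new token."""
    partition_ids = list(PARTITIONS)
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(
            lambda pid: read_partition(pid, tokens.get(pid, 0)), partition_ids))
    seen: List[str] = []
    for pid, changes, new_token in results:
        seen.extend(changes)
        tokens[pid] = new_token
    return seen

tokens: Dict[str, int] = {}       # in real life this state must be persisted
first = poll_changes(tokens)      # everything written so far
PARTITIONS["1"].append("b2")      # a new change arrives in partition 1
second = poll_changes(tokens)     # only the new change comes back
```

Even in this toy version you can see the moving parts that need care: partition discovery, per-partition state, and parallel reads.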

If all this sounds like a lot of work, it is. Which is why you’ll almost certainly want to leverage the Change Feed Processor (CFP) library instead.

Change Feed Processor (CFP) Library

The CFP library provides a high-level abstraction over direct access that greatly simplifies the process of reading the change feed from all the different partitions of a container. This is a separate NuGet package that you pull into your project, and it handles all the aforementioned complexity for you. It will automatically persist state, track all the partitions of the container, and acquire leases so that you can scale out across many consumers.

To make this work, the CFP library persists a set of leases as documents in another dedicated Cosmos DB container. Then, when you spin up consumers, they attempt to acquire leases as they expire.
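The sketch below illustrates the general idea of lease-based coordination. It is not the CFP library’s actual implementation; the lease shape, the 30-second duration, and the try_acquire function are assumptions for illustration only.

```python
from typing import Dict

LEASE_SECONDS = 30  # assumed lease duration, for illustration only

def try_acquire(lease: dict, owner: str, now: float) -> bool:
    """Take the lease if it is unowned or expired; renew it if we already own it."""
    expired = lease["owner"] is None or now >= lease["expires"]
    if expired or lease["owner"] == owner:
        lease["owner"] = owner
        lease["expires"] = now + LEASE_SECONDS
        return True
    return False

# One lease document per partition, kept in a dedicated lease container.
leases: Dict[str, dict] = {pid: {"owner": None, "expires": 0.0} for pid in ("0", "1")}

a0 = try_acquire(leases["0"], "consumer-A", now=0)         # A takes partition 0
b0 = try_acquire(leases["0"], "consumer-B", now=10)        # still held by A
b1 = try_acquire(leases["1"], "consumer-B", now=10)        # B takes partition 1
b0_later = try_acquire(leases["0"], "consumer-B", now=45)  # A's lease has expired
```

Because each partition has exactly one lease owner at a time, adding consumers spreads the partitions across them without any two consumers processing the same changes.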

All you do is write an observer class that implements IChangeFeedObserver. The primary method of this interface that you need to implement is ProcessChangesAsync, which receives the change feed as a list of documents that you can process as needed. No partitions to worry about, no timestamps or continuation tokens to persist, and no scale-out needs to concern yourself with.

However, you still need to write your own host, and deploy the DLL with your observer class to an Azure app service. Although the process is straightforward, going with Azure Functions instead provides an even easier deployment model.

Azure Functions

The simplest way to consume the change feed is by using Azure Functions with a Cosmos DB trigger. If you’re not already familiar with Azure Functions, they let you write individual methods (functions), which you deploy for execution in a serverless environment hosted on Azure. The term “serverless” here means without also having to write a host and deploy an Azure app service to run your code.

Azure Functions are invoked by one of any number of triggers. Yes, Azure Functions also uses the term triggers, but unlike triggers in Cosmos DB, an Azure Functions trigger always fires when its conditions are met. There are several different triggers available, including the one that we care about here, the Cosmos DB trigger. This Azure Functions trigger binds to configuration that points to the container you want to monitor changes on, and the lease collection that gets managed by the CFP library under the covers.

From there, the process is identical to using the CFP library, only the deployment model is dramatically simpler. The Azure Functions method that is bound using the Cosmos DB trigger receives a parameter with the same list of documents that a CFP library’s observer class does in its ProcessChangesAsync method, and you process change feed data the way you need to just the same.

What’s Next?

Hopefully, this blog post opened your eyes to the change feed in Azure Cosmos DB. This powerful feature creates all sorts of exciting possibilities across a wide range of use cases. In the next installment, I’ll focus on container replication for multiple partition keys, and walk you step by step through the process of building a change feed solution using direct access, CFP library, and Azure Functions – all with working code samples. So, stay tuned!

Understanding Consistency Levels in Azure Cosmos DB

Developers with a background in relational databases are accustomed to achieving data integrity using transactions. Once a writer updates a bank balance and commits the transaction, it’s entirely unacceptable for a reader to ever be served the previous value, and a relational database ensures that this never happens. In the NoSQL world, this is referred to as strong consistency. Achieving strong consistency in a NoSQL database is more challenging, because NoSQL databases by design write to multiple replicas. And in the case of Azure Cosmos DB, these replicas can be geographically spread across multiple Microsoft data centers throughout the world.

First though, let’s understand consistency within the context of a single data center.

In one Azure data center, your data is written out to multiple replicas (four at least). Consistency is all about whether or not you can be sure that the data you are reading at any point in time is – in fact – the latest version of that data, because you can be reading from any replica at any given time. And if it’s not the latest version, then this is known as a “dirty read.”

Cosmos DB supports five consistency levels, ranging from strong to eventual. I’ll discuss each of these levels in a moment, but you can see that there’s a sliding scale here that balances latency with availability:

On the one extreme, strong consistency guarantees you’ll never experience a dirty read, but of course, this guarantee comes with a cost, because Cosmos DB can’t permit reads until all the replicas are updated. This results in higher latency, which degrades performance.

On the other end of the spectrum, there’s eventual consistency, which offers no guarantees whatsoever, and you never know with certainty whether or not you’re reading the latest version of data. This gives Cosmos DB the freedom to serve your reads from any available replica, without having to wait for that replica to receive the latest updates from other writes. In turn, this results in the lowest latency, and delivers the best performance, at the cost of potentially experiencing dirty reads.

Replication within a region

Consistency is always a concern, but practically, it only becomes really important once you start geo-replicating your data. Because the reality is that, despite these varying consistency levels, it’s extremely rare to ever experience a dirty read within a data center, even when using eventual consistency, which is the weakest consistency level. And that’s because the replicas in a data center are located very close to one another, and data moves extremely fast between them… typically within one millisecond.

So in fact, there is little difference in consistency, latency, performance, and availability within a data center, where you’ll almost never encounter a dirty read, but it’s a completely different matter once you start replicating globally.

Global replication

Once you globally distribute your data, it takes hundreds of milliseconds to replicate changes across continents. This greatly increases the chances for dirty reads, which is why consistency becomes a much more pressing concern once you globally distribute your data across multiple data centers.

Five Consistency Levels

Strong

In the case of Strong Consistency, you can be certain that you’re always reading the latest data. But you pay for that certainty in performance. Because when somebody writes to the database, everybody waits for Cosmos DB to acknowledge that the write was successfully saved to all the replicas. Only then can Cosmos DB serve up consistent reads, guaranteed. Cosmos DB actually does support strong consistency across regions, assuming that you can tolerate the high latency that will result while replicating globally.

Bounded Staleness

Next there’s bounded staleness, which is kind of one step away from strong. As the name implies, this means that you can tolerate stale data, but up to a point. So you can actually configure how much out of date you want to allow for stale reads. Think of it as controlling the level of freshness, where you might have dirty reads, but only if the data isn’t too much out of date. Otherwise, for data that’s too stale, Cosmos DB will switch to strong consistency. In terms of defining how stale is stale, you can specify lag in terms of time – like, no stale data older than X, or update operations – as in, no stale data that’s been updated more than Y number of times.
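The two bounds can be pictured as a simple check. This is only a sketch of the rule as described above, with a hypothetical function name and made-up limits; it says nothing about how Cosmos DB enforces the bound internally.

```python
def within_staleness_bound(lag_seconds: float, lag_updates: int,
                           max_seconds: float, max_updates: int) -> bool:
    """A stale read is tolerable only while it is inside BOTH configured bounds."""
    return lag_seconds <= max_seconds and lag_updates <= max_updates

# Assumed configuration for illustration: at most 5 seconds or 100 updates behind.
ok = within_staleness_bound(lag_seconds=2, lag_updates=5, max_seconds=5, max_updates=100)
too_old = within_staleness_bound(lag_seconds=10, lag_updates=5, max_seconds=5, max_updates=100)
too_many = within_staleness_bound(lag_seconds=2, lag_updates=500, max_seconds=5, max_updates=100)
```

Whichever limit is exceeded first is the one that ends the tolerance for stale reads.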

Session

And then there’s session consistency, which is actually the default consistency level. With session consistency, you maintain a session for your writer – or writers – by defining a unique session key that all writers include in their requests. Cosmos DB then uses the session key to ensure strong consistency for the writer, meaning that a writer is always guaranteed to read the same data that they wrote, and will never read a stale version from an out-of-date replica – while everyone else may experience dirty reads.

When you create a DocumentClient instance with the .NET SDK, it automatically establishes a session key and includes it with every request. This means that within the lifetime of a DocumentClient instance, you are guaranteed strong consistency for reading any data that you’ve written, when using session consistency. This is a perfect balance for many scenarios, which is why session consistency is the default. Because very often, you want to be sure immediately that the data you’ve written has – in fact – been written, but it’s perfectly OK if it takes a little extra time for everyone else to see what you’ve written, once all the replicas have caught up with your write.
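Here is a toy model of the read-your-own-writes guarantee. It does not use the SDK; the Replica class, the integer session token, and session_read are simplifications I invented to show why the writer never sees stale data while other readers might.

```python
from typing import List, Optional

class Replica:
    """A replica that has applied writes up to some version number."""
    def __init__(self) -> None:
        self.version = 0
        self.value: Optional[str] = None

def session_read(replicas: List[Replica], session_token: int) -> Optional[str]:
    """Serve the read from the first replica that has caught up to the token."""
    for replica in replicas:
        if replica.version >= session_token:
            return replica.value
    raise RuntimeError("no replica has caught up to the session token yet")

stale, fresh = Replica(), Replica()
fresh.version, fresh.value = 1, "balance=90"   # the write has reached one replica so far
writer_token = 1                               # the writer remembers its own write

writer_sees = session_read([stale, fresh], writer_token)   # skips the stale replica
other_sees = session_read([stale, fresh], session_token=0) # may land on the stale one
```

The writer’s token forces its read past the out-of-date replica, while a reader with no knowledge of that write can still be served the old (here, empty) value.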

Consistent Prefix

With consistent prefix, dirty reads are always possible. However, you do get some guarantees. First, when you do get stale data from a dirty read, you can at least be sure that the data you get has in fact been updated to all the replicas, even though it’s not the most up-to-date version of that data, which has still yet to be replicated. And second, you’ll never experience reads out of order. For example, say the same data gets updated four times, so there are versions A, B, C, and D, where D is the most up-to-date version, and A, B, and C are stale versions. Now assume A and B have both been replicated, but C and D haven’t yet. You can be sure that, until C and D do get replicated, you’ll never read versions A and B out of order. That is, you might first get version A when only A has been replicated, and then get version B once version B has been replicated, and then you’ll never again get version A. You can only get the stale versions in order, so you’ll only continue getting version B until a later version gets replicated.
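The A, B, C, D example can be expressed as a small check. This is just a model of the ordering guarantee, with hypothetical helper names; it is not how Cosmos DB tracks versions.

```python
from typing import List

VERSIONS = ["A", "B", "C", "D"]  # writes, in the order they were made

def read(replicated_count: int) -> str:
    """Return the newest version among the writes replicated so far (a prefix)."""
    return VERSIONS[:replicated_count][-1]

def is_consistent_prefix(reads: List[str]) -> bool:
    """True if a sequence of reads never goes back to an older version."""
    positions = [VERSIONS.index(version) for version in reads]
    return all(earlier <= later for earlier, later in zip(positions, positions[1:]))

# Replication progresses: only A, then A and B, still A and B, then all four.
reads = [read(1), read(2), read(2), read(4)]
in_order = is_consistent_prefix(reads)
out_of_order = is_consistent_prefix(["A", "B", "A"])  # the forbidden sequence
```

Reads may lag behind (B is stale once C and D exist), but they only ever move forward.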

Eventual

Eventual consistency is the weakest consistency level. It offers no guarantees whatsoever, in exchange for the lowest latency and best performance compared to all the other consistency levels. With eventual, you never wait for any acknowledgement that data has been fully replicated before reading. This means that read requests can be serviced by replicas that have already been updated, or by those that haven’t. So you can get inconsistent results in a seemingly random fashion, until eventually, all the replicas get updated and then all the reads become consistent – until the next write operation, of course. So this really is the polar opposite of strong consistency, because now there’s no guarantee that you’re ever reading the latest data, but there’s also never any waiting, which delivers the fastest performance.

One more thing…

It’s also worth mentioning that when bounded staleness and session consistency levels don’t apply strong consistency, they fall back to consistent prefix, not all the way down to eventual consistency. So for bounded staleness, which gives you strong consistency when the data is too stale, you’ll still get consistent prefix guarantees when the data isn’t too stale. Likewise, with session, which gives you strong consistency when reading your own writes, you’ll get consistent prefix when reading other writes.

Demystifying the Multi-Model Capabilities in Azure Cosmos DB

When someone casually asks me, “Hey, what is Cosmos DB?,” I casually respond, “Well, that’s Microsoft’s globally distributed, massively scalable, horizontally partitioned, low latency, fully indexed, multi-model NoSQL database, of course.” One of two things happens then. I’ll often get a long weird look, after which the other person politely excuses themselves and moves along. But every now and again, I’ll hear “wow, that sounds awesome, tell me more!” If you’re still reading this, then I’m guessing you’re in the latter category.

If you start to elaborate on each of the bullet points in my soundbite response, there’s a lot to discuss before you get to “multi-model NoSQL” at the tail end. Starting with “globally distributed,” Cosmos DB is – first and foremost – a database designed for modern web and mobile applications, which are (typically) global applications in nature. Simply by clicking the mouse on a map in the portal, your Cosmos DB database is instantly replicated anywhere and everywhere Microsoft hosts a data center (there are nearly 50 of them worldwide, to date). This delivers high availability and low latency to users wherever they’re located.

Cosmos DB also delivers virtually unlimited scale, both in terms of storage – via server-side horizontal partitioning, and throughput – by provisioning a prescribed number of request units (RUs) per second. This ensures low latency, and is backed by comprehensive SLAs to yield predictable database performance for your applications. And its unique “inverted indexing” scheme enables automatic indexing of every property that you store, with minimal overhead.

Whew. That’s quite a lot to digest before we even start pondering Cosmos DB’s multi-model support, mentioned there at the end of my lengthy description. In fact, it’s very deliberately placed at the end. Because regardless of which data model you choose, it actually makes no difference to Cosmos DB. The capabilities around global distribution, horizontal partitioning, provisioned throughput, and automatic indexing apply – these are durable concepts that transcend whatever data model you choose. So you get to pick and choose among any of the supported data models, without compromising any of the core features of the Cosmos DB engine.

Which segues right into the topic of this blog post. What exactly is “multi-model”? And specifically, what does it mean for a database platform like Cosmos DB to support multiple data models?

It all boils down to how you’d like to treat your data – and this is where the developer comes in. Because, while massive scale is clearly important (if not critical), developers don’t really care about such details as long as it all “just works.” And it’s Cosmos DB’s job to make sure that this all works. When it comes to actually building applications – well, that’s the developer’s job, and this is where the decision of which data model to choose comes into play.

Depending on the type of application being built, it could be more appropriate to use one data model and not another. For example, if the application focuses more on relationships between entities than the entities themselves, then a graph data model may work better than a document model. In other cases, a developer may want to migrate an existing NoSQL application to Cosmos DB; for example, an existing MongoDB or Cassandra application. In these scenarios, the Cosmos DB data model will be pre-determined; depending on the back-end database dependency of the application being ported, the developer would choose either the MongoDB-compatible or Cassandra-compatible data model. Such a migration would require minimal (to no) changes to the existing application code. And yet, in other “green field” situations, developers that are very opinionated about how data should be modeled are free to choose whichever data model they prefer.

Each data model has an API for developers to work with in Cosmos DB. Put another way, the developer chooses an API for their database, and that determines the data model that is used. So, let’s break it down:

Document Data Model (SQL & MongoDB APIs)

The first thing to point out is that the SQL API is, essentially, the original DocumentDB programming model from the days when Cosmos DB was called DocumentDB. This is arguably the most robust and capable of all the APIs, because it is the only one that exposes a server-side programming model that lets you build fully transactional stored procedures, triggers, and user-defined functions.

So both the SQL and MongoDB APIs give you a document data model, but the two APIs themselves are radically different. Yes, they are similar from a data modeling perspective; you store complete denormalized entities as a hierarchical key-value document model; pure JSON in the case of the SQL API, or BSON in the case of the MongoDB API (BSON is MongoDB’s special binary-encoded version of JSON that extends JSON with additional data types and multi-language support).

The critical difference between the two APIs is the programming interface itself. The SQL API uses Microsoft’s innovative variant of structured query language (SQL) that is tailored for searching across hierarchical JSON documents. It also supports the server-side programming model (for example, stored procedures), which none of the other APIs do.

In contrast, the MongoDB API actually provides wire-level support, which is a compatibility layer that understands the protocol used by the MongoDB driver for sending packets over the network. MongoDB has a built-in find method used for querying documents (unlike the SQL support found in the SQL API). So the MongoDB API appeals to existing MongoDB developers, because they now enjoy the scale-out, throughput, and global distribution capabilities of Cosmos DB, without deserting the MongoDB ecosystem. Because the MongoDB API in Cosmos DB gives you full compatibility with existing MongoDB application code, and lets you continue working with familiar MongoDB tools.

Key-Value Data Model (Table API)

You can also model your data as a key-value store, using the Table API. This API is actually the evolution of Azure Table Storage – one of the very first NoSQL databases available on Azure. In fact, all existing Azure Table Storage customers will eventually be migrated over to Cosmos DB and the Table API.

With this data model, each entity consists of a key and a value pair, but the value itself is a set of key-value pairs. So this is nothing like a table in a relational database, where each row has the same columns; with the Table API in Cosmos DB, each entity’s value can have a different set of key-value pairs.

The Table API appeals primarily to existing Azure Table Storage customers, because it emulates the Azure Table Storage API. Using this API, existing Azure Table Storage apps can be migrated quickly and easily to Cosmos DB. For a new project though, there would be little reason to ever consider using the Table API, when you consider the fact that the SQL API is far more capable than the Table API.

So then, when would you actually choose to use the Table API? Well, again, the primary use case is to migrate an existing Azure Table Storage account over to Cosmos DB, without having to change any code in your applications. Remember that Microsoft is planning to do this for every customer over a long term migration, but there’s no reason to wait for them to do that, if you don’t want to wait. You can migrate the data yourself now, and then immediately start enjoying the benefits of Cosmos DB as a back-end, and you don’t have to make any changes whatsoever to your existing Azure Table Storage applications. You just change the connection string to point to Cosmos DB, and the application continues to work seamlessly against the Cosmos DB Table API.

Graph Data Model (Gremlin API)

You can also choose the Gremlin API, which gives you a graph database based on the Apache TinkerPop open source project. Graph databases are becoming increasingly popular in the NoSQL world.

What do you put in a graph? One of two things; either a vertex or an edge. Now don’t let these terms intimidate you. They’re just fancy words for entities and relationships, respectively. So a vertex is an entity, and an edge is a one-way relationship between any two vertices. And that’s it – nothing more and nothing less. These are the building blocks of any graph database. And whether you’re storing a vertex or an edge, you can attach any number of arbitrary properties to it; much like the arbitrary key-value pairs you can define for a row using the Table API, or a flat JSON document using the SQL API.

The Gremlin API provides a succinct graph traversal language that enables you to efficiently query across the many relationships that exist in a graph database. For example, in a social networking application, you could easily find a user, then look for all of that user’s posts where the location is NY, and of those, find all the relationships where some other user has commented on or liked those posts.
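To show what such a traversal does, here is the social networking example modeled with plain Python structures. This is not Gremlin syntax, and the vertex ids, edge labels, and helper functions are invented for illustration; it simply walks from a user to their NY posts and then to the users who liked those posts.

```python
from typing import Dict, List, Tuple

# Vertices are entities with arbitrary properties.
vertices: Dict[str, dict] = {
    "u1": {"label": "user", "name": "Ann"},
    "u2": {"label": "user", "name": "Bob"},
    "p1": {"label": "post", "location": "NY"},
    "p2": {"label": "post", "location": "LA"},
}

# Each edge is a one-way relationship: (from vertex, edge label, to vertex).
edges: List[Tuple[str, str, str]] = [
    ("u1", "posted", "p1"),
    ("u1", "posted", "p2"),
    ("u2", "liked", "p1"),
    ("u2", "liked", "p2"),
]

def outgoing(vertex_id: str, label: str) -> List[str]:
    """Follow edges with the given label that start at vertex_id."""
    return [dst for src, lbl, dst in edges if src == vertex_id and lbl == label]

def incoming(vertex_id: str, label: str) -> List[str]:
    """Follow edges with the given label that end at vertex_id."""
    return [src for src, lbl, dst in edges if dst == vertex_id and lbl == label]

# Find Ann's posts in NY, then everyone who liked those posts.
ny_posts = [p for p in outgoing("u1", "posted") if vertices[p]["location"] == "NY"]
likers = [vertices[u]["name"] for p in ny_posts for u in incoming(p, "liked")]
```

A graph traversal language lets you express this kind of hop-by-hop walk in a single chained query instead of hand-written loops.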

Columnar (Cassandra API)

There’s a fourth option for choosing a data model, and that’s columnar, using the Cassandra API. Columnar is yet another way of modeling your data, where – in a departure from the typical way of dealing with schema-free data in the NoSQL world – you actually do define the schema of your data up-front. However, data is still stored physically in a column-oriented fashion, so it’s still OK to have sparse columns, and it has good support for aggregations. Columnar is somewhat similar to the Key-Value data model with the Table API, except that every item in the container is required to adhere to the schema defined for the container. And in that sense, columnar is really most similar to columnstore in SQL Server, except of course that it is implemented using a NoSQL architecture, so it’s distributed and partitioned to massively scale out big data.

Atom Record Sequence (ARS)

The fact of the matter is, these APIs merely project your data as different data models; whereas internally, your data is always stored as ARS – or Atom Record Sequence – a Microsoft creation that defines the persistence layer for key-value pairs. Now you don’t need to know anything about ARS; you don’t even need to know that it’s there. But it is there, under the covers, storing all your data as key-value pairs in a manner that’s agnostic to the data-model you’ve chosen to work with.

Because at the end of the day, it’s all just keys and values – not just the key-value data model, but all these data models. They’re all some form of keys and values. A JSON or BSON document is a collection of keys and values, where values can either be simple values, or they can contain nested key-value pairs. The key-value model is clearly based on keys and values, but so are graph and columnar. The relationships you define in a graph database are expressed as annotations that are, themselves key-value pairs, and certainly all the columns defined for a columnar data model can be viewed as key-value pairs as well.
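To see how a nested document reduces to keys and values, consider this small sketch that flattens a document into path and value pairs. To be clear, this is not the ARS format (which I haven’t described, and which you don’t need to know); the path notation and the flatten function are purely illustrative.

```python
from typing import Any, Dict

def flatten(value: Any, prefix: str = "") -> Dict[str, Any]:
    """Flatten nested objects and arrays into path -> simple value pairs."""
    if isinstance(value, dict):
        items = value.items()
    elif isinstance(value, list):
        items = enumerate(value)      # array positions act as keys
    else:
        return {prefix: value}        # a simple value: one key-value pair
    flat: Dict[str, Any] = {}
    for key, child in items:
        flat.update(flatten(child, f"{prefix}/{key}"))
    return flat

doc = {"id": "1", "address": {"city": "NY"}, "tags": ["a", "b"]}
pairs = flatten(doc)
```

The hierarchy hasn’t been lost; it has just moved into the keys.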

So, these APIs are here to broaden your choices in terms of how you get to treat your data; they bear no consequence on the ability to scale your database. For example, if you want to be able to write SQL queries, you would choose the SQL API, and not the Table API. But if you want MongoDB or Azure Table Storage compatibility, then you’d go with the MongoDB or Table API respectively.

Switching Between Data Models

As you’ve seen, when you choose an API, you are also choosing a data model. And today (since the release of Cosmos DB in May 2017), you choose an API when you create a Cosmos DB account. This means that today, a Cosmos DB account is tied to one API, which ties it to one data model:

But again, each data model is merely a projection of the same underlying ARS format, and so eventually you will be able to create a single account, and then switch freely between different APIs within the account. So that then, you’ll be able to access one database as graph, key-value, document, or columnar, all at once, if you wish:

You can also expect to see additional APIs in the future, as Cosmos DB broadens its compatibility support for other database systems. This will enable an even wider range of developers to stick with their database of choice, while leveraging Cosmos DB as a back end for horizontal partitioning, provisioned throughput, global distribution, and automatic indexing.

Summary

Azure Cosmos DB has multiple APIs and supports multiple data models. In this blog post, we explored the multi-API, multi-model capabilities of Cosmos DB, including the document data model with either the SQL or MongoDB APIs, key-value with the Table API, graph with the Gremlin API, and columnar with the Cassandra API.

Regardless of which data model you choose, however, Cosmos DB stores everything in ARS, and merely projects different data models, based on the different APIs. This provides developers with a wide range of choices for how they’d like to model their data, without making any compromises in scale, partitioning, throughput, indexing, or global distribution.

Horizontal Partitioning in Azure Cosmos DB

In Azure Cosmos DB, partitioning is what allows you to massively scale your database, not just in terms of storage but also throughput. You simply create a container in your database, and let Cosmos DB partition the data you store in that container, to manage its growth.

This means that you just work with the one container as a single logical resource where you store data. And you can just let the container grow and grow, without worrying about scale, because Cosmos DB creates as many partitions as needed behind the scenes to accommodate your data.

Automated Scale-Out

These partitions themselves are the physical storage for the data in your container. Think of partitions as individual buckets of data that, collectively, make up the container. But you don’t need to think about it too much, because the whole idea is that it all just works automatically and transparently. So when one partition gets too full, Cosmos DB automatically creates a new partition, and frees up space in the growing partition by splitting its data, and moving some of it over into the new partition.

This technique scales the storage of your container, but the partitions themselves are also replicated internally, and so this also ensures availability of your container. Furthermore, Cosmos DB will split partitions even well before they grow too full, which also scales throughput because there are now more partitions available to satisfy a larger workload.

Life for a container begins with a single partition, and when you start storing data to the container, it gets written to that partition. Then you write some more data, which continues to fill the partition, until at some point, a new partition gets created to store the additional data. And this continues ad infinitum, resulting in horizontal scale-out that gives you a virtually unlimited container for your database:

But the question then becomes, how does Cosmos DB know which data is stored in which partitions? I mean, if your container grows to hundreds or thousands of partitions, how does Cosmos DB know where to look for data when you run a query? Having a piece of data stored arbitrarily in any given partition within the collection means that Cosmos DB needs to check each individual partition when you query. This doesn’t seem very efficient, which is why Cosmos DB can be much smarter about this, and that’s where you come in. Because your job in all of this – and really your only job – is to define a partition key for the container.

Selecting a Partition Key

The single most important thing you need to do when you create a container is to define its partition key. This could be any property in your data, but choosing the best property will scale your data for storage and throughput in the most efficient way. And conversely, some properties will be a poor choice, and will thus impede your ability to scale. So even though you don’t need to handle partitioning yourself, understanding how Cosmos DB internally partitions your data, combined with an understanding of how your data will be most typically accessed, will help you make the right choice.

Partition key values are hashed

For each item you write to the container, Cosmos DB calculates a hash on the property value that you’ve designated as the partition key, and it uses that hash to determine which physical partition the item should be stored in. This means that all the items in your container with the same value for the property you’ve chosen for the partition key will always be stored physically together in the same partition, guaranteed. They will never, ever, be spread across multiple partitions.

Physical partitions host multiple logical partitions

However, this certainly does not mean that Cosmos DB creates one partition for each unique partition key value. Your container can grow elastically, but the partitions themselves have a fixed storage capacity. So one physical partition per partition key (i.e., logical partition) is far from ideal, because you’d potentially wind up having Cosmos DB create far more partitions than are actually necessary, each with a large amount of free space that will never be used.

But as I said, Cosmos DB is much smarter than that, because it also uses a range algorithm to co-locate multiple partition keys within a single partition. This is totally transparent to you however – your data gets partitioned by the partition key that you choose, but then which physical partition is used to store an item, and what other partition keys (logical partitions) happen to be co-located on that same partition – well that’s all insignificant and irrelevant as far as we’re concerned.
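Although you never need to do this yourself, a sketch can help demystify it. The code below hashes each partition key value and then uses ranges of the hash space to pick a physical partition. The hash function (SHA-256), the size of the hash space, and the split points are all assumptions I chose for illustration; Cosmos DB’s actual hashing and range assignment are internal details.

```python
import bisect
import hashlib
from typing import Dict, List

HASH_SPACE = 2 ** 32  # assumed size of the hash space, for illustration

def key_hash(partition_key: str) -> int:
    """Deterministically map a partition key value into the hash space."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")

def physical_partition(partition_key: str, split_points: List[int]) -> int:
    """Index of the physical partition whose hash range contains the key."""
    return bisect.bisect_right(split_points, key_hash(partition_key))

# Three physical partitions, each owning one third of the hash space.
split_points = [HASH_SPACE // 3, 2 * HASH_SPACE // 3]

cities = ["Chicago", "Berlin", "Tokyo", "Lima", "Cairo", "Oslo"]
placement: Dict[str, int] = {city: physical_partition(city, split_points) for city in cities}
```

Two things fall out of this. The same partition key value always lands on the same physical partition, and with more key values than physical partitions, several logical partitions necessarily share a physical one.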

Two primary considerations

So now you understand that all the items with the same partition key will always live together in the same partition, which makes it easier to figure out what property in your data might or might not be a good candidate to use for the partition key.

First, queries will be most efficient if they can be serviced by looking at one partition, as opposed to spreading the query across multiple partitions. That’s why you typically want to supply the partition key value with any query you run, so that Cosmos DB can go directly to the partition where all the data that could possibly be returned by your query lives. You can certainly override this, and tell Cosmos DB to query across all the partitions in your container. But you would want to pick a partition key where such a cross-partition query is the exception rather than the rule, so that most typical queries in your application (say 80% or more) can be scoped to a single partition key value.
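The difference between a scoped query and a fan-out query can be sketched with a toy in-memory model. This is not the Cosmos DB SDK; the four partitions, the MD5-modulo placement, and the write and query helpers are all hypothetical stand-ins that only count how many partitions a query has to visit.

```python
import hashlib

PARTITION_COUNT = 4  # hypothetical number of physical partitions
partitions = [[] for _ in range(PARTITION_COUNT)]


def partition_index(partition_key_value: str) -> int:
    """Stand-in for Cosmos DB's internal hashing (assumption: MD5 modulo N)."""
    digest = hashlib.md5(partition_key_value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % PARTITION_COUNT


def write(item: dict) -> None:
    """Store the item on the partition chosen by its partition key (city)."""
    partitions[partition_index(item["city"])].append(item)


def query(predicate, partition_key=None):
    """Return (matching items, number of partitions visited)."""
    if partition_key is None:
        targets = partitions  # cross-partition: fan out to every partition
        matches = predicate
    else:
        targets = [partitions[partition_index(partition_key)]]  # single partition
        matches = lambda item: item["city"] == partition_key and predicate(item)
    results = [item for partition in targets for item in partition if matches(item)]
    return results, len(targets)


for city, name in [("Chicago", "Ann"), ("Chicago", "Bob"), ("Berlin", "Cara"), ("Tokyo", "Dai")]:
    write({"city": city, "name": name})

scoped, scoped_visits = query(lambda item: True, partition_key="Chicago")
fanned, fanned_visits = query(lambda item: item["city"] == "Chicago")
print(scoped_visits, fanned_visits)
```

Both queries return the same two Chicago items, but the scoped query visits one partition while the fan-out query visits all four.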

The same is true of updates. As I’ll blog about soon, you can write server-side stored procedures that update multiple documents in a single transaction, so that all the updates succeed or fail together as a whole. This is possible only across multiple documents that have the same partition key value, which is another thing to think about when defining the partition key for a container.
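Here is a rough model of that transactional scope. Real stored procedures are written in JavaScript and run inside the Cosmos DB service, so this Python function is only an analogy; update_in_transaction and the city and visits properties are names I made up for the illustration.

```python
import copy


def update_in_transaction(items, partition_key_value, apply_update):
    """Apply apply_update to every item, all-or-nothing, within one partition key."""
    # Scope check: every item must share the same partition key value.
    for item in items:
        if item["city"] != partition_key_value:
            raise ValueError("all items must share the partition key value")

    # Stage changes on copies; an exception here leaves the originals untouched.
    staged = [copy.deepcopy(item) for item in items]
    for draft in staged:
        apply_update(draft)

    # Commit: only reached when every update succeeded.
    for original, draft in zip(items, staged):
        original.clear()
        original.update(draft)


def bump(item):
    item["visits"] += 1


chicago_items = [{"city": "Chicago", "visits": 1}, {"city": "Chicago", "visits": 5}]
update_in_transaction(chicago_items, "Chicago", bump)
print(chicago_items)
```

If any single update raises, none of the items change, and a batch that mixes partition key values is rejected before anything is touched.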

Second, you want to choose a value that won’t introduce bottlenecks for storage or throughput: something that can be somewhat uniformly distributed across partitions, so that storage and throughput are spread somewhat evenly within the container.

Let’s dig a little deeper with a few different scenarios. Here I’ve got a container that uses city as the partition key:

Like I’ve been explaining, all the items for any city are stored together within the same partition, and each partition is co-locating multiple partition keys. It’s not a perfect distribution – and it never will be – but at least at this point, it seems we can somewhat evenly distribute our data within the container using city as the partition key. Furthermore, any query within a given city can be serviced by a single partition, and multiple items in the same city can be updated in a single transaction using a stored procedure.

Now, what happens when we need to grow this data? Say we start getting more data for Chicago, but can already see the partition hosting Chicago is starting to run low on headroom. So when you start adding more Chicago data, Cosmos DB will at some point – automatically and transparently – split the partition. That is, it will create a new partition, and move roughly half the data from the Chicago partition into the new partition. In this case, Berlin got moved out as well as Chicago, and now there is sufficient room for Chicago to continue growing:
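A partition split can be sketched in a few lines. I am assuming here that a split divides the partition’s hash range at its midpoint; how Cosmos DB actually chooses the split point is internal to the service, and the PhysicalPartition class and split function are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 32  # hypothetical hash space size


def hash_key(value: str) -> int:
    """Deterministically hash a partition key value into [0, HASH_SPACE)."""
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


class PhysicalPartition:
    def __init__(self, low: int, high: int):
        self.low, self.high = low, high  # owns hashes in [low, high)
        self.items = []

    def owns(self, key_hash: int) -> bool:
        return self.low <= key_hash < self.high


def split(partition: PhysicalPartition):
    """Split a partition's hash range at its midpoint and redistribute its items."""
    midpoint = (partition.low + partition.high) // 2
    left = PhysicalPartition(partition.low, midpoint)
    right = PhysicalPartition(midpoint, partition.high)
    for item in partition.items:
        target = left if left.owns(hash_key(item["city"])) else right
        target.items.append(item)
    return left, right


full = PhysicalPartition(0, HASH_SPACE)
for city in ["Chicago", "Berlin", "London", "Paris"]:
    full.items.extend({"city": city, "n": n} for n in range(3))

left, right = split(full)
print(sorted({i["city"] for i in left.items}), sorted({i["city"] for i in right.items}))
```

Because items move according to the hash of their partition key, every city stays whole on one side of the split; a logical partition is never divided between the two new physical partitions.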

But is city still the best choice to use here? If you have a few cities that are significantly larger than others, or that are queried significantly more often than others, then perhaps a finer granularity might be better, like zip code, for example. That will yield many more distinct values than city, and ideally, you want a partition key with many distinct values – like hundreds or thousands, at a minimum.

Choosing the Right Partition Key

Ultimately, only you will be able to make the best choice, because the best choice will be driven by the typical data access patterns in your particular application. I have already explained that the choice you make determines if a query can be serviced by a single partition – which is preferred, and which should be the most common case – or if multiple partitions need to be looked at. And it also determines which items can be updated together in a single transaction, using a stored procedure – and which cannot.

So with that in mind, let’s see what would make a good partition key for the different kinds of applications you might be building.

User Profile Data

Typical social networking applications store user profile data, and very often the user ID is a good partition key choice for this type of application. It means that you can query across all the posts of a given user, and Cosmos DB can service the query by looking at just the one partition where all of that user’s posts are stored. And, for example, you could write a stored procedure that updates the location property of multiple posts belonging to any user, and those updates would occur inside of a transaction: they would all commit, or they would all roll back, together.

IoT

IoT applications deal with the so-called Internet of Things… where you’re storing telemetry data from all sorts of devices – it could be refrigerators, or automobiles – whatever. The device ID is typically used for IoT apps, so for example, if you’re storing information about different cars, you would go with the VIN number, the vehicle identification number that uniquely identifies each distinct automobile. Queries against any particular car, or device, can then be serviced by a single partition, and multiple items for the same car or device can then be updated together in a transaction by writing a stored procedure.

Multi-tenant

In a multi-tenant architecture, you provide storage for different customers, or tenants. Each tenant needs to be isolated, so the tenant ID seems like a natural candidate to choose for the partition key. Because then each tenant can run their own queries against a single partition, and transactionally update their own data in that partition using a stored procedure, completely isolated from other tenants.

Writes distributed

You also need to think about how your application writes to the container, because you want to make sure that the writes get spread somewhat uniformly across all the partitions within the container. Say you’re building a social networking app. I mentioned that user ID might be a good choice, but what about creation date? Well, creation date is a bad choice, for any type of application really, because of the way the application is going to write data.

So from a storage perspective, we do get nice uniformity by using the creation date for the partition key, and, as time marches on, Cosmos DB will automatically add new partitions for new data, as you’ve already seen:

But the problem is that, when the application writes new data, the writes will always be directed to the same partition, based on whatever day it is. This results in what’s called a hot partition: a bottleneck where a single partition has to absorb all the write traffic and quickly exhausts its share of the reserved throughput you’ve provisioned for the container. Specifically, Cosmos DB evenly distributes your provisioned throughput across all the physical partitions in the container. So in this case where there are four partitions, if you are provisioning (and thus paying for) 4,000 RU/sec (request units per second), each partition gets about 1,000 RU/sec, and because every write lands on the same partition, you’ll only get the performance of about 1,000 RU/sec:
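The arithmetic is simple enough to write down. This sketch just divides the provisioned throughput evenly across physical partitions, as described above; the function names are my own.

```python
def throughput_per_partition(provisioned_ru_per_sec: float, partition_count: int) -> float:
    """Provisioned throughput is divided evenly across physical partitions."""
    return provisioned_ru_per_sec / partition_count


def usable_write_throughput(provisioned_ru_per_sec, partition_count, partitions_receiving_writes):
    """Only the partitions that actually receive writes contribute their share."""
    share = throughput_per_partition(provisioned_ru_per_sec, partition_count)
    return share * partitions_receiving_writes


hot = usable_write_throughput(4000, 4, partitions_receiving_writes=1)   # creation date key
even = usable_write_throughput(4000, 4, partitions_receiving_writes=4)  # well-spread key
print(hot, even)
```

With all writes landing on one of four partitions, only a quarter of the 4,000 RU/sec you pay for is available to those writes.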

That’s why user ID is a much better choice. Because then, writes get directed to different partitions, depending on the user. So throughout the day, as user profile data gets written to the container, those writes are being much more evenly distributed, with user ID as the partition key.
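You can see the difference by simulating one day of writes under each candidate key. Again this is a toy model: the MD5-modulo placement, the four partitions, the sample date, and the userId and creationDate property names are assumptions for illustration only.

```python
import hashlib
from collections import Counter

PARTITION_COUNT = 4  # hypothetical number of physical partitions


def partition_index(partition_key_value: str) -> int:
    """Stand-in for Cosmos DB's internal hashing (assumption: MD5 modulo N)."""
    digest = hashlib.md5(partition_key_value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % PARTITION_COUNT


def write_distribution(items, key_property: str) -> Counter:
    """Count how many writes each physical partition receives for a given key."""
    return Counter(partition_index(str(item[key_property])) for item in items)


# One day of writes from 1,000 different users (the date is a made-up sample).
todays_writes = [
    {"userId": f"user-{n}", "creationDate": "2019-06-01"} for n in range(1000)
]

by_date = write_distribution(todays_writes, "creationDate")
by_user = write_distribution(todays_writes, "userId")
print("creationDate:", dict(by_date))
print("userId:", dict(by_user))
```

Partitioning on creation date sends all 1,000 writes to a single partition, while partitioning on user ID spreads them across all four.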

Multiple containers

And then you need to consider that, in some scenarios, you can’t settle on one good partition key for a single container. In these cases, you’ll create multiple containers in the database, each with different partition keys, and then store different data in each container, based on usage.

The multi-tenant scenario makes a perfect example:

We’re partitioning by tenant ID, but clearly tenant number 10 is much bigger than all the other tenants. That could be because all the other tenants are, let’s say, small mom-and-pop shops, while tenant number 10 is some huge Fortune 500 company like Toyota. Compared to tenant number 10, all the other tenants have modest storage and throughput requirements.

This too creates a hot partition. First, look at storage. Partitions have a fixed size limit, which happens to be 10 gigabytes today, though that will likely change to 100 gigabytes in the future. But that limit is almost irrelevant, because independent of that, there is very uneven distribution of throughput. Tenant number 10 is much busier than the other tenants, so most writes are going to that one partition. And this is to the detriment of all the other tenants, because the reserved throughput is provisioned for the entire container. Furthermore, given the significantly larger volume of data for tenant number 10, it’s likely that large tenant will benefit from partitioning on some fine-grained property within their own data, and not just on the entire tenant ID, which is the partition key for the container.

And that’s a perfect example of when you’ll create a second container for your database. In this case, we can dedicate the entire second container to tenant number 10, and – let’s say it’s Toyota – that container can be partitioned by VIN number:

As a result, we have good uniformity across the entire database. One container handles small-sized tenants. This container is partitioned by tenant ID, and might be provisioned for relatively low throughput that gets shared by all the tenants in the container. And the other individual container handles the large tenant, Toyota. This container is partitioned by VIN number, and might even reserve more throughput for all of Toyota than the first container reserves for all smaller clients combined.
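In application code, this two-container design boils down to a small routing decision on every read and write. Here is a minimal sketch; the container names, the LARGE_TENANTS set, and the tenantId and vin property names are all hypothetical.

```python
LARGE_TENANTS = {"10"}  # hypothetical: tenants that get a dedicated container


def route(document: dict):
    """Return (container name, partition key value) for a document."""
    if document["tenantId"] in LARGE_TENANTS:
        # The dedicated container is partitioned by VIN.
        return "large-tenant-10", document["vin"]
    # The shared container is partitioned by tenant ID.
    return "small-tenants", document["tenantId"]


print(route({"tenantId": "3", "vin": "VIN-A"}))
print(route({"tenantId": "10", "vin": "VIN-B"}))
```

Documents for small tenants go to the shared container keyed by tenant ID, while documents for tenant number 10 go to the dedicated container keyed by VIN.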

Learning Azure Cosmos DB

Hey everyone!

I’m extremely delighted to announce the launch of my brand new Pluralsight course on Azure Cosmos DB.

This is a 10-module course that targets developers – and that’s you!

The course should be live on Pluralsight in the next few days. If you’re interested in building next-generation global apps for the web and mobile, just join me for about 6 hours, and get up to speed in a heartbeat!

Here’s what the course delivers:

Part 1: Introduction and Overview

We begin by defining NoSQL, explaining big data, and describing the characteristics of a NoSQL database. Then we introduce Cosmos DB with a high-level discussion around global distribution, server-side partitioning, and its multi-model capabilities (with support for documents, tables, and graphs). We also introduce the local emulator, which lets you develop for Cosmos DB without an internet connection or Azure subscription. This module has several demos showing how to create an Azure Cosmos DB account, how to download and install the local emulator, and how to use the SQL API to create a collection, populate it with documents, and query for documents using Cosmos DB SQL.

Part 2: Tuning for Throughput and Performance

This module explains how to provision throughput in Cosmos DB. We begin by learning about request units, or RUs. With RUs, you don’t worry about hardware concerns, like CPU, memory, or I/O. Instead, Cosmos DB delivers predictable performance by letting you reserve as many RUs per second as your application needs, and will throttle requests if you start to exceed the throughput limits that you reserve. The portal has many customizable charts and graphs that let you monitor the request unit consumption of your database, and you can whiteboard the cost to estimate your throughput needs using the online request unit calculator. The module concludes with an explanation of how Cosmos DB pricing is divided between storage and throughput.

Part 3: Horizontal Partitioning

This module discusses how Cosmos DB achieves elastic scale via server-side horizontal partitioning. We explain how containers are logical resources that Cosmos DB manages by creating and replicating multiple physical partitions to scale both storage and throughput. For best results, we discuss the considerations to keep in mind when choosing a partition key, which groups related items together for queries, and supports updating multiple items in a transaction using stored procedures. The module concludes by showing how to enable cross-partition queries, where Cosmos DB automatically fans out the query to multiple partitions, and aggregates each partition’s results into a single result for the query.

Part 4: Globally Distributing Data

In this module, we explore the geo-replication capabilities in Cosmos DB, also called turnkey global distribution. In addition to local replication within one Azure data center, Cosmos DB can replicate your database to any number of other Azure data centers in regions located throughout the world. This brings your data closer to your users, giving them low latency and high availability wherever they are located. You will see how easy it is to enable global distribution, and configure your application with a failover sequence of preferred regions. We wrap up the module by discussing consistency, and explaining the five consistency levels that let you balance latency and availability with stale reads that occur when querying replicas that are not up to date.

Part 5: Data Modeling for the SQL API

Using the SQL API, you model items in the database as JSON documents. This module starts by calling out some of the key differences in data modeling between a traditional relational database platform (like SQL Server) and a JSON document database (like Cosmos DB with the SQL API). We then examine the special properties, including the resource ID and self-link properties that uniquely identify each resource, and the URI factory which simplifies the task of constructing the self-link to any resource in the database. The module concludes by showing how the Data Migration Tool can be used to transform and import data from SQL Server to Cosmos DB.

Part 6: Querying Documents

This module explores the special version of SQL used by Cosmos DB for querying JSON documents in a collection with the SQL API. We explain how the query language is rooted in JSON and JavaScript semantics, and then dive into numerous demos that show how to filter, sort, iterate arrays, and perform intra-document joins, using the many available operators and built-in math, type checking, string, array, aggregate, and spatial functions provided by Cosmos DB SQL.

Part 7: Programming with the .NET SDK

This module shows how to use the .NET SDK with the SQL API to build client applications for Cosmos DB. We cover how to use the SDK for working with databases and collections, creating and querying documents, indexing, and resource tokens based on users and permissions.

Part 8: Programming the Cosmos DB Server

This module teaches you how to write server-side code that runs within the Cosmos DB service, including stored procedures, triggers, and user-defined functions. We also show you how to cope with throttled requests, and how to implement a continuation model for long-running processes.

Part 9: Using the Table API

This module explains the Table API in Cosmos DB, which provides 100% compatibility with the original Azure Table Storage NoSQL service, but adds predictable reserved throughput, and automatic indexing. We discuss the key-value data model used by the Table API, explain when it makes sense to use this API and when it doesn’t, and show how to migrate an existing Azure Table Storage account and application to Cosmos DB and the Table API.

Part 10: Using the Gremlin API

This module explains the Gremlin API, which provides a graph data model over Cosmos DB. This API is based on the Apache TinkerPop open source project, and defines a set of steps that can be used to populate a graph with vertices (entities) and edges (relationships). After populating the graph, you can run Gremlin queries that traverse the many relationships defined in the database. We’ll create several demos, including a simple graph, airport model, and a multi-model comic book catalog.

I hope this has whetted your appetite to learn more about Cosmos DB!