Data Modeling and Partitioning Patterns in Azure Cosmos DB

Introduction

Many of us are familiar with relational databases like SQL Server and Oracle. But Azure Cosmos DB is a NoSQL (non-relational) database – which is very different, and there are new ways to think about data modeling. In this post, we’ll use a familiar real-world relational data model and refactor it as a non-relational data model for Azure Cosmos DB.

First, there are many ways to describe Azure Cosmos DB, but for our purposes, we can define it as having two primary characteristics: it is horizontally scalable, and it is non-relational.

Horizontally scalable

In Azure Cosmos DB, you store data in a single logical container. But behind the container, Azure Cosmos DB manages a cluster of servers, and distributes the workload across multiple physical machines. This is transparent to us – we never worry about these back-end servers – we just work with the one container. Azure Cosmos DB, meanwhile, automatically maintains the cluster, and dynamically adds more and more servers as needed, to accommodate your growth. And this is the essence of horizontal scale.

Diagram of a database container distributing data across many machines, enabling horizontal scale

Each server has its own disk storage and CPU, just like any machine. And that means that – effectively – you get unlimited storage and unlimited throughput. There’s no practical limit to the number of servers in the cluster, meaning no limit on disk space or processing power for your Cosmos DB container.

Non-relational

Think about how things work in the relational world, where we store data in rows. We focus on how those rows get joined along primary and foreign keys, and we rely on the database to enforce constraints on those keys.

But in the non-relational world, we store data in documents – that is, JSON documents. Now there's certainly nothing you can store in a row that you can't store in a JSON document, so you could absolutely design a highly normalized data model with documents that reference other documents on some key, like so:

multiple documents in a row with arrows connecting them

Unfortunately, this results in a very inefficient design for Azure Cosmos DB. Why? Because again, Azure Cosmos DB is horizontally scalable, where documents that you write to a container are very likely to be stored across multiple physical servers behind the scenes:

A database container with multiple documents and servers underneath it

Although it is technically possible to enforce relational constraints across a cluster of servers, doing so would have an enormous negative impact on performance. And "speed and performance" is the name of the game in Azure Cosmos DB, which backs that up with comprehensive SLAs on availability, throughput, latency, and consistency. Reads and writes have single-digit millisecond latency – meaning that they complete in under 10 milliseconds in most cases. So, in order to deliver on these performance guarantees, Azure Cosmos DB simply doesn't support joins, and can't enforce relational constraints across documents.

Suitable for relational workloads?

With no joins and no relational constraints, the obvious question becomes: “Is Azure Cosmos DB suitable for relational workloads?” And the answer is, yes, of course it is. Otherwise, the post would end right here.

And when you think about it, most real-world use cases are relational. But because Azure Cosmos DB is horizontally scalable and non-relational, we need to use different techniques to materialize relationships between entities. And this means a whole new approach to designing your data model. In some cases, the new methods are radically different, and run contrary to best practices that some of us have been living by for decades.

Fortunately, while it is very different, it’s not really very difficult. It’s just that, again, you need to think about things differently when designing your data model for Azure Cosmos DB.

WebStore relational model

Our sample database is for an e-commerce web site that we’re calling WebStore. Here is the relational data model for WebStore in SQL Server:

Example of a relational database model in SQL Server

This data model is relatively small, but still representative of a typical production model. It has one-to-many relationships, like the ones from Customer to CustomerAddress and SalesOrder. There is also a one-to-one relationship from Customer to CustomerPassword, and the ProductTags table implements a many-to-many relationship between Product and ProductTag.

Container per table?

It’s very natural at first to think of a container in Azure Cosmos DB like a table. Your first instinct may be to say, OK, we have nine tables in our data model, let’s create nine containers in Azure Cosmos DB.

Now you can certainly do this, but because Azure Cosmos DB is horizontally scalable and non-relational, it would arguably be the worst possible design. Azure Cosmos DB exposes no way to join documents or to enforce relational constraints between them. This approach will therefore not only perform poorly, but it will also be very difficult to maintain and program against.

What’s the answer? Let’s get there one step at a time.

Embed vs. reference

Let’s start with customers and their related entities. JSON is hierarchical, so we don’t need separate documents for every type. So now think about the distinction between one-to-many and one-to-few. It’s reasonable to impose an upper limit on the number of addresses a customer can have, and there’s only one password per customer, so we could combine all of those into a single document:

Example data document with a user's information including ID, title, first name, and more

This immediately solves the problem of joining customers with their addresses and passwords, because that data is all "pre-joined" by embedding the one-to-few relationship for addresses as a nested array, and the one-to-one relationship for the password as an embedded object. Simply by embedding, we've reduced three relational tables to a single customer document.
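To make this concrete, here's a rough sketch of what such a customer document definition could look like, written as a C# anonymous object in the same style as the SDK code in the change feed posts below (the property names and values are placeholders, not the exact WebStore schema):

var customerDocDef = new
{
  id = "<custId>",
  title = "Mr.",
  firstName = "John",
  lastName = "Smith",
  emailAddress = "john@example.com",
  // One-to-few relationship: addresses embedded as a nested array
  addresses = new[]
  {
    new { addressLine1 = "123 Main St", city = "Brooklyn", state = "NY", zipCode = "11229" }
  },
  // One-to-one relationship: password embedded as an object
  password = new { hash = "<hash>", salt = "<salt>" }
};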

On the other hand, we would certainly not want to impose an upper limit on the number of sales orders per customer – ideally, that relationship is unbounded, and an unbounded embedded array would eventually run up against the 2 MB maximum document size. So the orders will be stored as separate documents, referenced by customer ID.

The rules for when to embed and when to reference are simple, as we’re demonstrating. One-to-few and one-to-one relationships often benefit from embedding, while one-to-many (particularly unbounded) and many-to-many relationships require that you reference. Embedding is also useful when all the entities are typically queried and/or updated together (for example, a customer profile with addresses and password), while it’s usually better to separate entities that are most often queried and updated separately (such as individual sales orders).

The next – and arguably most important – step is to choose a partition key for the customer document. Making the right choice requires that you understand how partitioning works.

Understanding partitioning

When you create a container, you supply a partition key. This is a property in your documents whose value Azure Cosmos DB uses to group documents together into logical partitions. Each server in the cluster is a physical partition that can host any number of logical partitions, each of which in turn stores any number of documents with the same partition key value. Again, we don't think about the physical partitions – we're concerned primarily with the logical partitions that are based on the partition key we select.

All the documents in a logical partition will always be stored on the same physical partition; a logical partition will never be spread across multiple servers in the cluster. Ideally, therefore, you want to choose a partition key whose value will be known for most of your typical queries. When the partition key is known, then Azure Cosmos DB can route your query directly to the physical partition where it knows all the documents that can possibly satisfy the query are stored. This is called a single-partition query.

If the partition key is not known, then it's a cross-partition query (also often called a fan-out query). In this case, Azure Cosmos DB needs to visit every physical partition and aggregate the results into a single resultset for the query. This is fine for occasional queries, but it adds unacceptable overhead for common queries in a heavy workload.
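To illustrate the difference, here's a sketch using the Cosmos DB .NET SDK (the same Microsoft.Azure.DocumentDB SDK used in the change feed posts below); the database and container names, endpoint, and key constants are placeholders:

using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
{
  var customerCollUri = UriFactory.CreateDocumentCollectionUri("webstore", "customer");

  // Single-partition query: the partition key value is supplied, so Cosmos DB routes
  // the query directly to the one physical partition holding that logical partition
  var singlePartitionQuery = client.CreateDocumentQuery(
    customerCollUri,
    "SELECT * FROM c WHERE c.id = '<custId>'",
    new FeedOptions { PartitionKey = new PartitionKey("<custId>") });

  // Cross-partition (fan-out) query: no partition key is supplied, so every
  // physical partition must be visited and the results aggregated
  var crossPartitionQuery = client.CreateDocumentQuery(
    customerCollUri,
    "SELECT * FROM c WHERE c.lastName = 'Smith'",
    new FeedOptions { EnableCrossPartitionQuery = true });
}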

You also want a partition key that results in a uniform distribution of both storage and throughput. A logical partition can’t exceed 20 GB, but regardless, you don’t want some logical partitions to be huge and others very small. And from a throughput perspective, you don’t want some logical partitions to be heavily accessed for reads and writes, and not others. These “hot partition” situations should always be avoided.

Choosing a partition key

With this understanding, we can select a partition key for the customer documents that we’ll store in a customer container. The question is always the same: “What’s the most common query?” For customers, we most often want to query a customer by its ID, like so:

SELECT * FROM c WHERE c.id = '<custId>'

In this case, we want to choose the id property itself as the partition key. This means you’ll get only one document in every logical partition, which is fine. It’s desirable to use a partition key that yields a large spectrum of distinct values. You may have thousands of logical partitions for thousands of customers, but with only one document each, you will achieve a highly uniform distribution across the physical partitions.
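As a sketch (inside an async method, with a hypothetical webstore database and placeholder endpoint/key constants), creating the customer container partitioned on the id property with the .NET SDK might look like this:

using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
{
  var dbUri = UriFactory.CreateDatabaseUri("webstore");

  // Partition the customer container on the id property itself
  var partitionKeyDefinition = new PartitionKeyDefinition();
  partitionKeyDefinition.Paths.Add("/id");

  await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
  {
    Id = "customer",
    PartitionKey = partitionKeyDefinition
  }, new RequestOptions { OfferThroughput = 400 });
}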

We'll take a very different approach for product categories. Users visiting the website will typically want to view the complete list of product categories, which is essentially a query on the product category container with no WHERE clause. Then they'll want to query for all the products that belong to a category that interests them. The problem, though, is that a query with no WHERE clause would normally be a cross-partition query, and we want to retrieve all our category documents with a single-partition query.

The trick here is to add another property called type to each product category document, and set its value to “category” in every document. Then we can partition the product category container on the type property. This would store all the category documents in a single logical partition, and the following query could retrieve them as a single-partition query:

SELECT * FROM c WHERE c.type = 'category'
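For reference, a category document in this design might look something like the following sketch, shown as a C# anonymous object with placeholder values:

var categoryDocDef = new
{
  id = "<categoryId>",
  type = "category",   // same value in every category document; used as the partition key
  name = "Accessories"
};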

The same is true of tags; users will typically want the full list of tags and then drill in to view the products associated with the tags that interest them. This is a typical pattern for short lookup lists that are often retrieved all at once. So that would be another container for product tags, partitioned on a type property where every document has the same value "tag" in that property, and then queried with:

SELECT * FROM c WHERE c.type = 'tag'

Many-to-many relationships

Now for the many-to-many relationship between products and tags. This can be modeled by embedding an array of IDs on one side or the other; we could either store a list of tag IDs in each product, or a list of product IDs in each tag. Since there will be fewer tags per product than products per tag, we'll store tag IDs in each product document, like so:

Example of a document with item details including ID, category ID, description, price, and more

Once the user chooses a category, the next typical query would be to retrieve all the products in that category by its category ID, like so:

SELECT * FROM c WHERE c.categoryId = '<catId>'

To make this a single-partition query, we want to partition the product container on the category ID, which will store all the products for the same category in the same logical partition.

Introducing denormalization

Now we have a new challenge, because product documents hold just the category ID and an array of tag IDs – they don't have the category and tag names themselves. And we already know that Azure Cosmos DB won't join related documents for us. So, if we want to display the category and tag names on the web page – which we do – then we need to run additional queries to get that information. First, we need to query the product category container to get the category name, and then – for each product in the category – we need to query the product tag container to get all the tag names.

We definitely need to avoid this, and we’ll solve the problem using denormalization. And that just means that – unlike in a normalized data model – we will duplicate information as necessary in order to make it more readily available for queries that need it. That means that we’ll store a copy of the category name, and copies of the tag names, in each related product document:

Example of a document for an item that includes multiple attributes and embedded details in the Tags attribute
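Here's a rough sketch of what a denormalized product document definition could look like (property names and values are placeholders):

var productDocDef = new
{
  id = "<productId>",
  categoryId = "<categoryId>",    // partition key for the product container
  categoryName = "Accessories",   // denormalized copy of the category name
  name = "<productName>",
  price = 19.99,
  // Denormalized copies of both the tag IDs and the tag names
  tags = new[]
  {
    new { id = "<tagId1>", name = "New Arrival" },
    new { id = "<tagId2>", name = "On Sale" }
  }
};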

Now we have everything we need to display about a product self-contained inside a single product document. And that will work great until, of course, a category name or tag name changes. Because then we need a way to cascade the name change to all the related copies in order to keep our data consistent.

Denormalizing with the change feed

This is a perfect situation for the Azure Cosmos DB change feed, which is a persistent log of all changes made to any container. By subscribing to the change feed on the category and tag containers, we can respond to updates and then propagate the change out to all related product documents so that everything remains in sync.

This can be achieved with a minimal amount of code, and implemented out-of-band with the main application by deploying the change feed code to run as an Azure function:

image of three database containers

Any change to a category or tag name triggers an Azure function to update all related product documents. This lets us maintain a denormalized model that's optimized to retrieve all relevant information about a product with one single-partition query.
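To give a feel for what that Azure function could look like, here's a sketch that uses the same Cosmos DB trigger shown in part 3 of the change feed series below. The database, container, and setting names (webstore, productMeta, product, CosmosDbConnectionString) are assumptions for this example, and the sketch handles only category name changes – tags would follow the same pattern:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace WebStore.DenormalizationSync
{
  public static class CategorySyncFunction
  {
    private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    private const string CosmosMasterKey = "<account-key>";

    private static readonly DocumentClient _client =
      new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey);

    [FunctionName("CategorySyncFunction")]
    public static async Task Run(
      [CosmosDBTrigger(
        databaseName: "webstore",
        collectionName: "productMeta",
        ConnectionStringSetting = "CosmosDbConnectionString",
        LeaseCollectionName = "lease")]
      IReadOnlyList<Document> documents,
      ILogger log)
    {
      // Only category documents carry a name that gets denormalized into product documents
      foreach (var category in documents.Where(d => d.GetPropertyValue<string>("type") == "category"))
      {
        var categoryId = category.Id;
        var categoryName = category.GetPropertyValue<string>("name");

        // All products for this category live in one logical partition (partition key = categoryId),
        // so this is a single-partition query
        var productCollUri = UriFactory.CreateDocumentCollectionUri("webstore", "product");
        var products = _client.CreateDocumentQuery<Document>(
          productCollUri,
          $"SELECT * FROM c WHERE c.categoryId = '{categoryId}'",
          new FeedOptions { PartitionKey = new PartitionKey(categoryId) }).ToList();

        // Overwrite the denormalized copy of the category name in each related product document
        foreach (var product in products)
        {
          product.SetPropertyValue("categoryName", categoryName);
          await _client.ReplaceDocumentAsync(product.SelfLink, product);
          log.LogInformation($"Updated product {product.Id} with category name '{categoryName}'");
        }
      }
    }
  }
}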

Combining different types

The last part of our schema is the customer orders and order details. First, we'll embed the details into each order as a single document for the sales order container, because that's another one-to-few relationship between entities that are typically retrieved and updated together.

It will be very common for customers to retrieve their orders using the following query:

SELECT * FROM c WHERE c.customerId = '<custId>'

That makes the customer ID the best choice for the partition key. But before we jump to create another container for sales orders, remember that we’re also partitioning customers on the customer ID in the customer container. And unlike a relational database where tables have defined schemas, Azure Cosmos DB lets you mix different types of documents in the same container. And it makes sense to do that when those different types share the same partition key.

We'll combine customer and sales order documents in the same customer container, which requires just a minor tweak to the customer document: we'll add a customerId property that holds a copy of the value already in the id property. Then we can partition on customerId, which will be present in both document types:

A database container called Customer with two documents showing customer details
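As a sketch, the two document shapes sharing the customerId partition key might look like this (placeholder values):

var customerDocDef = new
{
  id = "<custId>",
  customerId = "<custId>",    // copy of the id; the partition key shared by both document types
  type = "customer",
  firstName = "John",
  lastName = "Smith"
  // ...embedded addresses, password, etc.
};

var salesOrderDocDef = new
{
  id = "<orderId>",
  customerId = "<custId>",    // same partition key value as the related customer document
  type = "salesOrder",
  orderDate = "2020-01-01",
  details = new[]
  {
    new { sku = "<sku>", quantity = 1, price = 19.99 }
  }
};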

Notice that we’ve also added a type property to distinguish between the two types of documents. So now, there is still only one customer document per logical partition, but each logical partition also includes the related orders for that customer. And this kind of gets us our joins back, because now we can retrieve a customer and all their related orders with the following single-partition query:

SELECT * FROM c WHERE c.customerId = '<custId>'

Denormalizing with a stored procedure

Let's wrap up with one more query to retrieve our top customers; essentially, a list of customers sorted descending by order count. In the relational world, we would just run a SELECT COUNT(*) on the SalesOrder table with a GROUP BY on the customer ID, and then sort descending on that count.

But in Azure Cosmos DB, the answer is once again to denormalize. We’ll just add a salesOrderCount property to each customer document. Then our query becomes as simple as:

SELECT * FROM c WHERE c.type = 'customer' ORDER BY c.salesOrderCount DESC

Of course, we need to keep that salesOrderCount property in sync; each time we create a new sales order document, we also need to increment the salesOrderCount property in the related customer document. We could use the change feed like before, but stored procedures are a better choice when your updates are confined to a single logical partition.

In this case, the new sales order document is being written to the same logical partition as the related customer document. We can write a stored procedure in JavaScript that runs within the Azure Cosmos DB service which creates the new sales order document and updates the customer document with the incremented sales order count.

The big advantage here is that stored procedures in Azure Cosmos DB run as a transaction that succeeds or fails as a whole. Both write operations will need to complete successfully or they both roll back. This guarantees consistency between the salesOrderCount property in the customer document and the true number of related sales order documents in the same logical partition.
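As a sketch of the client side, calling such a stored procedure (here given the hypothetical name spCreateSalesOrder, in a hypothetical webstore database) might look like this; the stored procedure itself would be the JavaScript that performs both writes:

using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
{
  var sprocUri = UriFactory.CreateStoredProcedureUri("webstore", "customer", "spCreateSalesOrder");

  var newOrderDocDef = new
  {
    id = Guid.NewGuid().ToString(),
    customerId = "<custId>",
    type = "salesOrder",
    details = new[] { new { sku = "<sku>", quantity = 1, price = 19.99 } }
  };

  // The stored procedure executes as a transaction inside the customer's logical partition:
  // it creates the sales order document and increments salesOrderCount on the customer document
  await client.ExecuteStoredProcedureAsync<object>(
    sprocUri,
    new RequestOptions { PartitionKey = new PartitionKey("<custId>") },
    newOrderDocDef);
}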

One last thing to mention is that this top-customers query is a cross-partition query, unlike our previous examples, which were all single-partition queries. Remember again that cross-partition queries aren't necessarily evil, as long as they aren't very common. In our case, this query won't run routinely on the website; it's more of a "back office" query that an executive runs every now and again to find the top customers.

Summary

This post has walked you through the steps to refactor a relational data model as a non-relational model for Azure Cosmos DB. We collapsed multiple entities by embedding, and we supported denormalization through the use of the change feed and stored procedures.

We also combined customer and sales order documents in the same container, because they are both partitioned on the same value (customer ID). To wrap up the design, we can also combine the product category and product tag documents in a single “product metadata” container, since they are both partitioned on the same type property.

That brings us to our final design:

three database containers: customer, product, and product metadata

Using a combination of non-relational modeling techniques, we've reduced nine tables to just three containers, where the majority of queries run by the application are scoped to a single logical partition.


Multiple Partition Keys in Azure Cosmos DB (Part 3) – Azure Functions with Change Feed Trigger

Welcome to part 3 in this three-part series on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In part 2, I showed you how to use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach in part 1.

This post concludes the three-part series, and shows how to deploy the solution to execute in a serverless cloud environment using Azure Functions.

Building the Azure Functions Project

As you learned in part 2, the benefits of using the CFP library are clear. But we still need a way to deploy our host to run in Azure. We could certainly refactor the CfpLibraryHost console application as a web job. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

If you're not already familiar with Azure Functions, they let you write individual methods (functions) that you deploy for execution in a serverless environment hosted on Azure. The term "serverless" here means that you don't have to create and manage an Azure app service (or any other host) just to run your code.

In the current ChangeFeedDemos solution, create a new Azure Functions project and name it CfpLibraryFunction. Visual Studio will offer up a selection of “trigger” templates to choose from, including the Cosmos DB Trigger that we’ll be using. However, we’ll just choose the Empty template and configure the project manually:

Next, add the Cosmos DB WebJob Extensions to the new project from the NuGet package Microsoft.Azure.WebJobs.Extensions.CosmosDB:

Now create a new class named MultiPkCollectionFunction in the CfpLibraryFunction project with the following code:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;

namespace CosmosDbDemos.MultiPkCollection.ChangeFeedFunction
{
  public static class MultiPkFunction
  {
    private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    private const string CosmosMasterKey = "<account-key>";

    private static readonly DocumentClient _client;

    static MultiPkFunction()
    {
      _client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey);
    }

    [FunctionName("MultiPkCollectionFunction")]
    public static void Run(
      [CosmosDBTrigger(
        databaseName: "multipk",
        collectionName: "byCity",
        ConnectionStringSetting = "CosmosDbConnectionString",
        LeaseCollectionName = "lease")]
      IReadOnlyList<Document> documents,
      ILogger log)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          _client.UpsertDocumentAsync(stateCollUri, document).Wait();
          log.LogInformation($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
          {
            PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
          }).Wait();
          log.LogInformation($"Deleted document id {document.Id} in byState collection");
        }
      }
    }
  }
}

Note that this is all the code we need. While the CFP library host project required less code than querying the change feed directly, it still required additional code to build and host the processor – plus you need a full Azure app service to deploy in the cloud.

Using an Azure Function, it’s super lean. We have the same minimal business logic as before, and still leverage all the CFP library goodness as before, but being “serverless,” we’re ready to deploy this as a standalone function in the cloud, without creating a full Azure app service. Plus, you can test the Azure Function locally using Visual Studio and the debugger, so that you can deploy to the cloud with confidence. Folks, it just doesn’t get much better than this!

The CosmosDBTrigger binding in the signature of the Run method causes this code to fire on every notification that gets raised by the CFP library as it watches the byCity container for changes. The binding tells the trigger to monitor the byCity collection in the multipk database, and to persist leases in the container named lease. It also references a connection string setting named CosmosDbConnectionString to obtain the necessary endpoint and master key, which we'll configure in a moment.

The actual implementation of the Run method is virtually identical to the ProcessChangesAsync method in our CFP library host project from part 2. The only difference is that we log output using the ILogger instance passed into our Azure Function, rather than using Console.WriteLine to log output in our CFP library host project.

Setting the connection string

Next we need to set the connection string for the trigger that we are referencing in the signature of the Run method. Azure Functions supports configuration files similar to the appsettings.json, app.config, or web.config files used by typical .NET applications; for local development, this file is named local.settings.json. Open this file and add the CosmosDbConnectionString setting that the trigger will use to monitor the byCity collection for changes:

{
  "IsEncrypted": false,
  "Values": {
    "CosmosDbConnectionString": "AccountEndpoint=https://<account-name>.documents.azure.com:443/;AccountKey=<account-key>;",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet"
  }
} 

Testing locally

It’s very easy to test and debug our code locally, before deploying it to Azure.

Let's see it work by running the two projects ChangeFeedDirect and CfpLibraryFunction at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryFunction app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that they can be synchronized to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryFunction). To do this, right-click the CfpLibraryFunction project in Solution Explorer and choose Debug, Start New Instance. You will see a console app open up which hosts the local webjob for our Azure function:

Wait a few moments until the local Azure Functions host indicates that the application has started:

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library under control of the local Azure Functions webjob host:

Before we deploy to Azure, let's re-initialize the database. Back in the first console app (ChangeFeedDirect), run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Deploying to Azure

Satisfied that our code is working properly, we can now deploy the Azure function to run in the cloud.

Right-click the CfpLibraryFunction project and choose Publish.

Select Create New and then click Publish.

Assign a name for the deployed function, and select the desired subscription, resource group, hosting plan, and storage account (or just accept the defaults):

Now click Create and wait for the deployment to complete.

Over in the Azure portal, navigate to All Resources, filter on types for App Services, and locate your new Azure Functions service:

Click on the App Service, and then click Configuration:

Next, click New Application Setting:

This is the cloud equivalent of the local.settings.json file, where you need to plug in the same CosmosDbConnectionString setting that you provided earlier when testing locally:

Now click Update, Save, and close the application settings blade.

You’re all done! To watch it work, click on MultiPkCollectionFunction, scroll down, and expand the Logs panel:

Back in the ChangeFeedDirect console app, run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library running in our Azure function. Within a few moments, the Azure function trigger fires, and our code synchronizes them to the byState container:

What’s Next?

A few more pointers before concluding:

Conclusion

This concludes the three-post series on using the Cosmos DB change feed to synchronize two containers with different partition keys over the same data. Thanks for reading, and happy coding!

Multiple Partition Keys in Azure Cosmos DB (Part 2) – Using the Change Feed Processor Library

Welcome to part 2 in this series of blog posts on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In this post, we’ll use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach we took in part 1.

What is the CFP Library?

For the solution we built in part 1 to consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. In most cases, unless you have very custom requirements, the CFP library is the way to go over directly querying the change feed yourself.

The CFP library automatically discovers the container’s partition key ranges and parallelizes change feed consumption across each of them internally. This means that you only consume one logical change feed for the entire container, and don’t worry at all about the individual change feeds behind the individual partition key ranges. The library relies on a dedicated “lease” container to persist lease documents that maintain state for each partition key range. Each lease represents a checkpoint in time which tracks continuation tokens for chunking long change feeds across multiple clients. Thus, as you increase the number of clients, the overall change feed processing gets divided across them evenly.

Using the lease container also means we won’t need to rely on our own “sync” document that we were using to track the last time we queried the change feed directly. Therefore, our code will be simpler than the previous solution from part 1, yet we will simultaneously inherit all the aforementioned capabilities that the CFP library provides.

Building on the ChangeFeedDemos solution we created in part 1, open the Program.cs file in the ChangeFeedDirect project. In the Run method, add a line to display a menu item for creating the lease container:

Console.WriteLine("CL - Create lease collection"); 

Further down in the method, add an “else if” condition for the new menu item that calls CreateLeaseCollection:

else if (input == "CL") await CreateLeaseCollection();

Now add the CreateLeaseCollection method:

private static async Task CreateLeaseCollection()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/id");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "lease",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });
  }

  Console.WriteLine("Created lease collection");
} 

This code creates a container named lease provisioned for 400 RUs per second. Also notice that the lease container uses the id property itself as its partition key.

Next, create the host project, which is essentially the client. When an instance of this host runs, it will listen for and consume changes across the entire container. Then, as you spin up additional instances of the host, the work to consume changes across the entire container will be distributed uniformly across all the host instances by the CFP library.

In the current ChangeFeedDemos solution, create a new .NET Core console application and name the project CfpLibraryHost. Next, add the CFP library to the new project from the NuGet package Microsoft.Azure.DocumentDB.ChangeFeedProcessor:

Replace all the code in Program.cs as follows:

using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  class Program
  {
    static void Main(string[] args)
    {
      var host = new ChangeFeedProcessorHost();
      Task.Run(host.RunProcessor).Wait();

      Console.ReadKey();
    }
  }
} 

This startup code initializes and runs the host. It then enters a wait state during which the host will be notified automatically by the CFP library whenever changes are available for consumption.

Next, to implement the host, create a new class named ChangeFeedProcessorHost with the following code:

using Microsoft.Azure.Documents.ChangeFeedProcessor;
using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class ChangeFeedProcessorHost
  {
    public async Task RunProcessor()
    {
      var monitoredCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "byCity",
      };

      var leaseCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "lease",
      };

      var builder = new ChangeFeedProcessorBuilder();

      builder
        .WithHostName($"Change Feed Processor Host - {Guid.NewGuid()}")
        .WithFeedCollection(monitoredCollection)
        .WithLeaseCollection(leaseCollection)
        .WithObserverFactory(new MultiPkCollectionObserverFactory());

      var processor = await builder.BuildAsync();
      await processor.StartAsync();

      Console.WriteLine("Started change feed processor - press any key to stop");
      Console.ReadKey();

      await processor.StopAsync();
    }
  }

} 

The host builds a processor configured to monitor the byCity container, using the lease container to track internal state. The processor also references an observer factory which, in turn, supplies an observer to consume the changes as they happen.

This code relies on defined constants for the URI and master key of the Cosmos DB account. Define them as follows in a new class called Constants.cs:

namespace CfpLibraryHost
{
  public class Constants
  {
    public const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    public const string CosmosMasterKey = "<account-key>";
  }
} 

The observer factory is super-simple. Just create a class that implements IChangeFeedObserverFactory, and return an observer instance (where the real logic will go) that implements IChangeFeedObserver. Name the class MultiPkCollectionObserverFactory, as referenced by WithObserverFactory in the host.

using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserverFactory : IChangeFeedObserverFactory
  {
    public IChangeFeedObserver CreateObserver() =>
      new MultiPkCollectionObserver();
  }
} 

Finally, create the observer class itself. Name this class MultiPkCollectionObserver, as referenced by the CreateObserver method in the factory class:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserver : IChangeFeedObserver
  {
    private static DocumentClient _client;

    static MultiPkCollectionObserver() =>
      _client = new DocumentClient(new Uri(Constants.CosmosEndpoint), Constants.CosmosMasterKey);

    public Task OpenAsync(IChangeFeedObserverContext context)
    {
      Console.WriteLine($"Start observing partition key range {context.PartitionKeyRangeId}");
      return Task.CompletedTask;
    }

    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
      Console.WriteLine($"Stop observing partition key range {context.PartitionKeyRangeId} because {reason}");
      return Task.CompletedTask;
    }

    public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          await _client.UpsertDocumentAsync(stateCollUri, document);
          Console.WriteLine($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          await _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
          {
            PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
          });
          Console.WriteLine($"Deleted document id {document.Id} in byState collection");
        }
      }
    }
  }
} 

Our synchronization logic is embedded inside the ProcessChangesAsync method. The benefits of using the CFP library should really stand out now, compared to our solution from part 1. This code is significantly simpler yet dramatically more robust than the SyncCollections method we wrote in the ChangeFeedDirect project. We are no longer concerned with discovering and iterating partition key ranges, nor do we need to track the last sync time ourselves. Instead, the CFP library handles those details, plus it parallelizes the work and uniformly distributes large-scale processing across multiple client instances. Our code is now limited to just the business logic we require, which is the same logic as before: Upsert new or changed documents into the byState container as they occur in the byCity container, or delete them if their TTL property is set.

Let's see it work. We'll want to run both projects at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryHost app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that they can be synchronized to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryHost). To do this, right-click the CfpLibraryHost project in Solution Explorer and choose Debug, Start New Instance. The host fires up, and now just sits and waits for changes from the CFP library:

Started change feed processor - press any key to stop
Start observing partition key range 0

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library. Within a few moments, the host is notified of the changes, and our observer class synchronizes them to the byState container:

Started change feed processor - press any key to stop
Start observing partition key range 0
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Upserted document id 06296937-de8b-4d72-805d-f3120bf06403 in byState collection
Upserted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Deleted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection 

The beauty here is that the client does not need to manually poll the change feed for updates. And we can just spin up as many clients as needed to scale out the processing for very large containers with many changes.

What’s Next?

The benefits of using the CFP library are clear, but we still need to deploy our host to run in Azure. We could certainly deploy the CfpLibraryHost console application to a web app in App Service as an Azure WebJob. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

So tune in to part 3, where I’ll conclude this three-part series by showing you how to deploy our solution to Azure using Azure Functions.

Multiple Partition Keys in Azure Cosmos DB (Part 1) – Querying the Change Feed Directly

To begin, let’s be clear that an Azure Cosmos DB container can have only one partition key. I say this from the start in case “multiple partition keys” in the title is somehow misinterpreted to imply otherwise. You always need to come up with a single property to serve as the partition key for every container and choosing the best property for this can sometimes be difficult.

Understanding the Problem

Making the right choice requires intimate knowledge about the data access patterns of your users and applications. You also need to understand how horizontal partitioning works behind the scenes in terms of storage and throughput, how queries and stored procedures are scoped by partition key, and the performance implications of running cross-partition queries and fan-out queries. So there are many considerations to take into account before settling on the one property to partition a container on. I discuss all this at length in my previous blog post Horizontal Partitioning in Azure Cosmos DB.

But here’s the rub. What if, after all the analysis, you come to realize that you simply cannot settle on a single property that serves as an ideal partition key for all scenarios? Let’s say for example, from a write perspective, you find one property will best distribute writes uniformly across all the partitions in the container. But from a query perspective, you find that using the same partition key results in too much fanning out. Or, you might identify two categories of common queries, where it’s roughly 50/50; meaning, about half of all the queries are of one type, and half are of the other. What do you do if the two query categories would each benefit from different partition keys?

Your brain can get caught in an infinite loop over this until you wind up in that state of “analysis paralysis,” where you recognize that there’s just no single best property to choose as the partition key. To break free, you need to think outside the box. Or, let’s say, think outside the container. Because the solution here is to simply create another container that’s a complete “replica” of the first. This second container holds the exact same set of documents as the first but defines a different partition key.

I placed quotes around the word “replica” because this second container is not technically a replica in the true Cosmos DB sense of the word (where, internally, Cosmos DB automatically maintains replicas of the physical partitions in every container). Rather, it’s a manual replica that you maintain yourself. Thus, it’s your job to keep it in sync with changes when they happen in the first container, which is partitioned by a property that’s optimized for writes. As those writes occur in real time, you need to respond by updating the second collection, which is partitioned by a property that’s optimized for queries.

Enter Change Feed

Fortunately, change feed comes to the rescue here. Cosmos DB maintains a persistent record of changes for every container that can be consumed using the change feed. This gives you a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. For an introduction to change feed, have a look at my previous blog post Change Feed – Unsung Hero of Azure Cosmos DB.

In this three-part series of blog posts, I'll dive into different techniques you can use for consuming the change feed to synchronize containers:

Part 1 – Querying the change feed directly using the SDK (this post)
Part 2 – Using the Change Feed Processor (CFP) library
Part 3 – Azure Functions with the Cosmos DB change feed trigger

Let’s get started. Assume that we’ve done our analysis and established that city is the ideal partition key for writes, as well as roughly half of the most common queries our users will be running. But we’ve also determined that state is the ideal partition key for the other (roughly half) commonly executed queries. This means we’ll want one container partitioned by city, and another partitioned by state. And we’ll want to consume the city-partitioned container’s change feed to keep the state-partitioned container in sync with changes as they occur. We’ll then be able to direct our city-based queries to the first container, and our state-based queries to the second container, which then eliminates fan-out queries in both cases.

Setting Up

If you'd like to follow along, you'll need to be sure your environment is set up properly. First, of course, you'll need to have a Cosmos DB account. The good news here is that you can get a free 30-day account with the "try cosmos" offering, which doesn't even require a credit card or Azure subscription (just a free Microsoft account). Even better, there's no limit to the number of times you can start a new 30-day trial. Create your free account at http://azure.microsoft.com/try/cosmosdb.

You’ll need your account’s endpoint URI and master key to connect to Cosmos DB from C#. To obtain them, head over to your Cosmos DB account in the Azure portal, open the Keys blade, and keep it open so that you can handily copy/paste them into the project.

You’ll also need Visual Studio. I’ll be using Visual Studio 2019, but the latest version of Visual Studio 2017 is fine as well. You can download the free community edition at https://visualstudio.microsoft.com/downloads.

Querying the Change Feed Directly

We’ll begin with the raw approach, which is to query the change feed directly using the SDK. The reality is that you’ll almost never want to go this route, except for the simplest small-scale scenarios. Still, it’s worth taking some time to examine this approach first, as I think you’ll benefit from learning how the change feed operates at a low level, and it will enhance your appreciation of the Change Feed Processor (CFP) library which I’ll cover in the next blog post of this series.

Fire up Visual Studio, create a new .NET Core console application, name the project ChangeFeedDirect, and name the solution ChangeFeedDemos (we'll be adding more projects to this solution in parts 2 and 3 of this blog series). Next, add the SDK to the ChangeFeedDirect project from the NuGet package Microsoft.Azure.DocumentDB.Core:

We’ll write some basic code to create a database with the two containers, with additional methods to create, update, and delete documents in the first container (partitioned by city). Then we’ll write our “sync” method that directly queries the change feed on the first container, in order to update the second container (partitioned by state) and reflect all the changes made.

Note: Our code (and the SDK) refers to containers as collections.

We’ll write all our code inside the Program.cs file. First, update the using statements at the very top of the file to get the right namespaces imported:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

Next, at the very top of the Program class, set two private string constants for your Cosmos DB account’s endpoint and master key. You can simply copy them from the Keys blade in the Azure portal, and paste them right into the code:

private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
private const string CosmosMasterKey = "<account-key>";

Now edit the Main method with one line to call the Run method:

static void Main(string[] args)
{
  Task.Run(Run).Wait();
}

This one-liner invokes the Run method and waits for it to complete. Add the Run method next, which displays a menu and calls the various methods defined for each menu option:

private static async Task Run()
{
  while (true)
  {
    Console.WriteLine("Menu");
    Console.WriteLine();
    Console.WriteLine("DB - Create database");
    Console.WriteLine("CD - Create documents");
    Console.WriteLine("UD - Update document");
    Console.WriteLine("DD - Delete document");
    Console.WriteLine("SC - Sync collections");
    Console.WriteLine("Q - Quit");
    Console.WriteLine();
    Console.Write("Selection: ");

    var input = Console.ReadLine().ToUpper().Trim();
    if (input == "Q") break;
    else if (input == "DB") await CreateDatabase();
    else if (input == "CD") await CreateDocuments();
    else if (input == "UD") await UpdateDocument();
    else if (input == "DD") await DeleteDocument();
    else if (input == "SC") await SyncCollections();
    else Console.WriteLine("\nInvalid selection; try again\n");
  }
}

We’ll add the menu option methods one at a time, starting with CreateDatabase:

private static async Task CreateDatabase()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Create the database

    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    try { await client.DeleteDatabaseAsync(dbUri); } catch { }

    await client.CreateDatabaseAsync(new Database { Id = "multipk" });

    // Create the two collections

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/city");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byCity",
      PartitionKey = partitionKeyDefinition,
      DefaultTimeToLive = -1
    }, new RequestOptions { OfferThroughput = 400 });

    partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/state");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byState",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });

    // Load the sync document into the first collection

    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
    var syncDocDef = new
    {
      city = "sync",
      state = "sync",
      lastSync = default(DateTime?)
    };
    await client.CreateDocumentAsync(collUri, syncDocDef);
  }

  Console.WriteLine("Created database for change feed demo");
}

Most of this code is intuitive, even if you’ve never done any Cosmos DB programming before. We first use our endpoint and master key to create a DocumentClient, and then we use the client to create a database named multipk with the two containers in it.

This works by first calling DeleteDatabaseAsync wrapped in a try block with an empty catch block. This effectively results in “delete if exists” behavior to ensure that the multipk database does not exist when we call CreateDatabaseAsync to create it.

Next, we call CreateDocumentCollectionAsync twice to create the two containers (again, a collection is a container). We name the first container byCity and assign it a partition key of /city, and we name the second container byState and assign /state as the partition key. Both containers reserve 400 request units (RUs) per second, which is the lowest throughput you can provision.

Notice the DefaultTimeToLive = -1 option applied to the first container. At the time of this writing, change feed does not support deletes. That is, if you delete a document from a container, it does not get picked up by the change feed. This may be supported in the future, but for now, the TTL (time to live) feature provides a very simple way to cope with deletions. Rather than physically deleting documents from the first container, we’ll just update them with a TTL of 60 seconds. That gives us 60 seconds to detect the update in the change feed, so that we can physically delete the corresponding document in the second container. Then, 60 seconds later, Cosmos DB will automatically physically delete the document from the first container by virtue of its TTL setting. You’ll see all this work in a moment when we run the code.

The other point to call out is the creation of our sync document, which is a special metadata document that won’t get copied over to the second container. Instead, we’ll use it to persist a timestamp to keep track of the last time we synchronized the containers. This way, each time we sync, we can request the correct point in time from which to consume changes that have occurred since the previous sync. The document is initialized with a lastSync value of null so that our first sync will consume the change feed from the beginning of time. Then lastSync is updated so that the next sync picks up precisely where the first one left off.

Now let’s implement CreateDocuments. This method simply populates three documents in the first container:

private static async Task CreateDocuments()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    var dataDocDef = new
    {
      city = "Brooklyn",
      state = "NY",
      slogan = "Kings County"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Los Angeles",
      state = "CA",
      slogan = "Golden"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Orlando",
      state = "FL",
      slogan = "Sunshine"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);
  }

  Console.WriteLine("Created 3 documents in city collection");
}

Notice that all three documents have city and state properties, where the city property is the partition key for the container that we’re creating these documents in. The state property is the partition key for the second container, where our sync method will create copies of these documents as it picks them up from the change feed. The slogan property is just an ordinary document property. And although we aren’t explicitly supplying an id property, the SDK will automatically generate one for each document with a GUID as the id value.

We’ll also have an UpdateDocument method to perform a change on one of the documents:

private static async Task UpdateDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Update a document
    var brooklynDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Brooklyn'")
      .AsEnumerable()
      .FirstOrDefault();

    brooklynDoc.slogan = "Fuhgettaboutit! " + Guid.NewGuid().ToString();
    await client.ReplaceDocumentAsync(brooklynDoc._self, brooklynDoc);
  }

  Console.WriteLine("Updated Brooklyn document in city collection");
}

This code retrieves the Brooklyn document, updates the slogan property, and calls ReplaceDocumentAsync to persist the change back to the container.

Next comes the DeleteDocument method:

private static async Task DeleteDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Delete a document (set time-to-live at 60 seconds)
    var orlandoDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Orlando'")
      .AsEnumerable()
      .FirstOrDefault();

    orlandoDoc.ttl = 60;
    await client.ReplaceDocumentAsync(orlandoDoc._self, orlandoDoc);
  }

  Console.WriteLine("Deleted Orlando document in city collection");
}

Remember that (currently) the change feed doesn’t capture deleted documents, so we’re using the TTL (time to live) technique to keep our deletions in sync. Rather than calling DeleteDocumentAsync to physically delete the Orlando document, we’re simply updating it with a ttl property set to 60 and saving it back to the container with ReplaceDocumentAsync. To the change feed, this is just another update, so our sync method will pick it up normally as you’ll see in a moment. Meanwhile, Cosmos DB will physically delete the Orlando document from the first container in 60 seconds, giving our sync method up to one minute to pick it up from the change feed and delete it from the second container.

And finally, the sync method, which is what this whole discussion is all about. Here’s the code for SyncCollections:

private static async Task SyncCollections()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var cityCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    // Obtain last sync time

    var syncDoc = client
      .CreateDocumentQuery(cityCollUri, "SELECT * FROM c WHERE c.city = 'sync'")
      .AsEnumerable()
      .FirstOrDefault();

    var lastSync = (DateTime?)syncDoc.lastSync;

    // Step 1: Gather all the partition key ranges (physical partitions)

    var continuationToken = default(string);
    var partitionKeyRanges = new List<PartitionKeyRange>();
    var loop = true;

    while (loop)
    {
      var partitionKeyRange = await client
        .ReadPartitionKeyRangeFeedAsync(
          cityCollUri,
          new FeedOptions { RequestContinuation = continuationToken });

      partitionKeyRanges.AddRange(partitionKeyRange);
      continuationToken = partitionKeyRange.ResponseContinuation;
      loop = continuationToken != null;
    }

    // Step 2: Consume the change feed for each partition key range

    // (simple demo doesn't scale when continuation tokens are needed)
    foreach (var partitionKeyRange in partitionKeyRanges)
    {
      var options = new ChangeFeedOptions
      {
        PartitionKeyRangeId = partitionKeyRange.Id,
        StartFromBeginning = (lastSync == null),  // consume from the beginning on the very first sync
        StartTime = lastSync,                     // otherwise resume from the last sync timestamp
      };

      var query = client.CreateDocumentChangeFeedQuery(cityCollUri, options);

      while (query.HasMoreResults)
      {
        var readChangesResponse = await query.ExecuteNextAsync();
        foreach (var changedDocument in readChangesResponse)
        {
          // Skip our sync metadata document; it never gets copied to the second container
          if (changedDocument.city != "sync")
          {
            // No ttl property means a create or update; a ttl property signals a (soft) delete
            if (JsonConvert.DeserializeObject(changedDocument.ToString())["ttl"] == null)
            {
              var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
              await client.UpsertDocumentAsync(stateCollUri, changedDocument);
              Console.WriteLine($"Upserted document id {changedDocument.id} in byState collection");
            }
            else
            {
              // ttl detected: delete the copy from the second container
              // (Cosmos DB will expire the original from the first container when the ttl elapses)
              var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", changedDocument.id);
              await client.DeleteDocumentAsync(stateDocUri, new RequestOptions
              {
                PartitionKey = new PartitionKey(changedDocument.state)
              });
              Console.WriteLine($"Deleted document id {changedDocument.id} in byState collection");
            }
          }
        }
      }
    }

    // Update last sync time
    syncDoc.lastSync = DateTime.UtcNow;
    await client.ReplaceDocumentAsync(syncDoc._self, syncDoc);
  }

}

Let’s break this down. First, we grab the last sync time from the sync document in the first container. Remember, this will be null the very first time we run this method. Then, we’re ready to query the change feed, which is a two-step process.

For step 1, we need to discover all the partition key ranges in the container. A partition key range is essentially a set of partition keys. In our small demo, where we have only one document each across three distinct partition keys (cities), Cosmos DB will host all three of these documents inside a single partition key range.

Although there is conceptually only one change feed per container, there is actually one change feed for each partition key range in the container. So step 1 calls ReadPartitionKeyRangeFeedAsync to discover the partition key ranges, with a loop that utilizes a continuation token from the response so that we retrieve all of the partition key ranges into a list.

Then, in step 2, we iterate the list to consume the change feed on each partition key range. Notice the ChangeFeedOptions object that we set on each iteration, which identifies the partition key range in PartitionKeyRangeId, and then sets either StartFromBeginning or StartTime, depending on whether lastSync is null or not. If it’s null (which will be true only on the very first sync), then StartFromBeginning will be set to true and StartTime will be set to null. Otherwise, StartFromBeginning gets set to false, and StartTime gets set to the timestamp from the last sync.

After preparing the options, we call CreateDocumentChangeFeedQuery that returns an iterator. As long as the iterator’s HasMoreResults property is true, we call ExecuteNextAsync on it to fetch the next set of results from the change feed. And here, ultimately, is where we plug in our sync logic.

Each result is a changed document. We know this will always include the sync document, because we’ll be updating it after every sync. This is metadata that we don’t need copied over to the second container each time, so we filter out the sync document by testing the city property for “sync.”

For all other changed documents, it now becomes a matter of performing the appropriate create, update, or delete operation on the second container. First, we check to see if there is a ttl property on the document. Remember that this is our indication of whether this is a delete or not. If the ttl property isn’t present, then it’s either a create or an update. In either case, we handle the change by calling UpsertDocumentAsync on the second container (upsert means “update or insert”).

Otherwise, if we detect the ttl property, then we call DeleteDocumentAsync to delete the document from the second container, knowing that Cosmos DB will delete its counterpart from the first container when the ttl expires.

Let’s test it out. Start the console app and run the DB (create database) and CD (create documents) commands. Then navigate to the Data Explorer in the Azure portal to verify that the database exists with the two containers, and that the byCity container has three documents in it, plus the sync document with a lastSync value of null indicating that no sync has yet occurred:

The byState container should be empty at this point, because we haven’t run our first sync yet:

Back in the console app, run the SC command to sync the containers. This copies all three documents from the first container’s change feed over to the second container, skipping the sync document which we excluded in our code:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection
Upserted document id fb8d41ae-3aae-4892-bfe9-8c34bc8138d2 in byState collection
Upserted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

Returning to the portal, refresh the data explorer to confirm that the second container now has the same three documents as the first, although here they are partitioned by state rather than city:

Both containers are now in sync. Refresh the first container view, and you’ll see that the lastSync property has been changed from null to a timestamp from when the previous sync ran.

Run the UD command in the console app to update the first container with a change to the slogan property of the Brooklyn, NY document. Now run SC to sync the containers again:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection

Refresh the second container view, and you’ll see that the slogan property has now been updated there as well.

Finally, run the DD command in the console app to delete the Orlando, FL document from the first container. Remember that this doesn’t actually delete the document, but rather updates it with a ttl property set to 60 seconds. Then run SC to sync the containers again:

Deleted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

You can now confirm that the Orlando, FL document is deleted from the second container, and within a minute (upon ttl expiration), you’ll see that it gets deleted from the first container as well.

However, be sure to run the sync within a minute of setting the ttl. Once the ttl expires, Cosmos DB deletes the document from the first container, at which point it disappears from the change feed and you lose your chance to delete it from the second container.

What’s Next?

It didn’t take that much effort to consume the change feed, but that’s only because we have a tiny container with just a handful of changes, and we’re manually invoking each sync. To consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. It was certainly beneficial to start by querying the change feed directly, since exploring that option first is a great way to learn how change feed works internally. However, unless you have very custom requirements, the CFP library is the way to go.

So tune in to part 2, where we’ll see how much easier it is to implement our multiple partition key solution – and far more robustly – using the CFP library.

Change Feed – Unsung Hero of Azure Cosmos DB

Introduction

Azure Cosmos DB is rapidly growing in popularity, and for good reason. Microsoft’s globally distributed, multi-model database service has massively scalable storage and throughput, provides a sliding scale for consistency, is fully and automatically indexed, and it exposes multiple APIs. Throw in a server-side programming model for ACID transactions with stored procedures, triggers, and user-defined functions, along with 99.999% SLAs on availability, throughput, latency, and consistency, and it’s easy to see why Cosmos DB is fast winning the hearts of developers and solution architects alike.

Yet still today, one of the most overlooked capabilities in Cosmos DB is its change feed. This little gem sits quietly behind every container in your database, watches for changes, and maintains a persistent record of them in the order they occur. This provides you with a reliable mechanism to consume a continuous and incremental feed of changes, as documents are actively written or modified in any container.

There are numerous use cases for this, and I’ll call out a few of the most common ones in a moment. But all of them share a need to respond to changes made to a Cosmos DB container. And the first thought that comes to the mind of a relational database developer is to use a trigger for this. Cosmos DB supports triggers as part of its server-side programming model, so it could be natural to think of using this feature to consume changes in real time when you need to.

Unfortunately, though, triggers in Cosmos DB do not fire automatically as they do in the relational world. They need to be explicitly referenced with each change in order to run, so they cannot be relied upon for capturing all changes made to a container. Furthermore, triggers are JavaScript-only, and they run in a bounded execution environment within Cosmos DB that is scoped to a single partition key. These characteristics further limit what triggers can practically accomplish in response to a change.

But with change feed, you’ve got a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. You can write code (in your preferred language) that consumes the change feed to process it as needed and deploy that code to run on Azure. This paves an easy path for you to build many different solutions for many different scenarios.

Scenarios for Change Feed

Some of the more common use cases for change feed include:

  • Replicating containers for multiple partition keys
  • Denormalizing a document data model across containers
  • Triggering API calls for an event-driven architecture
  • Real time stream processing and materialized view patterns
  • Moving or archiving data to secondary data stores

Each of these deserves its own focused blog post (and will hopefully get one). For the broader context of this overview post, however, I’ll discuss each of them at a high level.

Replicating containers for multiple partition keys

One of the most (perhaps the most) important things you need to do when creating a container is to decide on an appropriate partition key – a single property in your data that the container will be partitioned by.

Now sometimes this is easy, and sometimes it is not. In order to settle on the correct choice, you need a clear understanding of how your data is used (written to and queried), and how horizontal partitioning works. You can read all about this in my previous blog post, Horizontal Partitioning in Azure Cosmos DB.

But what do you do when you can’t decide? What if there are two properties that make good choices, one for write performance, and another that’s better for query performance? This can lead to “analysis paralysis,” a scary condition that is fortunately easy to remedy using the change feed.

All you do is create two containers, each partitioned by a different partition key. The first container uses a partition key that’s optimized for writes (it may also be appropriate for certain types of queries as well), while the second one uses a partition key optimized for most typical queries. Simply use change feed to monitor changes made to the container as the writes occur and replicate the changes out to the second container.

Your application then writes to the first container and queries from the second container, simple as that! I’ll show you a detailed walkthrough of exactly how to implement this using C# and the Change Feed Processor Library with Azure Functions in my next post.

Denormalizing a document data model across containers

Developers with a background in relational database design often struggle initially with the denormalized approach to data modeling in the NoSQL world of JSON documents. I personally empathize; from my own experience, I know that it can be difficult at first to embrace concepts that run contrary to deeply engrained practices that span decades of experience in the field.

Data duplication is a case in point: it’s considered a big no-no in the normalized world of relational databases with operational workloads. But with NoSQL, we often deliberately duplicate data in order to avoid expensive additional lookups. There is no concept of a cross-document JOIN in Cosmos DB (or in most other NoSQL engines), and we can avoid having to perform our own “manual” joins if we simply duplicate the same information across documents in different containers.

This is a somewhat finer-grained version of the previous scenario, which replicates entire documents between two containers. In this case, we have different documents in each container, with data fragments from changed documents in one container being replicated into other (related) documents in another container.

But how do you ensure that the duplicated data remains in sync as changes occur in the source container? Why, change feed of course! Just monitor the source container and update the target container. I’ll show you exactly how to do this in a future post.

Triggering API calls for an event-driven architecture

In this scenario, you source events to a set of microservices, each with a single responsibility. Consider, for instance, an ecommerce website with a large-scale order processing pipeline. The pipeline is broken up into a set of smaller microservices, each of which can be scaled out independently. Each microservice is responsible for a single task in the pipeline, such as calculating tax on each order, generating tax audit records, processing each order payment, sending orders off to a fulfillment center, and generating shipping notifications.

Thus, you potentially have N microservices communicating with up to N-1 other microservices, which adds significant complexity to the larger architecture. The design can be greatly simplified if, instead, all these microservices communicate through a bus; that is, a persistent event store. And Cosmos DB serves as an excellent persistent event store, because the change feed makes it easy to broker and source these events to each microservice. Furthermore, because the events themselves are persisted, the order processing pipeline itself is very robust and incredibly resilient to failures. You can also query and navigate the individual events, so that this data can be surfaced out through a customer care API.

Check out this link for a real-world case study of how Jet.com implemented an event-sourced architecture using the Cosmos DB change feed with hundreds of microservices: https://customers.microsoft.com/en-us/story/jet-com-powers-innovative-e-commerce-engine-on-azure-in-less-than-12-months. There is also a video on this from Microsoft Build 2018 here: https://channel9.msdn.com/Events/Build/2018/BRK3602

Real time stream processing and materialized view patterns

The change feed can also be used for performing real time stream processing and analytics. In the order processing pipeline scenario, for example, this would enable you to take all the events and materialize a single view for tracking the order status. You could then easily and efficiently present the order status through a user-facing API.

Other examples of so-called “lambda architectures” include performing real time analytics on IoT telemetry or building a scoring leaderboard for a massively multiplayer online video game.

Moving or archiving data to secondary data stores

Another common scenario for using the change feed involves replicating data from Cosmos DB as your primary (hot) store to some other secondary (cold) data store. Cosmos DB is a superb hot store because it can sustain heavy write ingestion, and then immediately serve the ingested records back out to a user-facing API.

Over time as the volume of data mounts, however, you may want to offload older data to cold storage for archival. Once again, change feed is a wonderful mechanism to implement a replication strategy that does just that.

Consuming the Change Feed

So how do you actually work with the change feed? There are several different ways, and I’ll conclude this blog post by briefly explaining three of them. (Don’t worry, I’ll drill deeper into all three in the next post!)

Direct Access

First, you can query the change feed directly using the SDK. This raw approach works but is the hardest to implement at large scale. Essentially, you first need to discover all the container’s partitions, and then you query each of them for their changes. You also need to persist state metadata; for example, a timestamp for when the change feed was last queried, and a continuation token for each partition. Plus, you’ll want to optimize performance by spawning multiple tasks across different partitions so that they get processed in parallel.

If all this sounds like a lot of work, it is. Which is why you’ll almost certainly want to leverage the Change Feed Processor (CFP) library instead.

Change Feed Processor (CFP) Library

The CFP library provides a high-level abstraction over direct access that greatly simplifies the process of reading the change feed from all the different partitions of a container. This is a separate NuGet package that you pull into your project, and it handles all the aforementioned complexity for you. It will automatically persist state, track all the partitions of the container, and acquire leases so that you can scale out across many consumers.

To make this work, the CFP library persists a set of leases as documents in another dedicated Cosmos DB container. Then, when you spin up consumers, they attempt to acquire leases as they expire.

All you do is write an observer class that implements IChangeFeedObserver. The primary method of this interface that you need to implement is ProcessChangesAsync, which receives the change feed as a list of documents that you can process as needed. There are no partitions to worry about, no timestamps or continuation tokens to persist, and no scale-out concerns to manage yourself.
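To make this concrete, here’s a minimal sketch of what an observer class might look like. This assumes version 2 of the CFP library (namespaces and method signatures vary slightly between library versions), and the class name and sync comments are my own placeholders rather than anything the library prescribes:

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

public class MultiPkObserver : IChangeFeedObserver
{
  // Called when this observer acquires a lease on a partition
  public Task OpenAsync(IChangeFeedObserverContext context) => Task.CompletedTask;

  // Called when the lease is released (shutdown, load rebalancing, and so on)
  public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    => Task.CompletedTask;

  // The CFP library hands us batches of changed documents; no partition key ranges,
  // continuation tokens, or sync timestamps to manage ourselves
  public Task ProcessChangesAsync(
    IChangeFeedObserverContext context,
    IReadOnlyList<Document> docs,
    CancellationToken cancellationToken)
  {
    foreach (var doc in docs)
    {
      // Apply the sync logic here: for example, upsert into a second container,
      // or delete from it when a ttl property signals a soft delete
    }
    return Task.CompletedTask;
  }
}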

However, you still need to write your own host, and deploy the DLL with your observer class to an Azure app service. Although the process is straightforward, going with Azure Functions instead provides an even easier deployment model.

Azure Functions

The simplest way to consume the change feed is by using Azure Functions with a Cosmos DB trigger. If you’re not already familiar with Azure Functions, they let you write individual methods (functions), which you deploy for execution in a serverless environment hosted on Azure. The term “serverless” here means you don’t have to write a host or deploy an Azure app service to run your code.

Azure Functions are invoked by one of a number of available triggers. Yes, Azure Functions also uses the term triggers, but unlike triggers in Cosmos DB, an Azure Functions trigger always fires when its conditions are met. There are several different triggers available, including the one that we care about here, the Cosmos DB trigger. This Azure Functions trigger binds to configuration that points to the container you want to monitor changes on, and the lease collection that gets managed by the CFP library under the covers.

From there, the process is identical to using the CFP library, only the deployment model is dramatically simpler. The Azure Functions method that is bound using the Cosmos DB trigger receives a parameter with the same list of documents that a CFP library’s observer class does in its ProcessChangesAsync method, and you process change feed data the way you need to just the same.
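Here’s a minimal sketch of what such a function might look like, assuming the Azure Functions Cosmos DB trigger extension; the function name, database and container names, the CosmosConnection app setting, and the logging are all placeholders of my own:

using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class SyncByStateFunction
{
  [FunctionName("SyncByState")]
  public static void Run(
    [CosmosDBTrigger(
      databaseName: "multipk",
      collectionName: "byCity",
      ConnectionStringSetting = "CosmosConnection",   // app setting holding the connection string
      LeaseCollectionName = "leases",
      CreateLeaseCollectionIfNotExists = true)]
    IReadOnlyList<Document> changedDocs,
    ILogger log)
  {
    // The trigger delivers the same batches of changed documents that the CFP library
    // passes to an observer's ProcessChangesAsync method
    foreach (var doc in changedDocs)
    {
      log.LogInformation($"Changed document id: {doc.Id}");
      // Apply the upsert/delete sync logic here
    }
  }
}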

What’s Next?

Hopefully, this blog post opened your eyes to the change feed in Azure Cosmos DB. This powerful feature creates all sorts of exciting possibilities across a wide range of use cases. In the next installment, I’ll focus on container replication for multiple partition keys, and walk you step by step through the process of building a change feed solution using direct access, CFP library, and Azure Functions – all with working code samples. So, stay tuned!

Understanding Consistency Levels in Azure Cosmos DB

Developers with a background in relational databases are accustomed to achieving data integrity using transactions. Once a writer updates a bank balance and commits the transaction, it’s entirely unacceptable for a reader to ever be served the previous value, and a relational database ensures that this never happens. In the NoSQL world, this is referred to as strong consistency. Achieving strong consistency in a NoSQL database is more challenging, because NoSQL databases by design write to multiple replicas. And in the case of Azure Cosmos DB, these replicas can be geographically spread across multiple Microsoft data centers throughout the world.

First though, let’s understand consistency within the context of a single data center.

In one Azure data center, your data is written out to multiple replicas (at least four). Consistency is all about whether or not you can be sure that the data you are reading at any point in time is – in fact – the latest version of that data, because you can be reading from any replica at any given time. And if it’s not the latest version, then this is known as a “dirty read.”

Cosmos DB supports five consistency levels, ranging from strong to eventual. I’ll discuss each of these levels in a moment, but you can see that there’s a sliding scale here that balances consistency against latency and availability:

On the one extreme, strong consistency guarantees you’ll never experience a dirty read, but of course, this guarantee comes with a cost, because Cosmos DB can’t permit reads until all the replicas are updated. This results in higher latency, which degrades performance.

On the other end of the spectrum, there’s eventual consistency, which offers no guarantees whatsoever, and you never know with certainty whether or not you’re reading the latest version of data. This gives Cosmos DB the freedom to serve your reads from any available replica, without having to wait for that replica to receive the latest updates from other writes. In turn, this results in the lowest latency, and delivers the best performance, at the cost of potentially experiencing dirty reads.

Replication within a region

Consistency is always a concern, but practically, it only becomes really important once you start geo-replicating your data. Because the reality is that, despite these varying consistency levels, it’s extremely rare to ever experience a dirty read within a data center, even when using eventual consistency, which is the weakest consistency level. And that’s because the replicas in a data center are located very close to one another, and data moves extremely fast between them… typically within one millisecond.

So in fact, there is little difference in consistency, latency, performance, and availability within a data center, where you’ll almost never encounter a dirty read, but it’s a completely different matter once you start replicating globally.

Global replication

Once you globally distribute your data, it takes hundreds of milliseconds to replicate changes across continents. This greatly increases the chances for dirty reads, which is why consistency becomes a much more pressing concern once you globally distribute your data across multiple data centers.

Five Consistency Levels

Strong

In the case of Strong Consistency, you can be certain that you’re always reading the latest data. But you pay for that certainty in performance. Because when somebody writes to the database, everybody waits for Cosmos DB to acknowledge that the write was successfully saved to all the replicas. Only then can Cosmos DB serve up consistent reads, guaranteed. Cosmos DB actually does support strong consistency across regions, assuming that you can tolerate the high latency that will result while replicating globally.

Bounded Staleness

Next there’s bounded staleness, which is kind of one step away from strong. As the name implies, this means that you can tolerate stale data, but only up to a point. So you can actually configure how much out-of-date data you’re willing to allow for stale reads. Think of it as controlling the level of freshness, where you might have dirty reads, but only if the data isn’t too far out of date. Otherwise, for data that’s too stale, Cosmos DB will switch to strong consistency. In terms of defining how stale is stale, you can specify the lag either in terms of time (no stale data older than X) or in terms of update operations (no stale data that’s been updated more than Y times).

Session

And then there’s session consistency, which is actually the default consistency level. With session consistency, you maintain a session for your writer – or writers – by defining a unique session key that all writers include in their requests. Cosmos DB then uses the session key to ensure strong consistency for the writer, meaning that a writer is always guaranteed to read the same data that they wrote, and will never read a stale version from an out-of-date replica – while everyone else may experience dirty reads.

When you create a DocumentClient instance with the .NET SDK, it automatically establishes a session key and includes it with every request. This means that within the lifetime of a DocumentClient instance, you are guaranteed strong consistency for reading any data that you’ve written, when using session consistency. This is a perfect balance for many scenarios, which is why session consistency is the default. Very often, you want to be sure immediately that the data you’ve written has – in fact – been written, but it’s perfectly OK if it takes a little extra time for everyone else to see it, once all the replicas have caught up with your write.
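As a small illustration, the .NET SDK lets you request a consistency level when you construct the client (you can only request a level equal to or weaker than the account’s default). This is just a sketch; the endpoint and key constants are assumed to be defined elsewhere, and Session is typically what you get by default anyway:

// Session is the account default, so passing it explicitly is usually redundant; the SDK
// tracks the session token for you, giving read-your-own-writes within this client instance
var client = new DocumentClient(
  new Uri(CosmosEndpoint),
  CosmosMasterKey,
  new ConnectionPolicy(),
  ConsistencyLevel.Session);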

Consistent Prefix

With consistent prefix, dirty reads are always possible. However, you do get some guarantees. First, when you do get stale data from a dirty read, you can at least be sure that the data you get has in fact been replicated to all the replicas, even though it’s not the most up-to-date version of that data, which has still yet to be replicated. And second, you’ll never experience reads out of order. For example, say the same data gets updated four times, so there are versions A, B, C, and D, where D is the most up-to-date version, and A, B, and C are stale versions. Now assume A and B have both been replicated, but C and D haven’t yet. You can be sure that, until C and D do get replicated, you’ll never read versions A and B out of order. That is, you might first get version A when only A has been replicated, and then get version B once version B has been replicated, but then you’ll never again get version A. You can only get the stale versions in order, so you’ll only continue getting version B until a later version gets replicated.

Eventual

Eventual consistency is the weakest consistency level. It offers no guarantees whatsoever, in exchange for the lowest latency and best performance compared to all the other consistency levels. With eventual, you never wait for any acknowledgement that data has been fully replicated before reading. This means that read requests can be serviced by replicas that have already been updated, or by those that haven’t. So you can get inconsistent results in a seemingly random fashion, until eventually, all the replicas get updated and then all the reads become consistent – until the next write operation, of course. So this really is the polar opposite of strong consistency, because now there’s no guarantee that you’re ever reading the latest data, but there’s also never any waiting, which delivers the fastest performance.

One more thing…

It’s also worth mentioning that when the bounded staleness and session consistency levels don’t apply strong consistency, they fall back to consistent prefix, not all the way down to eventual consistency. So for bounded staleness, which gives you strong consistency when the data is too stale, you’ll still get consistent prefix guarantees when the data isn’t too stale. Likewise, with session, which gives you strong consistency when reading your own writes, you’ll get consistent prefix when reading other writes.

Adding a User to your Azure Subscription with Resource Group Access

Introduction

So, you’ve got your Azure subscription in place, and you’re the global administrator. Now you want to let someone else access your subscription, but only a specific resource group within your subscription. In this blog post, I’ll show you how to add a new user to your Azure subscription’s directory, and how to then grant that user permission to manage a specific resource group within your Azure subscription. The new user won’t be able to see or manage any resources in your subscription outside the resource group that you grant them access for.

Step-by-step procedure

Let’s get started. First, log in to the Azure portal and open your subscription’s directory. To do this, search for directory and choose Azure Active Directory, as follows:

Next, take note of the directory name; this is the domain name for the email address of the users you can create in this directory. It will be based on your username, followed by .onmicrosoft.com. In my case, with username lennil@hotmail.com, the directory name is lennilhotmail.onmicrosoft.com.

Now click on Users:

You will see your username listed. Now click New user:

In the User blade, supply information for the new user. This includes the display name and the username. The username must be in the form of an email address, where the domain name matches the directory name.

Also check Show Password to view the auto-generated password so that you can send it to the new user (the portal will require that they change it the first time they log in).

Here I’m creating a new user for my buddy Andrew Brust:

At this point, I have created a new user for Andrew:

When Andrew logs in for the first time, he will be required to change his password. The login will succeed, but he won’t be able to see anything in the subscription until we grant him access to a specific resource group. Let’s do that next.

Click Resource groups, then select the resource group you want to give the user access to. Here I’m giving Andrew access to the sql-demo-rg resource group:

Next, click Access control (IAM):

We need to add the new user to this resource group. So click Add:

From the Role dropdown, select Owner. Then click on the new user and click Save:

This will make the new user an Owner over the entire resource group so that they can fully manage all the resources inside that group (and they can also create new resources inside the resource group). They will still have no access to any other resources in any other resource groups across your subscription.

You’re done! The new user now has full access to the resource group (and can’t see anything else) on the subscription.

To confirm, go back to the Active Directory blade for the new user and click Azure resources:

Here you can see that Andrew has Owner access to the sql-demo-rg resource group, but no access to anything else in the subscription.

Summary

In this blog post, I showed you how to add a new user to your Azure subscription’s directory, and how to grant Owner permissions for that user to a specific resource group in the subscription. Hope you all find this useful!

Demystifying the Multi-Model Capabilities in Azure Cosmos DB

When someone casually asks me, “Hey, what is Cosmos DB?,” I casually respond, “Well, that’s Microsoft’s globally distributed, massively scalable, horizontally partitioned, low latency, fully indexed, multi-model NoSQL database, of course.” One of two things happen then. I’ll often get a long weird look, after which the other person politely excuses themselves and moves along. But every now and again, I’ll hear “wow, that sounds awesome, tell me more!” If you’re still reading this, then I’m guessing you’re in the latter category.

If you start to elaborate on each of the bullet points in my soundbite response, there’s a lot to discuss before you get to “multi-model NoSQL” at the tail end. Starting with “globally distributed,” Cosmos DB is – first and foremost – a database designed for modern web and mobile applications, which are (typically) global applications in nature. Simply by clicking the mouse on a map in the portal, your Cosmos DB database is instantly replicated anywhere and everywhere Microsoft hosts a data center (there are nearly 50 of them worldwide, to date). This delivers high availability and low latency to users wherever they’re located.

Cosmos DB also delivers virtually unlimited scale, both in terms of storage – via server-side horizontal partitioning – and throughput – by provisioning a prescribed number of request units (RUs) per second. This ensures low latency, and is backed by comprehensive SLAs to yield predictable database performance for your applications. And its unique “inverted indexing” scheme enables automatic indexing of every property that you store, with minimal overhead.

Whew. That’s quite a lot to digest before we even start pondering Cosmos DB’s multi-model support, mentioned there at the end of my lengthy description. In fact, it’s very deliberately placed at the end. Because regardless of which data model you choose, it actually makes no difference to Cosmos DB. The capabilities around global distribution, horizontal partitioning, provisioned throughput, and automatic indexing apply – these are durable concepts that transcend whatever data model you choose. So you get to pick and choose among any of the supported data models, without compromising any of the core features of the Cosmos DB engine.

Which segues right into the topic of this blog post. What exactly is “multi-model”? And specifically, what does it mean for a database platform like Cosmos DB to support multiple data models?

It all boils down to how you’d like to treat your data – and this is where the developer comes in. Because, while massive scale is clearly important (if not critical), developers don’t really care about such details as long as it all “just works.” And it’s Cosmos DB’s job to make sure that this all works. When it comes to actually building applications – well, that’s the developer’s job, and this is where the decision of which data model to choose comes into play.

Depending on the type of application being built, it could be more appropriate to use one data model and not another. For example, if the application focuses more on relationships between entities than the entities themselves, then a graph data model may work better than a document model. In other cases, a developer may want to migrate an existing NoSQL application to Cosmos DB; for example, an existing MongoDB or Cassandra application. In these scenarios, the Cosmos DB data model will be pre-determined; depending on the back-end database dependency of the application being ported, the developer would choose either the MongoDB-compatible or Cassandra-compatible data model. Such a migration would require minimal (to no) changes to the existing application code. And yet, in other “green field” situations, developers that are very opinionated about how data should be modeled are free to choose whichever data model they prefer.

Each data model has an API for developers to work with in Cosmos DB. Put another way, the developer chooses an API for their database, and that determines the data model that is used. So, let’s break it down:

Document Data Model (SQL & MongoDB APIs)

The first thing to point out is that the SQL API is, essentially, the original DocumentDB programming model from the days when Cosmos DB was called DocumentDB. This is arguably the most robust and capable of all the APIs, because it is the only one that exposes a server-side programming model that lets you build fully transactional stored procedures, triggers, and user-defined functions.

So both the SQL and MongoDB APIs give you a document data model, but the two APIs themselves are radically different. Yes, they are similar from a data modeling perspective; you store complete denormalized entities as a hierarchical key-value document model; pure JSON in the case of the SQL API, or BSON in the case of the MongoDB API (BSON is MongoDB’s special binary-encoded version of JSON that extends JSON with additional data types and multi-language support).

The critical difference between the two APIs is the programming interface itself. The SQL API uses Microsoft’s innovative variant of structured query language (SQL) that is tailored for searching across hierarchical JSON documents. It also supports the server-side programming model (for example, stored procedures), which none of the other APIs do.
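To give a feel for that SQL dialect, here is a minimal sketch using the .NET SDK; the database and container names, the document shape, and the property names are invented for illustration, and client is assumed to be an existing DocumentClient:

// Query a nested JSON property of documents shaped like:
// { "name": "...", "address": { "city": "...", "state": "..." } }
var collUri = UriFactory.CreateDocumentCollectionUri("mydb", "customers");

var brooklynCustomers = client.CreateDocumentQuery(
  collUri,
  "SELECT c.name, c.address.city FROM c WHERE c.address.city = 'Brooklyn'",
  new FeedOptions { EnableCrossPartitionQuery = true })
  .AsEnumerable()
  .ToList();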

In contrast, the MongoDB API actually provides wire-level support, which is a compatibility layer that understands the protocol used by the MongoDB driver for sending packets over the network. MongoDB has a built-in find method used for querying documents (unlike the SQL support found in the SQL API). So the MongoDB API appeals to existing MongoDB developers, who now enjoy the scale-out, throughput, and global distribution capabilities of Cosmos DB without deserting the MongoDB ecosystem – the MongoDB API in Cosmos DB gives you full compatibility with existing MongoDB application code, and lets you continue working with familiar MongoDB tools.

Key-Value Data Model (Table API)

You can also model your data as a key-value store, using the Table API. This API is actually the evolution of Azure Table Storage – one of the very first NoSQL databases available on Azure. In fact, all existing Azure Table Storage customers will eventually be migrated over to Cosmos DB and the Table API.

With this data model, each entity consists of a key and a value pair, but the value itself is a set of key-value pairs. So this is nothing like a table in a relational database, where each row has the same columns; with the Table API in Cosmos DB, each entity’s value can have a different set of key-value pairs.

The Table API appeals primarily to existing Azure Table Storage customers, because it emulates the Azure Table Storage API. Using this API, existing Azure Table Storage apps can be migrated quickly and easily to Cosmos DB. For a new project, though, there is little reason to consider the Table API, since the SQL API is far more capable.

So then, when would you actually choose to use the Table API? Well, again, the primary use case is to migrate an existing Azure Table Storage account over to Cosmos DB, without having to change any code in your applications. Remember that Microsoft is planning to do this for every customer as a long-term migration, but there’s no reason to wait for them if you don’t want to. You can migrate the data yourself now and immediately start enjoying the benefits of Cosmos DB as a back end, without making any changes whatsoever to your existing Azure Table Storage applications. You just change the connection string to point to Cosmos DB, and the application continues to work seamlessly against the Cosmos DB Table API.
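To make the “just change the connection string” point concrete, here’s a hedged sketch against the classic Azure Table Storage SDK surface (the Cosmos DB Table SDK mirrors these types); the account name, key, endpoint, and table name are placeholders:

// The application code is unchanged; only the connection string now points at the
// Cosmos DB Table API endpoint instead of a classic Azure Table Storage account
var account = CloudStorageAccount.Parse(
  "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;" +
  "TableEndpoint=https://<account>.table.cosmos.azure.com:443/");

var tableClient = account.CreateCloudTableClient();
var table = tableClient.GetTableReference("Cities");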

Graph Data Model (Gremlin API)

You can also choose the Gremlin API, which gives you a graph database based on the Apache TinkerPop open source project. Graph databases are becoming increasingly popular in the NoSQL world.

What do you put in a graph? One of two things; either a vertex or an edge. Now don’t let these terms intimidate you. They’re just fancy words for entities and relationships, respectively. So a vertex is an entity, and an edge is a one-way relationship between any two vertices. And that’s it – nothing more and nothing less. These are the building blocks of any graph database. And whether you’re storing a vertex or an edge, you can attach any number of arbitrary properties to it; much like the arbitrary key-value pairs you can define for a row using the Table API, or a flat JSON document using the SQL API.

The Gremlin API provides a succinct graph traversal language that enables you to efficiently query across the many relationships that exist in a graph database. For example, in a social networking application, you could easily find a user, then look for all of that user’s posts where the location is NY, and of those, find all the relationships where some other user has commented on or liked those posts.

Columnar (Cassandra API)

There’s a fourth option for choosing a data model, and that’s columnar, using the Cassandra API. Columnar is yet another way of modeling your data, where – in a departure from the typical way of dealing with schema-free data in the NoSQL world – you actually do define the schema of your data up-front. However, data is still stored physically in a column-oriented fashion, so it’s still OK to have sparse columns, and it has good support for aggregations. Columnar is somewhat similar to the Key-Value data model with the Table API, except that every item in the container is required to adhere to the schema defined for the container. And in that sense, columnar is really most similar to columnstore in SQL Server, except of course that it is implemented using a NoSQL architecture, so it’s distributed and partitioned to massively scale out big data.

Atom Record Sequence (ARS)

The fact of the matter is, these APIs merely project your data as different data models; whereas internally, your data is always stored as ARS – or Atom Record Sequence – a Microsoft creation that defines the persistence layer for key-value pairs. Now you don’t need to know anything about ARS; you don’t even need to know that it’s there. But it is there, under the covers, storing all your data as key-value pairs in a manner that’s agnostic to the data-model you’ve chosen to work with.

Because at the end of the day, it’s all just keys and values – not just the key-value data model, but all these data models. They’re all some form of keys and values. A JSON or BSON document is a collection of keys and values, where values can either be simple values, or they can contain nested key-value pairs. The key-value model is clearly based on keys and values, but so are graph and columnar. The relationships you define in a graph database are expressed as annotations that are themselves key-value pairs, and certainly all the columns defined for a columnar data model can be viewed as key-value pairs as well.

So, these APIs are here to broaden your choices in terms of how you get to treat your data; they have no bearing on your ability to scale your database. For example, if you want to be able to write SQL queries, you would choose the SQL API, and not the Table API. But if you want MongoDB or Azure Table Storage compatibility, then you’d go with the MongoDB or Table API respectively.

Switching Between Data Models

As you’ve seen, when you choose an API, you are also choosing a data model. And today (since the release of Cosmos DB in May 2017), you choose an API when you create a Cosmos DB account. This means that today, a Cosmos DB account is tied to one API, which ties it to one data model:

But again, each data model is merely a projection of the same underlying ARS format, and so eventually you will be able to create a single account, and then switch freely between different APIs within the account. Then you’ll be able to access one database as graph, key-value, document, or columnar, all at once, if you wish:

You can also expect to see additional APIs in the future, as Cosmos DB broadens its compatibility support for other database systems. This will enable an even wider range of developers to stick with their database of choice, while leveraging Cosmos DB as a back end for horizontal partitioning, provisioned throughput, global distribution, and automatic indexing.

Summary

Azure Cosmos DB has multiple APIs and supports multiple data models. In this blog post, we explored the multi-API, multi-model capabilities of Cosmos DB, including the document data model with either the SQL or MongoDB APIs, key-value with the Table API, graph with the Gremlin API, and columnar with the Cassandra API.

Regardless of which data model you choose, however, Cosmos DB stores everything in ARS, and merely projects different data models, based on the different APIs. This provides developers with a wide range of choices for how they’d like to model their data, without making any compromises in scale, partitioning, throughput, indexing, or global distribution.

Horizontal Partitioning in Azure Cosmos DB

In Azure Cosmos DB, partitioning is what allows you to massively scale your database, not just in terms of storage but also throughput. You simply create a container in your database, and let Cosmos DB partition the data you store in that container, to manage its growth.

This means that you just work with the one container as a single logical resource where you store data. And you can just let the container grow and grow, without worrying about scale, because Cosmos DB creates as many partitions as needed behind the scenes to accommodate your data.

Automated Scale-Out

These partitions themselves are the physical storage for the data in your container. Think of partitions as individual buckets of data that, collectively, make up the container. But you don’t need to think about it too much, because the whole idea is that it all just works automatically and transparently. So when one partition gets too full, Cosmos DB automatically creates a new partition, and frees up space in the growing partition by splitting its data, and moving some of it over into the new partition.

This technique scales the storage of your container, but the partitions themselves are also replicated internally, which ensures the availability of your container. Furthermore, Cosmos DB will split partitions well before they grow too full, which also scales throughput because there are then more partitions available to satisfy a larger workload.

Life for a container begins with a single partition, and when you start storing data to the container, it gets written to that partition. Then you write some more data, which continues to fill the partition, until at some point, a new partition gets created to store the additional data. And this continues ad infinitum, resulting in horizontal scale-out that gives you a virtually unlimited container for your database:

But the question then becomes, how does Cosmos DB know which data is stored in which partitions? I mean, if your container grows to hundreds or thousands of partitions, how does Cosmos DB know where to look for data when you run a query? Having a piece of data stored arbitrarily in any given partition within the container would mean that Cosmos DB needs to check each individual partition when you query. This wouldn’t be very efficient, which is why Cosmos DB is much smarter about this, and that’s where you come in. Because your job in all of this – and really your only job – is to define a partition key for the container.

Selecting a Partition Key

The single most important thing you need to do when you create a container is to define its partition key. This could be any property in your data, but choosing the best property will scale your data for storage and throughput in the most efficient way. And conversely, some properties will be a poor choice, and will thus impede your ability to scale. So even though you don’t need to handle partitioning yourself, understanding how Cosmos DB internally partitions your data, combined with an understanding of how your data will be most typically accessed, will help you make the right choice.
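For reference, here’s a minimal sketch of designating the partition key with the .NET SDK when creating a container; the database and container names, the /city path, and the throughput value are placeholders, and client is assumed to be an existing DocumentClient inside an async method:

// Define the container and designate /city as its partition key
var collection = new DocumentCollection { Id = "mycontainer" };
collection.PartitionKey.Paths.Add("/city");

// Create it in the database with some provisioned throughput (RU/sec)
await client.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri("mydb"),
  collection,
  new RequestOptions { OfferThroughput = 400 });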

Partition key values are hashed

For each item you write to the container, Cosmos DB calculates a hash on the property value that you’ve designated as the partition key, and it uses that hash to determine which physical partition the item should be stored in. This means that all the items in your container with the same value for the property you’ve chosen for the partition key will always be stored physically together in the same partition, guaranteed. They will never, ever, be spread across multiple partitions.

Physical partitions host multiple logical partitions

However, this certainly does not mean that Cosmos DB creates one partition for each unique partition key value. Your container can grow elastically, but the partitions themselves have a fixed storage capacity. So one physical partition per partition key (i.e., logical partition) is far from ideal, because you’d potentially wind up having Cosmos DB create far more partitions than are actually necessary, each with a large amount of free space that will never be used.

But as I said, Cosmos DB is much smarter than that, because it also uses a range algorithm to co-locate multiple partition keys within a single physical partition. This is totally transparent to you, however – your data gets partitioned by the partition key that you choose, but which physical partition is used to store an item, and which other partition keys (logical partitions) happen to be co-located on that same partition, is all irrelevant as far as we’re concerned.

Two primary considerations

So now you understand that all the items with the same partition key will always live together in the same partition, which makes it easier to figure out what property in your data might or might not be a good candidate to use for the partition key.

First, queries will be most efficient if they can be serviced by looking at one partition, as opposed to spreading the query across multiple partitions. That’s why, typically, you want to supply the partition key value with any query you run, so that Cosmos DB can go directly to the partition where it knows all the data that could possibly be returned by your query lives. You can certainly override this, and tell Cosmos DB to query across all the partitions in your container, but you want to pick a partition key where such a query is the exception rather than the rule – where most typical queries in your application (say 80% or more) can be scoped to a single partition key value.
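In code, the difference looks something like this minimal sketch; client and collUri are assumed to exist as in earlier samples, the container is assumed to be partitioned by /city, and the property names are invented:

// Preferred: scope the query to a single logical partition by supplying the partition key
var singlePartitionQuery = client.CreateDocumentQuery(
  collUri,
  "SELECT * FROM c WHERE c.city = 'Brooklyn'",
  new FeedOptions { PartitionKey = new PartitionKey("Brooklyn") });

// The exception: explicitly allow a fan-out query across all partitions
var crossPartitionQuery = client.CreateDocumentQuery(
  collUri,
  "SELECT * FROM c WHERE c.population > 1000000",
  new FeedOptions { EnableCrossPartitionQuery = true });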

The same is true of updates. As I’ll blog about soon, you can write server-side stored procedures that update multiple documents in a single transaction, so that all the updates succeed or fail together as a whole. This is possible only across documents that share the same partition key value, which is another thing to think about when defining the partition key for a container.
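Here’s a hedged sketch of invoking such a stored procedure from an async method with the .NET SDK; the stored procedure id, its parameter, and the database and container names are hypothetical:

// Every document the stored procedure touches must share the 'Brooklyn' partition key value,
// because the server-side transaction is scoped to that single logical partition
var sprocUri = UriFactory.CreateStoredProcedureUri("mydb", "mycontainer", "bulkUpdateSlogans");

var response = await client.ExecuteStoredProcedureAsync<int>(
  sprocUri,
  new RequestOptions { PartitionKey = new PartitionKey("Brooklyn") },
  "Our new slogan");  // argument passed through to the stored procedure

// response.Response holds whatever value the stored procedure returns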

You also want to choose a value that won’t introduce bottlenecks for storage or throughput. Something that can be somewhat uniformly distributed across partitions, so that storage and throughput can be somewhat evenly spread within the container.

Let’s dig a little deeper with a few different scenarios. Here I’ve got a container that uses city as the partition key:

Like I’ve been explaining, all the items for any city are stored together within the same partition, and each partition is co-locating multiple partition keys. It’s not a perfect distribution – and it never will be – but at least at this point, it seems we can somewhat evenly distribute our data within the container using city as the partition key. Furthermore, any query within a given city can be serviced by a single partition, and multiple items in the same city can be updated in a single transaction using a stored procedure.

Now, what happens when we need to grow this data? Say we start getting more data for Chicago, but can already see the partition hosting Chicago is starting to run low on headroom. So when you start adding more Chicago data, Cosmos DB will at some point – automatically and transparently – split the partition. That is, it will create a new partition, and move roughly half the data from the Chicago partition into the new partition. In this case, Berlin got moved out as well as Chicago, and now there is sufficient room for Chicago to continue growing:

But is city still the best choice to use here? If you have a few cities that are significantly larger than others, or that are queried significantly more often than others, then perhaps a finer granularity might be better, like zip code, for example. That will yield many more distinct values than city, and ideally, you want a partition key with many distinct values – like hundreds or thousands, at a minimum.

Choosing the Right Partition Key

Ultimately, only you will be able to make the best choice, because the best choice will be driven by the typical data access patterns in your particular application. I have already explained that the choice you make determines if a query can be serviced by a single partition – which is preferred, and which should be the most common case – or if multiple partitions need to be looked at. And it also determines which items can be updated together in a single transaction, using a stored procedure – and which cannot.

So with that in mind, let’s see what would make a good partition key for the different kinds of applications you might be building.

User Profile Data

Typical social networking applications store user profile data, and the user ID is very often a good partition key choice for this type of application. It means you can query across all the posts of a given user, and Cosmos DB can service the query by looking at just the one partition where all of that user’s posts are stored. For example, you could write a stored procedure that updates the location property on multiple posts belonging to a user, and those updates would occur inside a transaction – they would all commit, or all roll back, together.

IoT

IoT applications deal with the so-called Internet of Things, where you’re storing telemetry data from all sorts of devices – refrigerators, automobiles, whatever. The device ID is typically used as the partition key for IoT apps. For example, if you’re storing information about different cars, you would go with the VIN – the vehicle identification number that uniquely identifies each automobile. Queries against any particular car or device can then be serviced by a single partition, and multiple items for the same car or device can be updated together in a transaction by writing a stored procedure.

Multitenant

In a multi-tenant architecture, you provide storage for different customers, or tenants. Each tenant needs to be isolated, so the tenant ID seems like a natural candidate for the partition key: each tenant can then run their own queries against a single partition, and transactionally update their own data in that partition using a stored procedure, completely isolated from the other tenants.

Writes distributed

You also need to think about how your application writes to the container, because you want those writes to be spread somewhat uniformly across all the partitions within the container. Say you’re building a social networking app. I mentioned that user ID might be a good choice, but what about creation date? Well, creation date is a bad choice – for any type of application, really – because of the way the application will write data.

So from a storage perspective, we do get nice uniformity by using the creation date for the partition key, and, as time marches on, Cosmos DB will automatically add new partitions for new data, as you’ve already seen:

But the problem is that, when the application writes new data, the writes will always be directed to the same partition, based on whatever day it is. This results in what’s called a hot partition: a bottleneck where one partition absorbs all the writes while the throughput reserved for the rest of the container goes unused. Specifically, Cosmos DB evenly distributes your provisioned throughput across all the physical partitions in the container. So in this case, where there are four partitions, if you are provisioning (and thus paying for) 4,000 RU/sec (request units per second), you’ll only get the performance of about 1,000 RU/sec:

That’s why user ID is a much better choice: writes get directed to different partitions depending on the user. So throughout the day, as user profile data gets written to the container, those writes are distributed much more evenly, with user ID as the partition key.

Multiple containers

And then you need to consider that, in some scenarios, you can’t settle on one good partition key for a single container. In these cases, you’ll create multiple containers in the database, each with different partition keys, and then store different data in each container, based on usage.

The multi-tenant scenario makes a perfect example:

We’re partitioning by tenant ID, but clearly tenant number 10 is much bigger than all the other tenants. That could be because all the other tenants are, let’s say, small mom-and-pop shops, while tenant number 10 is some huge Fortune 500 company like Toyota. Compared to tenant number 10, all the other tenants have modest storage and throughput requirements.

This too creates a hot partition. First, look at storage. Partitions have a fixed size limit, which happens to be 10 gigabytes today, though that will likely change to 100 gigabytes in the future. But that limit is almost irrelevant here, because independent of it, throughput is very unevenly distributed. Tenant number 10 is much busier than the other tenants, so most writes go to that one partition – to the detriment of all the other tenants, because the reserved throughput is provisioned for the entire container. Furthermore, given its significantly larger volume of data, the large tenant would likely benefit from partitioning on some fine-grained property within its own data, rather than lumping everything under a single tenant ID value, which is the partition key for the container.

And that’s a perfect example of when you’ll create a second container for your database. In this case, we can dedicate the entire second container to tenant number 10, and – let’s say it’s Toyota – that container can be partitioned by VIN number:

As a result, we have good uniformity across the entire database. One container handles the small tenants; it is partitioned by tenant ID, and might be provisioned for relatively low throughput that gets shared by all the tenants in the container. The other container handles the large tenant, Toyota; it is partitioned by VIN, and might even reserve more throughput for Toyota alone than the first container reserves for all the smaller tenants combined.
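
Here is a rough sketch of that two-container layout using the azure-cosmos Python SDK; the container names and throughput figures are purely illustrative:

```python
from azure.cosmos import CosmosClient, PartitionKey

# Placeholder endpoint, key, and database name - substitute your own.
client = CosmosClient("https://<your-account>.documents.azure.com:443/", credential="<your-key>")
database = client.get_database_client("mydb")

# One container shared by all the small tenants, partitioned on tenant ID.
small_tenants = database.create_container_if_not_exists(
    id="smallTenants",
    partition_key=PartitionKey(path="/tenantId"),
    offer_throughput=1000,   # modest throughput shared across the small tenants
)

# A dedicated container for the single large tenant, partitioned on VIN.
toyota = database.create_container_if_not_exists(
    id="toyota",
    partition_key=PartitionKey(path="/vin"),
    offer_throughput=10000,  # more throughput reserved for the one big tenant
)
```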

Learning Azure Cosmos DB

Hey everyone!

I’m extremely delighted to announce the launch of my brand new Pluralsight course on Azure Cosmos DB.

This is a 10-module course that targets developers – and that’s you!

The course should be live on Pluralsight in the next few days. If you’re interested in building next-generation global apps for the web and mobile, just join me for about 6 hours, and get up to speed in a heartbeat!

Here’s what the course delivers:

Part 1: Introduction and Overview

We begin by defining NoSQL, explaining big data, and describing the characteristics of a NoSQL Database. Then we introduce Cosmos DB with a high-level discussion around global distribution, server-side partitioning, and its multi-model capabilities (with support for documents, tables, and graphs). We also introduce the local emulator, which lets you develop for Cosmos DB without an internet connection or Azure subscription. This module has several demos showing how to create an Azure Cosmos DB account, how to download and install the local emulator, and how to use the SQL API to create a collection, populate it with documents, and query for documents using Cosmos DB SQL.

Part 2: Tuning for Throughput and Performance

This module explains how to provision throughput in Cosmos DB. We begin by learning about request units, or RUs. With RUs, you don’t worry about hardware concerns like CPU, memory, or I/O. Instead, Cosmos DB delivers predictable performance by letting you reserve as many RUs per second as your application needs, and it throttles requests if you start to exceed the throughput limits that you reserve. The portal has many customizable charts and graphs that let you monitor the request unit consumption of your database, and you can estimate your throughput needs using the online request unit calculator. The module concludes with an explanation of how Cosmos DB pricing is divided between storage and throughput.

Part 3: Horizontal Partitioning

This module discusses how Cosmos DB achieves elastic scale via server-side horizontal partitioning. We explain how containers are logical resources that Cosmos DB manages by creating and replicating multiple physical partitions to scale both storage and throughput. We then discuss the considerations to keep in mind when choosing a partition key, which groups related items together for efficient queries and supports updating multiple items in a transaction using stored procedures. The module concludes by showing how to enable cross-partition queries, where Cosmos DB automatically fans out the query to multiple partitions and aggregates each partition’s results into a single result for the query.

Part 4: Globally Distributing Data

In this module, we explore the geo-replication capabilities in Cosmos DB, also called turnkey global distribution. In addition to local replication within one Azure data center, Cosmos DB can replicate your database to any number of other Azure data centers in regions located throughout the world. This brings your data closer to your users, giving them low latency and high availability wherever they are located. You will see how easy it is to enable global distribution and configure your application with a failover sequence of preferred regions. We wrap up the module by discussing consistency, and explaining the five consistency levels that let you balance latency and availability against the stale reads that can occur when querying replicas that are not yet up to date.

Part 5: Data Modeling for the SQL API

Using the SQL API, you model items in the database as JSON documents. This module starts by calling out some of the key differences in data modeling between a traditional relational database platform (like SQL Server) and a JSON document database (like Cosmos DB with the SQL API). We then examine the special properties, including the resource ID and self-link properties that uniquely identify each resource, and the URI factory which simplifies the task of constructing the self-link to any resource in the database. The module concludes by showing how the Data Migration Tool can be used to transform and import data from SQL Server to Cosmos DB.

Part 6: Querying Documents

This module explores the special version of SQL used by Cosmos DB for querying JSON documents in a collection with the SQL API. We explain how the query language is rooted in JSON and JavaScript semantics, and then dive into numerous demos that show how to filter, sort, iterate arrays, and perform intra-document joins, using the many available operators and built-in math, type checking, string, array, aggregate, and spatial functions provided by Cosmos DB SQL.
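
As a small taste of the kind of queries this module covers, here is a hedged sketch using the azure-cosmos Python SDK against an invented customer document shape, showing a simple filter with sorting, plus an intra-document join over an embedded array:

```python
from azure.cosmos import CosmosClient

# Placeholder endpoint, key, and resource names - substitute your own.
client = CosmosClient("https://<your-account>.documents.azure.com:443/", credential="<your-key>")
container = client.get_database_client("mydb").get_container_client("customers")

# Filter and sort on scalar properties of each document.
active_customers = container.query_items(
    query="SELECT c.name, c.city FROM c WHERE c.isActive = true ORDER BY c.name",
    enable_cross_partition_query=True,
)

# Intra-document join: unwind each document's embedded addresses array.
customer_addresses = container.query_items(
    query="SELECT c.name, a.city FROM c JOIN a IN c.addresses",
    enable_cross_partition_query=True,
)
```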

Part 7: Programming with the .NET SDK

This module shows how to use the .NET SDK with the SQL API to build client applications for Cosmos DB. We cover using the SDK to work with databases and collections, create and query documents, control indexing, and secure access with resource tokens based on users and permissions.

Part 8: Programming the Cosmos DB Server

This module teaches you how to write server-side code that runs within the Cosmos DB service, including stored procedures, triggers, and user-defined functions. We also show you how to cope with throttled requests, and how to implement a continuation model for long-running processes.

Part 9: Using the Table API

This module explains the Table API in Cosmos DB, which provides 100% compatibility with the original Azure Table Storage NoSQL service, but adds predictable reserved throughput, and automatic indexing. We discuss the key-value data model used by the Table API, explain when it makes sense to use this API and when it doesn’t, and show how to migrate an existing Azure Table Storage account and application to Cosmos DB and the Table API.

Part 10: Using the Gremlin API

This module explains the Gremlin API, which provides a graph data model over Cosmos DB. This API is based on the Apache TinkerPop open source project, and defines a set of steps that can be used to populate a graph with vertices (entities) and edges (relationships). After populating the graph, you can run Gremlin queries that traverse the many relationships defined in the database. We’ll create several demos, including a simple graph, airport model, and a multi-model comic book catalog.

I hope this has whetted your appetite to learn more about Cosmos DB!