Multiple Partition Keys in Azure Cosmos DB (Part 3) – Azure Functions with Change Feed Trigger

Welcome to part 3 in this three-part series on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In part 2, I showed you how to use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach in part 1.

This post concludes the three-part series, and shows how to deploy the solution to execute in a serverless cloud environment using Azure Functions.

Building the Azure Functions Project

As you learned in part 2, the benefits of using the CFP library are clear. But we still need a way to deploy our host to run in Azure. We could certainly refactor the CfpLibraryHost console application as a web job. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

If you’re not already familiar with Azure Functions, they let you write individual methods (functions), which you deploy for execution in a serverless environment hosted on Azure. Here, “serverless” means you don’t have to create or manage an Azure app service (or any other hosting infrastructure) just to run your code.

In the current ChangeFeedDemos solution, create a new Azure Functions project and name it CfpLibraryFunction. Visual Studio will offer up a selection of “trigger” templates to choose from, including the Cosmos DB Trigger that we’ll be using. However, we’ll just choose the Empty template and configure the project manually:

Next, add the Cosmos DB WebJob Extensions to the new project from the NuGet package Microsoft.Azure.WebJobs.Extensions.CosmosDB:

Now create a new class named MultiPkCollectionFunction in the CfpLibraryFunction project with the following code:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;

namespace CosmosDbDemos.MultiPkCollection.ChangeFeedFunction
{
  public static class MultiPkFunction
  {
    private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    private const string CosmosMasterKey = "<account-key>";

    private static readonly DocumentClient _client;

    static MultiPkFunction()
    {
      _client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey);
    }

    [FunctionName("MultiPkCollectionFunction")]
    public static void Run(
      [CosmosDBTrigger(
        databaseName: "multipk",
        collectionName: "byCity",
        ConnectionStringSetting = "CosmosDbConnectionString",
        LeaseCollectionName = "lease")]
      IReadOnlyList<Document> documents,
      ILogger log)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          _client.UpsertDocumentAsync(stateCollUri, document).Wait();
          log.LogInformation($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
          {
            PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
          }).Wait();
          log.LogInformation($"Deleted document id {document.Id} in byState collection");
        }
      }
    }
  }
}

Note that this is all the code we need. While the CFP library host project required less code than querying the change feed directly, it still required additional code to build and host the processor – plus you need a full Azure app service to deploy in the cloud.

Using an Azure Function, it’s super lean. We have the same minimal business logic as before, and still leverage all the CFP library goodness as before, but being “serverless,” we’re ready to deploy this as a standalone function in the cloud, without creating a full Azure app service. Plus, you can test the Azure Function locally using Visual Studio and the debugger, so that you can deploy to the cloud with confidence. Folks, it just doesn’t get much better than this!

The CosmosDBTrigger binding in the signature of the Run method causes this code to fire on every notification that gets raised by the CFP library watching the byCity container for changes. The binding tells the trigger to monitor the byCity collection in the multipk database, and to persist leases to the container named lease. It also references a connection string setting named CosmosDbConnectionString to obtain the necessary endpoint and master key, which we’ll configure in a moment.
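The trigger attribute also exposes optional settings that can simplify setup. As a hedged sketch (these property names come from the Microsoft.Azure.WebJobs.Extensions.CosmosDB package, but verify them against the version you install), you could let the trigger create the lease container for you and replay the container’s history from the start:

```csharp
// Variant of the trigger binding with optional settings (sketch; not required for this demo)
[FunctionName("MultiPkCollectionFunction")]
public static void Run(
  [CosmosDBTrigger(
    databaseName: "multipk",
    collectionName: "byCity",
    ConnectionStringSetting = "CosmosDbConnectionString",
    LeaseCollectionName = "lease",
    CreateLeaseCollectionIfNotExists = true,  // provision the lease container on first run
    StartFromBeginning = true)]               // consume the change feed from the very beginning
  IReadOnlyList<Document> documents,
  ILogger log)
{
  // ... same synchronization logic as shown above ...
}
```

In this series we create the lease container explicitly with the CL menu command, so these optional settings are simply left at their defaults.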

The actual implementation of the Run method is virtually identical to the ProcessChangesAsync method in our CFP library host project from part 2. The only difference is that we log output using the ILogger instance passed into our Azure Function, rather than using Console.WriteLine to log output in our CFP library host project.

Setting the connection string

Next we need to set the connection string referenced by the trigger in the signature of the Run method. Azure Functions supports a configuration file, much like the appsettings.json, app.config, or web.config files used by typical .NET applications. For local development, this file is named local.settings.json. Open this file and add the CosmosDbConnectionString setting that the trigger will use to monitor the byCity collection for changes:

{
  "IsEncrypted": false,
  "Values": {
    "CosmosDbConnectionString": "AccountEndpoint=https://<account-name>.documents.azure.com:443/;AccountKey=<account-key>;",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet"
  }
} 

Testing locally

It’s very easy to test and debug our code locally, before deploying it to Azure.

Let’s see it work by running the two projects ChangeFeedDirect and CfpLibraryFunction at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryFunction app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that they can be synchronized to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryFunction). To do this, right-click the CfpLibraryFunction project in Solution Explorer and choose Debug, Start New Instance. You will see a console app open up which hosts the local webjob for our Azure function:

Wait a few moments until the local Azure Functions host indicates that the application has started:

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library under control of the local Azure Functions webjob host:

Before we deploy to Azure, let’s re-initialize the database. Back in the first console app (ChangeFeedDirect), run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Deploying to Azure

Satisfied that our code is working properly, we can now deploy the Azure function to run in the cloud.

Right-click the CfpLibraryFunction project and choose Publish.

Select Create New and then click Publish.

Assign a name for the deployed function, and select the desired subscription, resource group, hosting plan, and storage account (or just accept the defaults):

Now click Create and wait for the deployment to complete.

Over in the Azure portal, navigate to All Resources, filter on types for App Services, and locate your new Azure Functions service:

Click on the App Service, and then click Configuration:

Next, click New Application Setting:

This is the cloud equivalent of the local.settings.json file, where you need to plug in the same CosmosDbConnectionString setting that you provided earlier when testing locally:

Now click Update, Save, and close the application settings blade.

You’re all done! To watch it work, click on MultiPkCollectionFunction, scroll down, and expand the Logs panel:

Back in the ChangeFeedDirect console app, run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library running in our Azure function. Within a few moments, the Azure function trigger fires, and our code synchronizes them to the byState container:

What’s Next?

A few more pointers before concluding:

Conclusion

This concludes the three-post series on using the Cosmos DB change feed to synchronize two containers with different partition keys over the same data. Thanks for reading, and happy coding!


Multiple Partition Keys in Azure Cosmos DB (Part 2) – Using the Change Feed Processor Library

Welcome to part 2 in this series of blog posts on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In this post, we’ll use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach we took in part 1.

What is the CFP Library?

For the solution we built in part 1 to consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.
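To make that workload concrete, here is a rough sketch of what a hand-rolled, scale-aware consumer has to juggle, using the same SDK as part 1 (LoadCheckpoint and SaveCheckpoint are hypothetical helpers you would have to write to persist continuation tokens somewhere durable):

```csharp
// Sketch only: enumerate the container's partition key ranges, then drain
// each range's change feed, checkpointing a continuation token per range.
var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
var ranges = await client.ReadPartitionKeyRangeFeedAsync(collUri);

foreach (var range in ranges)
{
  var query = client.CreateDocumentChangeFeedQuery(collUri, new ChangeFeedOptions
  {
    PartitionKeyRangeId = range.Id,
    RequestContinuation = LoadCheckpoint(range.Id),  // hypothetical helper
    StartFromBeginning = true
  });

  while (query.HasMoreResults)
  {
    var response = await query.ExecuteNextAsync<Document>();
    // ... process the changed documents ...
    SaveCheckpoint(range.Id, response.ResponseContinuation);  // hypothetical helper
  }
}
```

And that still leaves out parallelizing the per-range loops and coordinating multiple client instances, which is precisely the plumbing the CFP library provides.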

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. In most cases, unless you have very custom requirements, the CFP library is the way to go over directly querying the change feed yourself.

The CFP library automatically discovers the container’s partition key ranges and parallelizes change feed consumption across each of them internally. This means that you only consume one logical change feed for the entire container, and don’t worry at all about the individual change feeds behind the individual partition key ranges. The library relies on a dedicated “lease” container to persist lease documents that maintain state for each partition key range. Each lease represents a checkpoint in time which tracks continuation tokens for chunking long change feeds across multiple clients. Thus, as you increase the number of clients, the overall change feed processing gets divided across them evenly.
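To make the lease mechanism tangible, here is roughly what a lease document looks like (illustrative only; the exact field names and id format vary across versions of the CFP library):

```json
{
  "id": "multipk_byCity..0",
  "PartitionId": "0",
  "Owner": "Change Feed Processor Host - 7f3c9a1e",
  "ContinuationToken": "\"42\"",
  "timestamp": "2019-05-29T14:00:00Z"
}
```

Owner identifies the host instance currently holding the lease for that partition key range; as host instances come and go, the library reassigns leases (and their checkpointed continuation tokens) to balance the work across instances.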

Using the lease container also means we won’t need to rely on our own “sync” document that we were using to track the last time we queried the change feed directly. Therefore, our code will be simpler than the previous solution from part 1, yet we will simultaneously inherit all the aforementioned capabilities that the CFP library provides.

Building on the ChangeFeedDemo solution we created in part 1, open the Program.cs file in the ChangeFeedDirect project. In the Run method, add a line to display a menu item for creating the lease container:

Console.WriteLine("CL - Create lease collection"); 

Further down in the method, add an “else if” condition for the new menu item that calls CreateLeaseCollection:

else if (input == "CL") await CreateLeaseCollection();

Now add the CreateLeaseCollection method:

private static async Task CreateLeaseCollection()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/id");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "lease",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });
  }

  Console.WriteLine("Created lease collection");
} 

This code creates a container named lease provisioned for 400 RUs per second. Also notice that the lease container uses the id property itself as its partition key.

Next, create the host project, which is essentially the client. When an instance of this host runs, it will listen for and consume changes across the entire container. Then, as you spin up additional instances of the host, the work to consume changes across the entire container will be distributed uniformly across all the host instances by the CFP library.

In the current ChangeFeedDemos solution, create a new .NET Core console application and name the project CfpLibraryHost. Next, add the CFP library to the new project from the NuGet package Microsoft.Azure.DocumentDB.ChangeFeedProcessor:

Replace all the code in Program.cs as follows:

using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  class Program
  {
    static void Main(string[] args)
    {
      var host = new ChangeFeedProcessorHost();
      Task.Run(host.RunProcessor).Wait();

      Console.ReadKey();
    }
  }
} 

This startup code initializes and runs the host. It then enters a wait state during which the host will be notified automatically by the CFP library whenever changes are available for consumption.

Next, to implement the host, create a new class named ChangeFeedProcessorHost with the following code:

using Microsoft.Azure.Documents.ChangeFeedProcessor;
using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class ChangeFeedProcessorHost
  {
    public async Task RunProcessor()
    {
      var monitoredCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "byCity",
      };

      var leaseCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "lease",
      };

      var builder = new ChangeFeedProcessorBuilder();

      builder
        .WithHostName($"Change Feed Processor Host - {Guid.NewGuid()}")
        .WithFeedCollection(monitoredCollection)
        .WithLeaseCollection(leaseCollection)
        .WithObserverFactory(new MultiPkCollectionObserverFactory());

      var processor = await builder.BuildAsync();
      await processor.StartAsync();

      Console.WriteLine("Started change feed processor - press any key to stop");
      Console.ReadKey();

      await processor.StopAsync();
    }
  }

} 

The host builds a processor configured to monitor the byCity container, using the lease container to track internal state. The processor also references an observer factory which, in turn, supplies an observer to consume the changes as they happen.

This code relies on defined constants for the URI and master key of the Cosmos DB account. Define them as follows in a new class called Constants.cs:

namespace CfpLibraryHost
{
  public class Constants
  {
    public const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    public const string CosmosMasterKey = "<account-key>";
  }
} 

The observer factory is super-simple. Just create a class that implements IChangeFeedObserverFactory, and return an observer instance (where the real logic will go) that implements IChangeFeedObserver. Name the class MultiPkCollectionObserverFactory, as referenced by WithObserverFactory in the host:

using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserverFactory : IChangeFeedObserverFactory
  {
    public IChangeFeedObserver CreateObserver() =>
      new MultiPkCollectionObserver();
  }
} 

Finally, create the observer class itself. Name this class MultiPkCollectionObserver, as referenced by the CreateObserver method in the factory class:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserver : IChangeFeedObserver
  {
    private static DocumentClient _client;

    static MultiPkCollectionObserver() =>
      _client = new DocumentClient(new Uri(Constants.CosmosEndpoint), Constants.CosmosMasterKey);

    public Task OpenAsync(IChangeFeedObserverContext context)
    {
      Console.WriteLine($"Start observing partition key range {context.PartitionKeyRangeId}");
      return Task.CompletedTask;
    }

    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
      Console.WriteLine($"Stop observing partition key range {context.PartitionKeyRangeId} because {reason}");
      return Task.CompletedTask;
    }

    public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          await _client.UpsertDocumentAsync(stateCollUri, document);
          Console.WriteLine($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          await _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
          {
            PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
          });
          Console.WriteLine($"Deleted document id {document.Id} in byState collection");
        }
      }
    }
  }
} 

Our synchronization logic is embedded inside the ProcessChangesAsync method. The benefits of using the CFP library should really stand out now, compared to our solution from part 1. This code is significantly simpler yet dramatically more robust than the SyncCollections method we wrote in the ChangeFeedDirect project. We are no longer concerned with discovering and iterating partition key ranges, nor do we need to track the last sync time ourselves. Instead, the CFP library handles those details, plus it parallelizes the work and uniformly distributes large-scale processing across multiple client instances. Our code is now limited to just the business logic we require, which is the same logic as before: Upsert new or changed documents into the byState container as they occur in the byCity container, or delete them if their TTL property is set.

Let’s see it work. We’ll want to run both projects at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryHost app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that they can be synchronized to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryHost). To do this, right-click the CfpLibraryHost project in Solution Explorer and choose Debug, Start New Instance. The host fires up, and now just sits and waits for changes from the CFP library:

Started change feed processor - press any key to stop
Start observing partition key range 0

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library. Within a few moments, the host is notified of the changes, and our observer class synchronizes them to the byState container:

Started change feed processor - press any key to stop
Start observing partition key range 0
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Upserted document id 06296937-de8b-4d72-805d-f3120bf06403 in byState collection
Upserted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Deleted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection 

The beauty here is that the client does not need to manually poll the change feed for updates. And we can just spin up as many clients as needed to scale out the processing for very large containers with many changes.

What’s Next?

The benefits of using the CFP library are clear, but we still need to deploy our host to run in Azure. We could certainly deploy the CfpLibraryHost console application to a web app in App Service as an Azure WebJob. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

So tune in to part 3, where I’ll conclude this three-part series by showing you how to deploy our solution to Azure using Azure Functions.

Multiple Partition Keys in Azure Cosmos DB (Part 1) – Querying the Change Feed Directly

To begin, let’s be clear that an Azure Cosmos DB container can have only one partition key. I say this from the start in case “multiple partition keys” in the title is somehow misinterpreted to imply otherwise. You always need to come up with a single property to serve as the partition key for every container and choosing the best property for this can sometimes be difficult.

Understanding the Problem

Making the right choice requires intimate knowledge about the data access patterns of your users and applications. You also need to understand how horizontal partitioning works behind the scenes in terms of storage and throughput, how queries and stored procedures are scoped by partition key, and the performance implications of running cross-partition queries and fan-out queries. So there are many considerations to take into account before settling on the one property to partition a container on. I discuss all this at length in my previous blog post Horizontal Partitioning in Azure Cosmos DB.

But here’s the rub. What if, after all the analysis, you come to realize that you simply cannot settle on a single property that serves as an ideal partition key for all scenarios? Let’s say for example, from a write perspective, you find one property will best distribute writes uniformly across all the partitions in the container. But from a query perspective, you find that using the same partition key results in too much fanning out. Or, you might identify two categories of common queries, where it’s roughly 50/50; meaning, about half of all the queries are of one type, and half are of the other. What do you do if the two query categories would each benefit from different partition keys?

Your brain can get caught in an infinite loop over this until you wind up in that state of “analysis paralysis,” where you recognize that there’s just no single best property to choose as the partition key. To break free, you need to think outside the box. Or, let’s say, think outside the container. Because the solution here is to simply create another container that’s a complete “replica” of the first. This second container holds the exact same set of documents as the first but defines a different partition key.

I placed quotes around the word “replica” because this second container is not technically a replica in the true Cosmos DB sense of the word (where, internally, Cosmos DB automatically maintains replicas of the physical partitions in every container). Rather, it’s a manual replica that you maintain yourself. Thus, it’s your job to keep it in sync with changes when they happen in the first container, which is partitioned by a property that’s optimized for writes. As those writes occur in real time, you need to respond by updating the second collection, which is partitioned by a property that’s optimized for queries.

Enter Change Feed

Fortunately, change feed comes to the rescue here. Cosmos DB maintains a persistent record of changes for every container that can be consumed using the change feed. This gives you a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. For an introduction to change feed, have a look at my previous blog post Change Feed – Unsung Hero of Azure Cosmos DB.

In this three-part series of blog posts, I’ll dive into different techniques you can use for consuming the change feed to synchronize containers:

Let’s get started. Assume that we’ve done our analysis and established that city is the ideal partition key for writes, as well as roughly half of the most common queries our users will be running. But we’ve also determined that state is the ideal partition key for the other (roughly half) commonly executed queries. This means we’ll want one container partitioned by city, and another partitioned by state. And we’ll want to consume the city-partitioned container’s change feed to keep the state-partitioned container in sync with changes as they occur. We’ll then be able to direct our city-based queries to the first container, and our state-based queries to the second container, which then eliminates fan-out queries in both cases.
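Once both containers are kept in sync, routing queries is just a matter of targeting the right container. Here’s a sketch (assuming an initialized DocumentClient, a using directive for System.Linq, and the simple city/state document shape we create later in this post):

```csharp
// City-scoped queries target the byCity container (single-partition, no fan-out)...
var byCityUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
var brooklynDocs = client.CreateDocumentQuery<dynamic>(
  byCityUri,
  "SELECT * FROM c WHERE c.city = 'Brooklyn'",
  new FeedOptions { PartitionKey = new PartitionKey("Brooklyn") }).ToList();

// ...while state-scoped queries target byState, also scoped to a single partition.
var byStateUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
var nyDocs = client.CreateDocumentQuery<dynamic>(
  byStateUri,
  "SELECT * FROM c WHERE c.state = 'NY'",
  new FeedOptions { PartitionKey = new PartitionKey("NY") }).ToList();
```

Because each query supplies its container’s partition key, both are served by a single physical partition, which is the whole point of maintaining the second container.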

Setting Up

If you’d like to follow along, you’ll need to be sure your environment is set up properly. First, of course, you’ll need to have a Cosmos DB account. The good news here is that you can get a free 30-day account with the “try cosmos” offering, which doesn’t even require a credit card or Azure subscription (just a free Microsoft account). Even better, there’s no limit to the number of times you can start a new 30-day trial. Create your free account at http://azure.microsoft.com/try/cosmosdb.

You’ll need your account’s endpoint URI and master key to connect to Cosmos DB from C#. To obtain them, head over to your Cosmos DB account in the Azure portal, open the Keys blade, and keep it open so that you can handily copy/paste them into the project.

You’ll also need Visual Studio. I’ll be using Visual Studio 2019, but the latest version of Visual Studio 2017 is fine as well. You can download the free community edition at https://visualstudio.microsoft.com/downloads.

Querying the Change Feed Directly

We’ll begin with the raw approach, which is to query the change feed directly using the SDK. The reality is that you’ll almost never want to go this route, except for the simplest small-scale scenarios. Still, it’s worth taking some time to examine this approach first, as I think you’ll benefit from learning how the change feed operates at a low level, and it will enhance your appreciation of the Change Feed Processor (CFP) library which I’ll cover in the next blog post of this series.

Fire up Visual Studio, create a new .NET Core console application, name the project ChangeFeedDirect, and name the solution ChangeFeedDemos (we’ll be adding more projects to this solution in parts 2 and 3 of this blog series). Next, add the SDK to the ChangeFeedDirect project from the NuGet package Microsoft.Azure.DocumentDB.Core:

We’ll write some basic code to create a database with the two containers, with additional methods to create, update, and delete documents in the first container (partitioned by city). Then we’ll write our “sync” method that directly queries the change feed on the first container, in order to update the second container (partitioned by state) and reflect all the changes made.

Note: Our code (and the SDK) refers to containers as collections.

We’ll write all our code inside the Program.cs file. First, update the using statements at the very top of the file to get the right namespaces imported:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

Next, at the very top of the Program class, set two private string constants for your Cosmos DB account’s endpoint and master key. You can simply copy them from the Keys blade in the Azure portal, and paste them right into the code:

private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
private const string CosmosMasterKey = "<account-key>";

Now edit the Main method with one line to call the Run method:

static void Main(string[] args)
{
  Task.Run(Run).Wait();
}

This one-liner invokes the Run method and waits for it to complete. Add the Run method next, which displays a menu and calls the various methods defined for each menu option:

private static async Task Run()
{
  while (true)
  {
    Console.WriteLine("Menu");
    Console.WriteLine();
    Console.WriteLine("DB - Create database");
    Console.WriteLine("CD - Create documents");
    Console.WriteLine("UD - Update document");
    Console.WriteLine("DD - Delete document");
    Console.WriteLine("SC - Sync collections");
    Console.WriteLine("Q - Quit");
    Console.WriteLine();
    Console.Write("Selection: ");

    var input = Console.ReadLine().ToUpper().Trim();
    if (input == "Q") break;
    else if (input == "DB") await CreateDatabase();
    else if (input == "CD") await CreateDocuments();
    else if (input == "UD") await UpdateDocument();
    else if (input == "DD") await DeleteDocument();
    else if (input == "SC") await SyncCollections();
    else Console.WriteLine("\nInvalid selection; try again\n");
  }
}

We’ll add the menu option methods one at a time, starting with CreateDatabase:

private static async Task CreateDatabase()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Create the database

    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    try { await client.DeleteDatabaseAsync(dbUri); } catch { }

    await client.CreateDatabaseAsync(new Database { Id = "multipk" });

    // Create the two collections

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/city");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byCity",
      PartitionKey = partitionKeyDefinition,
      DefaultTimeToLive = -1
    }, new RequestOptions { OfferThroughput = 400 });

    partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/state");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byState",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });

    // Load the sync document into the first collection

    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
    var syncDocDef = new
    {
      city = "sync",
      state = "sync",
      lastSync = default(DateTime?)
    };
    await client.CreateDocumentAsync(collUri, syncDocDef);
  }

  Console.WriteLine("Created database for change feed demo");
}

Most of this code is intuitive, even if you’ve never done any Cosmos DB programming before. We first use our endpoint and master key to create a DocumentClient, and then we use the client to create a database named multipk with the two containers in it.

This works by first calling DeleteDatabaseAsync wrapped in a try block with an empty catch block. This effectively results in “delete if exists” behavior to ensure that the multipk database does not exist when we call CreateDatabaseAsync to create it.

Next, we call CreateDocumentCollectionAsync twice to create the two containers (again, a collection is a container). We name the first container byCity and assign it a partition key of /city, and we name the second container byState and assign /state as the partition key. Both containers reserve 400 request units (RUs) per second, which is the lowest throughput you can provision.

Notice the DefaultTimeToLive = -1 option applied to the first container. At the time of this writing, change feed does not support deletes. That is, if you delete a document from a container, it does not get picked up by the change feed. This may be supported in the future, but for now, the TTL (time to live) feature provides a very simple way to cope with deletions. Rather than physically deleting documents from the first container, we’ll just update them with a TTL of 60 seconds. That gives us 60 seconds to detect the update in the change feed, so that we can physically delete the corresponding document in the second container. Then, 60 seconds later, Cosmos DB will automatically physically delete the document from the first container by virtue of its TTL setting. You’ll see all this work in a moment when we run the code.
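To make this concrete, here is what one of our city documents might look like after a "soft delete," with the ttl property set (the id value shown is just illustrative):

```json
{
  "id": "5f5600f3-9f34-4a4d-bdb4-28061a5ab35a",
  "city": "Orlando",
  "state": "FL",
  "slogan": "Sunshine",
  "ttl": 60
}
```

Sixty seconds after this update is written, Cosmos DB physically deletes the document; until then, the update is visible in the change feed like any other.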

The other point to call out is the creation of our sync document, which is a special metadata document that won’t get copied over to the second container. Instead, we’ll use it to persist a timestamp to keep track of the last time we synchronized the containers. This way, each time we sync, we can request the correct point in time from which to consume changes that have occurred since the previous sync. The document is initialized with a lastSync value of null so that our first sync will consume the change feed from the beginning of time. Then lastSync is updated so that the next sync picks up precisely where the first one left off.

Now let’s implement CreateDocuments. This method simply populates three documents in the first container:

private static async Task CreateDocuments()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    var dataDocDef = new
    {
      city = "Brooklyn",
      state = "NY",
      slogan = "Kings County"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Los Angeles",
      state = "CA",
      slogan = "Golden"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Orlando",
      state = "FL",
      slogan = "Sunshine"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);
  }

  Console.WriteLine("Created 3 documents in city collection");
}

Notice that all three documents have city and state properties, where the city property is the partition key for the container that we’re creating these documents in. The state property is the partition key for the second container, where our sync method will create copies of these documents as it picks them up from the change feed. The slogan property is just an ordinary document property. And although we aren’t explicitly supplying an id property, the SDK will automatically generate one for each document with a GUID as the id value.

We’ll also have an UpdateDocument method to perform a change on one of the documents:

private static async Task UpdateDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Update a document
    var brooklynDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Brooklyn'")
      .AsEnumerable()
      .FirstOrDefault();

    brooklynDoc.slogan = "Fuhgettaboutit! " + Guid.NewGuid().ToString();
    await client.ReplaceDocumentAsync(brooklynDoc._self, brooklynDoc);
  }

  Console.WriteLine("Updated Brooklyn document in city collection");
}

This code retrieves the Brooklyn document, updates the slogan property, and calls ReplaceDocumentAsync to persist the change back to the container.

Next comes the DeleteDocument method:

private static async Task DeleteDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Delete a document (set time-to-live at 60 seconds)
    var orlandoDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Orlando'")
      .AsEnumerable()
      .FirstOrDefault();

    orlandoDoc.ttl = 60;
    await client.ReplaceDocumentAsync(orlandoDoc._self, orlandoDoc);
  }

  Console.WriteLine("Deleted Orlando document in city collection");
}

Remember that (currently) the change feed doesn’t capture deleted documents, so we’re using the TTL (time to live) technique to keep our deletions in sync. Rather than calling DeleteDocumentAsync to physically delete the Orlando document, we’re simply updating it with a ttl property set to 60 and saving it back to the container with ReplaceDocumentAsync. To the change feed, this is just another update, so our sync method will pick it up normally as you’ll see in a moment. Meanwhile, Cosmos DB will physically delete the Orlando document from the first container in 60 seconds, giving our sync method up to one minute to pick it up from the change feed and delete it from the second container.

And finally, the sync method, which is what this whole discussion is all about. Here’s the code for SyncCollections:

private static async Task SyncCollections()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var cityCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    // Obtain last sync time

    var syncDoc = client
      .CreateDocumentQuery(cityCollUri, "SELECT * FROM c WHERE c.city = 'sync'")
      .AsEnumerable()
      .FirstOrDefault();

    var lastSync = (DateTime?)syncDoc.lastSync;

    // Step 1: Gather all the partition key ranges (physical partitions)

    var continuationToken = default(string);
    var partitionKeyRanges = new List<PartitionKeyRange>();
    var loop = true;

    while (loop)
    {
      var partitionKeyRange = await client
        .ReadPartitionKeyRangeFeedAsync(
          cityCollUri,
          new FeedOptions { RequestContinuation = continuationToken });

      partitionKeyRanges.AddRange(partitionKeyRange);
      continuationToken = partitionKeyRange.ResponseContinuation;
      loop = continuationToken != null;
    }

    // Step 2: Consume the change feed for each partition key range

    // (simple demo doesn't scale when continuation tokens are needed)
    foreach (var partitionKeyRange in partitionKeyRanges)
    {
      var options = new ChangeFeedOptions
      {
        PartitionKeyRangeId = partitionKeyRange.Id,
        StartFromBeginning = (lastSync == null),
        StartTime = lastSync,   // null on the very first sync
      };

      var query = client.CreateDocumentChangeFeedQuery(cityCollUri, options);

      while (query.HasMoreResults)
      {
        var readChangesResponse = await query.ExecuteNextAsync();
        foreach (var changedDocument in readChangesResponse)
        {
          if (changedDocument.city != "sync")
          {
            if (JsonConvert.DeserializeObject(changedDocument.ToString())["ttl"] == null)
            {
              var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
              await client.UpsertDocumentAsync(stateCollUri, changedDocument);
              Console.WriteLine($"Upserted document id {changedDocument.id} in byState collection");
            }
            else
            {
              var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", changedDocument.id);
              await client.DeleteDocumentAsync(stateDocUri, new RequestOptions
              {
                PartitionKey = new PartitionKey(changedDocument.state)
              });
              Console.WriteLine($"Deleted document id {changedDocument.id} in byState collection");
            }
          }
        }
      }
    }

    // Update last sync time
    syncDoc.lastSync = DateTime.UtcNow;
    await client.ReplaceDocumentAsync(syncDoc._self, syncDoc);
  }

}

Let’s break this down. First, we grab the last sync time from the sync document in the first container. Remember, this will be null the very first time we run this method. Then, we’re ready to query the change feed, which is a two-step process.

For step 1, we need to discover all the partition key ranges in the container. A partition key range is essentially a set of partition keys. In our small demo, where we have only one document each across three distinct partition keys (cities), Cosmos DB will host all three of these documents inside a single partition key range.

Although there is conceptually only one change feed per container, there is actually one change feed for each partition key range in the container. So step 1 calls ReadPartitionKeyRangeFeedAsync to discover the partition key ranges, with a loop that utilizes a continuation token from the response so that we retrieve all of the partition key ranges into a list.

Then, in step 2, we iterate the list to consume the change feed on each partition key range. Notice the ChangeFeedOptions object that we set on each iteration, which identifies the partition key range in PartitionKeyRangeId, and then sets either StartFromBeginning or StartTime, depending on whether lastSync is null or not. If it’s null (which will be true only on the very first sync), then StartFromBeginning will be set to true and StartTime will be set to null. Otherwise, StartFromBeginning gets set to false, and StartTime gets set to the timestamp from the last sync.

After preparing the options, we call CreateDocumentChangeFeedQuery that returns an iterator. As long as the iterator’s HasMoreResults property is true, we call ExecuteNextAsync on it to fetch the next set of results from the change feed. And here, ultimately, is where we plug in our sync logic.

Each result is a changed document. We know this will always include the sync document, because we’ll be updating it after every sync. This is metadata that we don’t need copied over to the second container each time, so we filter out the sync document by testing the city property for “sync.”

For all other changed documents, it now becomes a matter of performing the appropriate create, update, or delete operation on the second container. First, we check to see if there is a ttl property on the document. Remember that this is our indication of whether this is a delete or not. If the ttl property isn’t present, then it’s either a create or an update. In either case, we handle the change by calling UpsertDocumentAsync on the second container (upsert means “update or insert”).

Otherwise, if we detect the ttl property, then we call DeleteDocumentAsync to delete the document from the second container, knowing that Cosmos DB will delete its counterpart from the first container when the ttl expires.

Let’s test it out. Start the console app and run the DB (create database) and CD (create documents) commands. Then navigate to the Data Explorer in the Azure portal to verify that the database exists with the two containers, and that the byCity container has three documents in it, plus the sync document with a lastSync value of null indicating that no sync has yet occurred:

The byState container should be empty at this point, because we haven’t run our first sync yet:

Back in the console app, run the SC command to sync the containers. This copies all three documents from the first container’s change feed over to the second container, skipping the sync document which we excluded in our code:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection
Upserted document id fb8d41ae-3aae-4892-bfe9-8c34bc8138d2 in byState collection
Upserted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

Returning to the portal, refresh the data explorer to confirm that the second container now has the same three documents as the first, although here they are partitioned by state rather than city:

Both containers are now in sync. Refresh the first container view, and you’ll see that the lastSync property has been changed from null to a timestamp from when the previous sync ran.

Run the UD command in the console app to update the first container with a change to the slogan property of the Brooklyn, NY document. Now run SC to sync the containers again:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection

Refresh the second container view, and you’ll see that the slogan property has now been updated there as well.

Finally, run the DD command in the console app to delete the Orlando, FL document from the first container. Remember that this doesn’t actually delete the document, but rather updates it with a ttl property set to 60 seconds. Then run SC to sync the containers again:

Deleted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

You can now confirm that the Orlando, FL document is deleted from the second container, and within a minute (upon ttl expiration), you’ll see that it gets deleted from the first container as well.

However, don’t wait longer than a minute after setting the ttl before running the sync or you will run out of time. Cosmos DB will delete the document from the first container when the ttl expires, at which point it will disappear from the change feed and you will lose your chance to delete it from the second container.

What’s Next?

It didn’t take that much effort to consume the change feed, but that’s only because we have a tiny container with just a handful of changes, and we’re manually invoking each sync. To consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. It was certainly beneficial to start by querying the change feed directly, since exploring that option first is a great way to learn how change feed works internally. However, unless you have very custom requirements, the CFP library is the way to go.

So tune in to part 2, and we’ll see how much easier it is to implement our multiple partition key solution much more robustly using the CFP library.

Change Feed – Unsung Hero of Azure Cosmos DB

Introduction

Azure Cosmos DB is rapidly growing in popularity, and for good reason. Microsoft’s globally distributed, multi-model database service has massively scalable storage and throughput, provides a sliding scale for consistency, is fully and automatically indexed, and it exposes multiple APIs. Throw in a server-side programming model for ACID transactions with stored procedures, triggers, and user-defined functions, along with 99.999% SLAs on availability, throughput, latency, and consistency, and it’s easy to see why Cosmos DB is fast winning the hearts of developers and solution architects alike.

Yet still today, one of the most overlooked capabilities in Cosmos DB is its change feed. This little gem sits quietly behind every container in your database, watches for changes, and maintains a persistent record of them in the order they occur. This provides you with a reliable mechanism to consume a continuous and incremental feed of changes, as documents are actively written or modified in any container.

There are numerous use cases for this, and I’ll call out a few of the most common ones in a moment. But all of them share a need to respond to changes made to a Cosmos DB container. And the first thought that comes to the mind of a relational database developer is to use a trigger for this. Cosmos DB supports triggers as part of its server-side programming model, so it may seem natural to use this feature to consume changes in real time.

Unfortunately, though, triggers in Cosmos DB do not fire automatically as they do in the relational world. They need to be explicitly referenced with each change in order to run, so they cannot be relied upon for capturing all changes made to a container. Furthermore, triggers are JavaScript-only, and they run in a bounded execution environment within Cosmos DB that is scoped to a single partition key. These characteristics further limit what triggers can practically accomplish in response to a change.

But with change feed, you’ve got a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. You can write code (in your preferred language) that consumes the change feed to process it as needed and deploy that code to run on Azure. This paves an easy path for you to build many different solutions for many different scenarios.

Scenarios for Change Feed

Some of the more common use cases for change feed include:

  • Replicating containers for multiple partition keys
  • Denormalizing a document data model across containers
  • Triggering API calls for an event-driven architecture
  • Real time stream processing and materialized view patterns
  • Moving or archiving data to secondary data stores

Each of these deserves its own focused blog post (and will hopefully get one). For the broader context of this overview post, however, I’ll discuss each of them at a high level.

Replicating containers for multiple partition keys

One of the most (perhaps the most) important things you need to do when creating a container is to decide on an appropriate partition key – a single property in your data that the container will be partitioned by.

Now sometimes this is easy, and sometimes it is not. In order to settle on the correct choice, you need a clear understanding of how your data is used (written to and queried), and how horizontal partitioning works. You can read all about this in my previous blog post, Horizontal Partitioning in Azure Cosmos DB.

But what do you do when you can’t decide? What if there are two properties that make good choices, one for write performance, and another that’s better for query performance? This can lead to “analysis paralysis,” a scary condition that is fortunately and easily remedied using change feed.

All you do is create two containers, each partitioned by a different partition key. The first container uses a partition key that’s optimized for writes (it may also suit certain types of queries), while the second one uses a partition key optimized for most typical queries. Simply use change feed to monitor the first container as writes occur, and replicate the changes out to the second container.

Your application then writes to the first container and queries from the second container, simple as that! I’ll show you a detailed walkthrough of exactly how to implement this using C# and the Change Feed Processor Library with Azure Functions in my next post.

Denormalizing a document data model across containers

Developers with a background in relational database design often struggle initially with the denormalized approach to data modeling in the NoSQL world of JSON documents. I personally empathize; from my own experience, I know that it can be difficult at first to embrace concepts that run contrary to deeply engrained practices that span decades of experience in the field.

Data duplication is a case in point, where this is considered a big no-no in the normalized world of relational databases with operational workloads. But with NoSQL, we often deliberately duplicate data in order to avoid expensive additional lookups. There is no concept of a JOIN in any NoSQL database engine, and we can avoid having to perform our own “manual” joins if we simply duplicate the same information across documents in different containers.
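As a purely hypothetical illustration (these documents and property names are invented for this example, not part of the demo), an order document might duplicate the customer’s name from a separate customer container, so that displaying an order never requires a second lookup:

```json
{
  "id": "order-1001",
  "customerId": "cust-42",
  "customerName": "Jane Doe",
  "orderTotal": 59.99
}
```

When the customer’s name changes in the customer container, the change feed tells us that a change occurred, and our consumer patches the duplicated customerName fragment in the affected order documents.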

This is a somewhat finer-grained version of the previous scenario, which replicates entire documents between two containers. In this case, we have different documents in each container, with data fragments from changed documents in one container being replicated into other (related) documents in another container.

But how do you ensure that the duplicated data remains in sync as changes occur in the source container? Why, change feed of course! Just monitor the source container and update the target container. I’ll show you exactly how to do this in a future post.

Triggering API calls for an event-driven architecture

In this scenario, you source events to a set of microservices, each with a single responsibility. Consider, for instance, an ecommerce website with a large-scale order processing pipeline. The pipeline is broken up into a set of smaller microservices, each of which can be scaled out independently. Each microservice is responsible for a single task in the pipeline, such as calculating tax on each order, generating tax audit records, processing each order payment, sending orders off to a fulfillment center, and generating shipping notifications.

Thus, you potentially have N microservices communicating with up to N-1 other microservices, which adds significant complexity to the larger architecture. The design can be greatly simplified if, instead, all these microservices communicate through a bus; that is, a persistent event store. And Cosmos DB serves as an excellent persistent event store, because the change feed makes it easy to broker and source these events to each microservice. Furthermore, because the events themselves are persisted, the order processing pipeline itself is very robust and incredibly resilient to failures. You can also query and navigate the individual events, so that this data can be surfaced out through a customer care API.

Check out this link for a real-world case study of how Jet.com implemented an event-sourced architecture using the Cosmos DB change feed with hundreds of microservices: https://customers.microsoft.com/en-us/story/jet-com-powers-innovative-e-commerce-engine-on-azure-in-less-than-12-months. There is also a video on this from Microsoft Build 2018 here: https://channel9.msdn.com/Events/Build/2018/BRK3602

Real time stream processing and materialized view patterns

The change feed can also be used for performing real time stream processing and analytics. In the order processing pipeline scenario, for example, this would enable you to take all the events and materialize a single view for tracking the order status. You could then easily and efficiently present the order status through a user-facing API.

Other examples of so-called “lambda architectures” include performing real time analytics on IoT telemetry or building a scoring leaderboard for a massively multiplayer online video game.

Moving or archiving data to secondary data stores

Another common scenario for using the change feed involves replicating data from Cosmos DB as your primary (hot) store to some other secondary (cold) data store. Cosmos DB is a superb hot store because it can sustain heavy write ingestion, and then immediately serve the ingested records back out to a user-facing API.

Over time as the volume of data mounts, however, you may want to offload older data to cold storage for archival. Once again, change feed is a wonderful mechanism to implement a replication strategy that does just that.

Consuming the Change Feed

So how do you actually work with the change feed? There are several different ways, and I’ll conclude this blog post by briefly explaining three of them. (Don’t worry, I’ll drill deeper into all three in the next post!)

Direct Access

First, you can query the change feed directly using the SDK. This raw approach works but is the hardest to implement at large scale. Essentially, you first need to discover all the container’s partitions, and then you query each of them for their changes. You also need to persist state metadata; for example, a timestamp for when the change feed was last queried, and a continuation token for each partition. Plus, you’ll want to optimize performance by spawning multiple tasks across different partitions so that they get processed in parallel.

If all this sounds like a lot of work, it is. Which is why you’ll almost certainly want to leverage the Change Feed Processor (CFP) library instead.

Change Feed Processor (CFP) Library

The CFP library provides a high-level abstraction over direct access that greatly simplifies the process of reading the change feed from all the different partitions of a container. This is a separate NuGet package that you pull into your project, and it handles all the aforementioned complexity for you. It will automatically persist state, track all the partitions of the container, and acquire leases so that you can scale out across many consumers.

To make this work, the CFP library persists a set of leases as documents in another dedicated Cosmos DB container. Then, when you spin up consumers, they attempt to acquire leases as they expire.

All you do is write an observer class that implements IChangeFeedObserver. The primary method of this interface that you need to implement is ProcessChangesAsync, which receives the change feed as a list of documents that you can process as needed. No partitions to worry about, no timestamps or continuation tokens to persist, and no scale-out needs to concern yourself with.
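To give a feel for the shape of this, here’s a minimal sketch of an observer based on the v2 CFP library (the Microsoft.Azure.DocumentDB.ChangeFeedProcessor NuGet package); exact namespaces and signatures may vary slightly between versions, and the sync logic itself is elided:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

public class SyncObserver : IChangeFeedObserver
{
  // Called when the observer acquires a lease on a partition
  public Task OpenAsync(IChangeFeedObserverContext context)
    => Task.CompletedTask;

  // Called when the lease is released (shutdown, rebalancing, etc.)
  public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    => Task.CompletedTask;

  // Called with each batch of changed documents from the feed
  public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
  {
    foreach (var doc in docs)
    {
      // Upsert or delete against the second container here,
      // just like the SyncCollections logic from part 1
    }
    return Task.CompletedTask;
  }
}
```

Notice there is no partition, timestamp, or continuation-token handling anywhere in the class; the library invokes ProcessChangesAsync with each batch and manages all of that state for you.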

However, you still need to write your own host, and deploy the DLL with your observer class to an Azure app service. Although the process is straightforward, going with Azure Functions instead provides an even easier deployment model.

Azure Functions

The simplest way to consume the change feed is by using Azure Functions with a Cosmos DB trigger. If you’re not already familiar with Azure Functions, they let you write individual methods (functions), which you deploy for execution in a serverless environment hosted on Azure. The term “serverless” here means without also having to write a host and deploy an Azure app service to run your code.

Azure Functions are invoked by one of any number of triggers. Yes, Azure Functions also uses the term triggers, but unlike triggers in Cosmos DB, an Azure Functions trigger always fires when its conditions are met. There are several different triggers available, including the one that we care about here, the Cosmos DB trigger. This Azure Functions trigger binds to configuration that points to the container you want to monitor changes on, and the lease collection that gets managed by the CFP library under the covers.

From there, the process is identical to using the CFP library, only the deployment model is dramatically simpler. The Azure Functions method that is bound using the Cosmos DB trigger receives a parameter with the same list of documents that a CFP library’s observer class does in its ProcessChangesAsync method, and you process change feed data the way you need to just the same.
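For a flavor of what this looks like, here’s a sketch of a function bound with the Cosmos DB trigger (using the WebJobs Cosmos DB extension; the function name and the CosmosDbConnection app setting are placeholders I’ve made up, while the database and container names follow the demo):

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class SyncFunction
{
  [FunctionName("SyncByCityToByState")]
  public static void Run(
    [CosmosDBTrigger(
      databaseName: "multipk",
      collectionName: "byCity",
      ConnectionStringSetting = "CosmosDbConnection",  // app setting holding the account connection string
      LeaseCollectionName = "leases",
      CreateLeaseCollectionIfNotExists = true)]
    IReadOnlyList<Document> documents,
    ILogger log)
  {
    // Same sync logic as the observer's ProcessChangesAsync
    log.LogInformation($"Received {documents.Count} changed document(s)");
  }
}
```

The trigger configuration points at the monitored container and the lease collection; everything else (writing the host, acquiring leases, scaling out) is handled for you by the platform.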

What’s Next?

Hopefully, this blog post opened your eyes to the change feed in Azure Cosmos DB. This powerful feature creates all sorts of exciting possibilities across a wide range of use cases. In the next installment, I’ll focus on container replication for multiple partition keys, and walk you step by step through the process of building a change feed solution using direct access, CFP library, and Azure Functions – all with working code samples. So, stay tuned!