Multiple Partition Keys in Azure Cosmos DB (Part 2) – Using the Change Feed Processor Library

Welcome to part 2 in this series of blog posts on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In this post, we’ll use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach we took in part 1.

What is the CFP Library?

The solution we built in part 1 would need much more work to consume the change feed at scale. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We would also want the sync to be automated, so that we don’t need to poll the change feed manually.
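To make the scale of that work concrete, here is a minimal, in-memory C# sketch of the pattern you would otherwise build by hand: one concurrent task per partition key range, reading in chunks and saving a checkpoint after each chunk. It does not call Cosmos DB. ManualFanOut, ReadChunk, and the integer "continuation token" are hypothetical stand-ins; real continuation tokens are opaque strings returned by the service.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// In-memory simulation only: each "partition key range" is a list of change ids,
// and a continuation token is modeled as an integer offset (real ones are opaque strings).
public static class ManualFanOut
{
    // Hypothetical checkpoint store standing in for persisted "lease" state.
    public static readonly ConcurrentDictionary<string, int> Checkpoints =
        new ConcurrentDictionary<string, int>();

    // Reads up to maxItems changes from the saved checkpoint, then advances it.
    static List<string> ReadChunk(string rangeId, IReadOnlyList<string> feed, int maxItems)
    {
        int start = Checkpoints.GetOrAdd(rangeId, 0);
        var chunk = feed.Skip(start).Take(maxItems).ToList();
        Checkpoints[rangeId] = start + chunk.Count; // "persist" the new continuation
        return chunk;
    }

    // Drains every range concurrently (one task per range), chunk by chunk.
    public static async Task<List<string>> DrainAllAsync(
        IDictionary<string, IReadOnlyList<string>> ranges, int maxItems)
    {
        var consumed = new ConcurrentBag<string>();
        var tasks = ranges.Select(r => Task.Run(() =>
        {
            List<string> chunk;
            while ((chunk = ReadChunk(r.Key, r.Value, maxItems)).Count > 0)
            {
                foreach (var change in chunk) consumed.Add(change);
            }
        }));
        await Task.WhenAll(tasks);
        return consumed.OrderBy(c => c, StringComparer.Ordinal).ToList();
    }

    public static async Task Main()
    {
        var ranges = new Dictionary<string, IReadOnlyList<string>>
        {
            ["0"] = new[] { "a1", "a2", "a3" },
            ["1"] = new[] { "b1", "b2" },
        };
        var all = await DrainAllAsync(ranges, maxItems: 2);
        Console.WriteLine(string.Join(",", all));                 // a1,a2,a3,b1,b2
        Console.WriteLine(Checkpoints["0"] + " " + Checkpoints["1"]); // 3 2
    }
}
```

Even this toy version leaves out durable checkpoint storage, error handling, and dividing the ranges among multiple client processes.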

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. In most cases, unless you have very custom requirements, the CFP library is the way to go over directly querying the change feed yourself.

The CFP library automatically discovers the container’s partition key ranges and parallelizes change feed consumption across all of them internally. This means that you consume just one logical change feed for the entire container, without having to worry about the individual change feeds behind each partition key range. The library relies on a dedicated “lease” container to persist lease documents that maintain state for each partition key range. Each lease acts as a checkpoint that tracks the continuation token for its partition key range, so that long change feeds can be consumed in chunks and picked up by another client where a previous client left off. Thus, as you increase the number of clients, the leases, and with them the overall change feed processing, get divided evenly across the clients.
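The even division of leases can be pictured with a simplified C# model that deals out one lease per partition key range to the running hosts in round-robin order. This is an illustration only, not the library's balancing code: LeaseBalancing and Assign are hypothetical names, and the real CFP library rebalances dynamically as hosts start, stop, and acquire leases.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model only: one lease per partition key range, dealt out to hosts
// round-robin so that no host owns more than one lease more than any other.
public static class LeaseBalancing
{
    public static Dictionary<string, List<string>> Assign(
        IReadOnlyList<string> leaseIds, IReadOnlyList<string> hosts)
    {
        var owners = hosts.ToDictionary(h => h, h => new List<string>());
        for (int i = 0; i < leaseIds.Count; i++)
        {
            owners[hosts[i % hosts.Count]].Add(leaseIds[i]);
        }
        return owners;
    }

    public static void Main()
    {
        var leases = new[] { "0", "1", "2", "3", "4", "5" };
        foreach (var hostCount in new[] { 1, 2, 3 })
        {
            var hosts = Enumerable.Range(1, hostCount).Select(n => $"host{n}").ToList();
            foreach (var owner in Assign(leases, hosts))
            {
                Console.WriteLine($"{hostCount} host(s): {owner.Key} owns [{string.Join(",", owner.Value)}]");
            }
        }
    }
}
```

With six leases, one host owns all six, two hosts own three each, and three hosts own two each, which is why starting more host instances spreads the work.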

Using the lease container also means we won’t need to rely on our own “sync” document that we were using to track the last time we queried the change feed directly. Therefore, our code will be simpler than the previous solution from part 1, yet we will simultaneously inherit all the aforementioned capabilities that the CFP library provides.

Building on the ChangeFeedDemo solution we created in part 1, open the Program.cs file in the ChangeFeedDirect project. In the Run method, add a line to display a menu item for creating the lease container:

Console.WriteLine("CL - Create lease collection"); 

Further down in the method, add an “else if” condition for the new menu item that calls CreateLeaseCollection:

else if (input == "CL") await CreateLeaseCollection();

Now add the CreateLeaseCollection method:

private static async Task CreateLeaseCollection()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/id");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "lease",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });
  }

  Console.WriteLine("Created lease collection");
} 

This code creates a container named lease provisioned for 400 RUs per second. Also notice that the lease container uses the id property itself as its partition key.

Next, create the host project, which is essentially the client. When an instance of this host runs, it will listen for and consume changes across the entire container. Then, as you spin up additional instances of the host, the work to consume changes across the entire container will be distributed uniformly across all the host instances by the CFP library.

In the current ChangeFeedDemos solution, create a new .NET Core console application and name the project CfpLibraryHost. Next, add the CFP library to the new project from the NuGet package Microsoft.Azure.DocumentDB.ChangeFeedProcessor (the code in this post uses the ChangeFeedProcessorBuilder API from the 2.x versions of that package).

Replace all the code in Program.cs as follows:

using System;

namespace CfpLibraryHost
{
  class Program
  {
    static void Main(string[] args)
    {
      var host = new ChangeFeedProcessorHost();

      // RunProcessor starts the processor, waits for a key press, and then stops it,
      // so no additional Console.ReadKey is needed here
      host.RunProcessor().GetAwaiter().GetResult();
    }
  }
}

This startup code creates and runs the host, which then enters a wait state during which the CFP library automatically notifies it whenever changes are available for consumption.

Next, to implement the host, create a new class named ChangeFeedProcessorHost with the following code:

using Microsoft.Azure.Documents.ChangeFeedProcessor;
using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class ChangeFeedProcessorHost
  {
    public async Task RunProcessor()
    {
      var monitoredCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "byCity",
      };

      var leaseCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "lease",
      };

      var builder = new ChangeFeedProcessorBuilder();

      builder
        .WithHostName($"Change Feed Processor Host - {Guid.NewGuid()}")
        .WithFeedCollection(monitoredCollection)
        .WithLeaseCollection(leaseCollection)
        .WithObserverFactory(new MultiPkCollectionObserverFactory());

      var processor = await builder.BuildAsync();
      await processor.StartAsync();

      Console.WriteLine("Started change feed processor - press any key to stop");
      Console.ReadKey();

      await processor.StopAsync();
    }
  }

} 

The host builds a processor configured to monitor the byCity container, using the lease container to track internal state. The processor also references an observer factory which, in turn, supplies an observer to consume the changes as they happen.

This code relies on constants for the URI and master key of the Cosmos DB account. Define them in a new class named Constants (in Constants.cs), replacing the placeholders with your own account’s endpoint and key:

namespace CfpLibraryHost
{
  public class Constants
  {
    public const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    public const string CosmosMasterKey = "<account-key>";
  }
} 

The observer factory is super-simple. Just create a class that implements IChangeFeedObserverFactory, whose CreateObserver method returns an observer instance (where the real logic will go) that implements IChangeFeedObserver. Name the class MultiPkCollectionObserverFactory, as referenced by WithObserverFactory in the host.

using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserverFactory : IChangeFeedObserverFactory
  {
    public IChangeFeedObserver CreateObserver() =>
      new MultiPkCollectionObserver();
  }
} 
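Why a factory rather than a single observer? Roughly speaking, the processor asks the factory for a new observer for each lease it acquires, and then drives that observer through open, process, and close calls. The following self-contained C# sketch models that lifecycle using hypothetical stand-in types (ISketchObserver, ISketchObserverFactory, LoggingObserver, FakeProcessor); they are not the CFP library's interfaces and do not talk to Cosmos DB. For brevity, each observer receives a single batch, whereas in the real library an observer keeps receiving batches until its lease is released.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins that mirror the shape of the CFP observer pattern.
// These are NOT the library's types.
public interface ISketchObserver
{
    Task OpenAsync(string rangeId);
    Task ProcessChangesAsync(string rangeId, IReadOnlyList<string> docs);
    Task CloseAsync(string rangeId);
}

public interface ISketchObserverFactory
{
    ISketchObserver CreateObserver();
}

// Records each lifecycle call so the order can be inspected.
public class LoggingObserver : ISketchObserver
{
    public static int Created; // number of observer instances created so far
    public List<string> Log { get; } = new List<string>();

    public LoggingObserver() => Created++;

    public Task OpenAsync(string rangeId)
    {
        Log.Add($"open {rangeId}");
        return Task.CompletedTask;
    }

    public Task ProcessChangesAsync(string rangeId, IReadOnlyList<string> docs)
    {
        Log.Add($"process {rangeId}: {docs.Count}");
        return Task.CompletedTask;
    }

    public Task CloseAsync(string rangeId)
    {
        Log.Add($"close {rangeId}");
        return Task.CompletedTask;
    }
}

public class LoggingObserverFactory : ISketchObserverFactory
{
    public ISketchObserver CreateObserver() => new LoggingObserver();
}

public static class FakeProcessor
{
    // Creates one observer per lease and delivers that lease's batch to it.
    public static async Task<List<ISketchObserver>> RunAsync(
        ISketchObserverFactory factory, IDictionary<string, IReadOnlyList<string>> batchesByLease)
    {
        var observers = new List<ISketchObserver>();
        foreach (var lease in batchesByLease)
        {
            var observer = factory.CreateObserver();
            observers.Add(observer);
            await observer.OpenAsync(lease.Key);
            await observer.ProcessChangesAsync(lease.Key, lease.Value);
            await observer.CloseAsync(lease.Key);
        }
        return observers;
    }

    public static async Task Main()
    {
        var batches = new Dictionary<string, IReadOnlyList<string>>
        {
            ["0"] = new[] { "doc1", "doc2" },
            ["1"] = new[] { "doc3" },
        };
        var observers = await RunAsync(new LoggingObserverFactory(), batches);
        foreach (LoggingObserver o in observers)
            Console.WriteLine(string.Join("; ", o.Log));
        Console.WriteLine($"observers created: {LoggingObserver.Created}");
    }
}
```

Because several observer instances can exist at the same time, a shared resource such as a DocumentClient is better kept in a static field than created once per observer.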

Finally, create the observer class itself. Name this class MultiPkCollectionObserver, as referenced by the CreateObserver method in the factory class:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserver : IChangeFeedObserver
  {
    // One DocumentClient shared by every observer instance (the CFP library may create several)
    private static readonly DocumentClient _client;

    static MultiPkCollectionObserver() =>
      _client = new DocumentClient(new Uri(Constants.CosmosEndpoint), Constants.CosmosMasterKey);

    // Called when this host starts processing a partition key range
    public Task OpenAsync(IChangeFeedObserverContext context)
    {
      Console.WriteLine($"Start observing partition key range {context.PartitionKeyRangeId}");
      return Task.CompletedTask;
    }

    // Called when this host stops processing a partition key range (for example, on shutdown)
    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
      Console.WriteLine($"Stop observing partition key range {context.PartitionKeyRangeId} because {reason}");
      return Task.CompletedTask;
    }

    // Called with each batch of new or changed documents from the byCity container
    public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          // No ttl: the document was created or updated, so copy it to byState
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          await _client.UpsertDocumentAsync(stateCollUri, document);
          Console.WriteLine($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          // ttl is set: the document was soft-deleted, so remove it from byState,
          // whose partition key is the document's state property
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          try
          {
            await _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
            {
              PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
            });
            Console.WriteLine($"Deleted document id {document.Id} in byState collection");
          }
          catch (DocumentClientException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
          {
            // Already absent from byState (for example, a redelivered batch), so there is nothing to delete
          }
        }
      }
    }
  }
}

Our synchronization logic is embedded inside the ProcessChangesAsync method. The benefits of using the CFP library should really stand out now, compared to our solution from part 1. This code is significantly simpler, yet dramatically more robust, than the SyncCollections method we wrote in the ChangeFeedDirect project. We are no longer concerned with discovering and iterating partition key ranges, nor do we need to track the last sync time ourselves. Instead, the CFP library handles those details, plus it parallelizes the work and uniformly distributes large-scale processing across multiple client instances. Our code is now limited to just the business logic we require, which is the same logic as before: upsert new or changed documents into the byState container as they occur in the byCity container, or delete them from byState if their ttl property is set.
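The per-document rule in ProcessChangesAsync can be isolated from Cosmos DB and checked on its own. The following C# sketch uses System.Text.Json (available in .NET Core 3.0 and later) instead of the Newtonsoft.Json types used in the observer; SyncAction, SyncRule, and Decide are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Text.Json;

// Hypothetical names (SyncAction, SyncRule, Decide) for illustration only.
public enum SyncAction { Upsert, Delete }

public static class SyncRule
{
    // Upsert when the byCity document has no ttl property; otherwise Delete.
    // Either way, also return "state", which is the byState partition key.
    public static (SyncAction Action, string StatePartitionKey) Decide(string json)
    {
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            JsonElement root = doc.RootElement;
            string state = root.TryGetProperty("state", out JsonElement s) ? s.GetString() : null;
            SyncAction action = root.TryGetProperty("ttl", out _) ? SyncAction.Delete : SyncAction.Upsert;
            return (action, state);
        }
    }

    public static void Main()
    {
        var created = Decide("{\"id\":\"1\",\"city\":\"Seattle\",\"state\":\"WA\"}");
        var deleted = Decide("{\"id\":\"2\",\"city\":\"Miami\",\"state\":\"FL\",\"ttl\":60}");
        Console.WriteLine($"{created.Action} {created.StatePartitionKey}"); // Upsert WA
        Console.WriteLine($"{deleted.Action} {deleted.StatePartitionKey}"); // Delete FL
    }
}
```

As in the observer, a document that carries a ttl property is treated as deleted, and its state value supplies the partition key needed to locate it in byState.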

Let’s see it work. We’ll want to run both projects at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryHost app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that it can synchronize them to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryHost). To do this, right-click the CfpLibraryHost project in Solution Explorer and choose Debug, Start New Instance. The host fires up, and now just sits and waits for changes from the CFP library:

Started change feed processor - press any key to stop
Start observing partition key range 0

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library. Within a few moments, the host is notified of the changes, and our observer class synchronizes them to the byState container:

Started change feed processor - press any key to stop
Start observing partition key range 0
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Upserted document id 06296937-de8b-4d72-805d-f3120bf06403 in byState collection
Upserted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Deleted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection 

The beauty here is that the client does not need to manually poll the change feed for updates. And we can just spin up as many clients as needed to scale out the processing for very large containers with many changes.

What’s Next?

The benefits of using the CFP library are clear, but we still need to deploy our host to run in Azure. We could certainly deploy the CfpLibraryHost console application to a web app in App Service as an Azure WebJob. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

So tune in to part 3, where I’ll conclude this three-part series by showing you how to deploy our solution to Azure using Azure Functions.
