Server-side Encryption in Azure Cosmos DB

Encryption is an important way to keep your data secure. Cosmos DB has server-side encryption built into the service, so that on the server side, your data is always encrypted – both in flight, as it travels the network, and at rest, when it’s written to disk. As I said, this functionality is built in, and there’s simply no way to disable this encryption.

One nice part of this feature is that Microsoft creates and manages the keys used for server-side encryption, so there are no extra steps you need to take in order to make this work. Your data is simply encrypted on the server-side using encryption keys managed entirely by Microsoft. As part of this management, Microsoft applies the usual best practices, and protects these keys with a security life cycle that includes rotating keys on a regular basis.

In addition, customer-managed encryption keys are also supported, so that you can use your own encryption keys. This adds another layer of encryption, on top of the encryption based on Microsoft-managed keys that, again, can never be disabled. That said, using customer-managed keys is generally discouraged unless you are obliged to by regulatory compliance requirements in your industry. The reason is simple: you then become responsible for managing those keys, and that’s a huge responsibility. You need to maintain and rotate the encryption keys yourself, and if you lose a key, you lose access to all your data, and Microsoft will not be able to help you recover it. Another relatively minor consideration is that you’ll incur a slight increase in RU charges when using customer-managed keys, because of the additional CPU overhead needed to encrypt your data with them.

So again, using customer-managed keys does not disable the default behavior; your data still gets encrypted using a Microsoft managed key. The result is double-encryption, where that encrypted data is then encrypted again using your own key that you supply.

Currently, you can only configure your account to use customer-managed keys when you’re creating the account; they cannot be enabled on an existing account. So here in the portal, during the process of creating a new account, the Encryption tab lets you set it up.

The default is to use a service-managed key – that is, the one provided and managed by Microsoft – but you can also choose to use a customer-managed key. When you make this choice, the portal prompts you for the Key URI, which is simply the path to your own key stored in your own Azure Key Vault (for example, https://my-vault.vault.azure.net/keys/my-key).

So you see that it’s pretty simple to configure your Cosmos DB account to encrypt your data using your own encryption key that you have in Azure Key Vault.

Beyond server-side encryption, Cosmos DB supports client-side encryption – a feature that’s also known as Always Encrypted. Because your data is always encrypted on the server-side, enabling client-side encryption as well means that your data is truly always encrypted, even as it flows into and out of your client. So stay tuned for my next post where we’ll dive into client-side encryption with Always Encrypted.

Network Security in Azure Cosmos DB

Overview

Network security is your first line of defense against unauthorized access to your Cosmos DB account. Before a client can even attempt to authenticate against your account, it needs to be able to establish a physical network connection to it.

Now Cosmos DB is a cloud database service on Azure, and Azure is a public cloud, so by default, a new Cosmos DB account can be accessed from anywhere on the internet. While this is very convenient when you’re getting started, such exposure is often unacceptable for sensitive mission-critical databases running in production environments.

And so, there are several ways for you to lock down network access to your Cosmos DB account.

IP Firewall

First, you can use the IP firewall, which is both simple and effective. This works very much like firewalls you find in other systems, where you maintain a list of approved IP addresses, which can be individual addresses or address ranges. Network traffic from IP addresses that are not approved gets blocked by the firewall, while traffic from approved addresses is allowed to pass through the firewall and reach your account by its public IP address.

So here I’ve got my Cosmos DB account, which can be reached by its public endpoint, and that’s a public IP address on the internet. And we’ve got two clients hanging out on the internet, each on their own public IP address. Now by default, both clients can reach our account, but if we enable the IP firewall, then we can add the IP address for the first one, with the IP address ending in 140, so that incoming traffic from that client is allowed access to the account’s public endpoint. Meanwhile, since we haven’t added the IP address for the second client ending in 150, that client gets blocked and can’t access the account.

In the Azure Portal, head on over to Firewall and Virtual Networks, where by default, any client can access the account. But if you choose selected networks, then you can configure the firewall.

There are actually two sections to this page, and right now we’re focused on the bottom section, Firewall. Here you can add IP addresses, or IP address ranges, one at a time, for approved clients, while clients from all other IP addresses are unapproved and get blocked by the firewall.

To make it easier to connect through the firewall from your own local machine, you can click Add My Current IP, and the portal automatically detects your local machine’s IP address. You can see I’ve done this in the previous screenshot, where my local IP address has been added to the firewall as an approved client.

Also take note of the bottom two checkboxes for exceptions. First, you may reluctantly want to select the first checkbox, which accepts connections from within public Azure datacenters. You would only do this if you need access to your account from a client running on Azure that can’t provide a static IP address or IP address range – and these can be VMs, functions, app services; any kind of client. This is also why you want to be cautious with this checkbox, because selecting it accepts connections from any other Azure customer’s VMs as well.

The second checkbox, which is selected by default, makes sure that the account can always be reached from the Azure portal IPs, so you’ll want to keep this selected if you want to use portal features – notably the Data Explorer – with your account.

When you click Save, be prepared to wait a bit for the changes to take effect. Given the distributed nature of the Cosmos DB service, it can take up to 15 minutes for the configuration to propagate, although it usually takes less time than that. And then, the firewall is enabled, blocking network traffic from all clients except my local machine.

VNet Through Service Endpoint

Using the IP firewall is the most basic way to configure access control, but it does carry the burden of having to know the IP addresses of all your approved clients, and maintaining that list.

With an Azure virtual network, or VNet, you can secure network access for all clients hosted within that VNet, without knowing or caring what those client IP addresses are. You just approve one or more VNets, and access to your account’s public endpoint is granted only to clients hosted by those VNets. As with the IP firewall, all clients outside the approved VNets are blocked from your account’s public endpoint.

So once again, we have our Cosmos DB account with its public endpoint. And we’ve got a number of clients that need to access our account. But rather than approve each of them to the firewall individually by their respective IP addresses, we simply host them all on the same virtual network. Then, when we approve this VNet for our account, that creates a service endpoint which allows access to our account only from clients that the VNet is hosting.

If you run an nslookup against the fully qualified domain name for your account from within the VNet, you can see that it resolves to a public IP address – your account’s public endpoint:

You can approve individual VNets from the top section of the same page we were using to configure the IP firewall, where you use these links to choose a new or existing VNet.

Here I’ve added an existing VNet which I’ve named cosmos-demos-rg-vnet, and you can see the VNet is added as approved. Also notice that you can mix-and-match, combining VNet access with the IP firewall, so you can list both approved IP addresses and approved VNets on this one page in the Azure portal, like you see above.

VNet Through Private Endpoint

VNet access using service endpoints is very convenient, but there’s still a potential security risk from within the VNet. Because, as I just showed you, the Cosmos DB account is still being accessed by its public endpoint through the VNet’s service endpoint, and the VNet itself can connect to the public internet. So a user that’s authorized to access the VNet could, theoretically, connect to your Cosmos DB account from inside the VNet, and then export data out to anywhere on the public internet.

This security concern is known as data exfiltration, and it can be addressed using your third option: VNet access using private endpoints.

This is conceptually similar to using VNets with service endpoints like we just saw, where only approved VNets can access your account. However now, the VNet itself has no connection to the public internet. And so, when you enable private endpoints, you’ll see that your Cosmos DB account itself is accessible only through private IP addresses that are local to the VNet. This essentially brings your Cosmos DB account into the scope of your VNet.

So again here’s our Cosmos DB account which, technically still has a public endpoint, but that public endpoint is now completely blocked to public internet traffic. Then we’ve got our clients that, like before, all belong to the same VNet that we’ve approved for the account. However, now the VNet has no public internet access, and communicates with your Cosmos DB account using a private IP address accessible only via the private endpoint that you create just for this VNet.

So if you run nslookup from within a VNet secured with a private endpoint, you can see that your account’s fully qualified domain name now resolves to a private IP address. And so, there’s simply no way for a malicious user who may be authorized for access to the VNet to exfiltrate data from your account out to the public internet.

It’s fairly straightforward to configure a private endpoint for your VNet. Start by opening the Private Endpoint Connections blade and clicking to create a new one. This is actually a new resource that you’re adding to the same resource group as the VNet that you’re securing.

Just give your new private endpoint a name, and click Next to open the Resource tab.

Here you select the resource type, which is Microsoft.AzureCosmosDB/databaseAccounts, and the resource itself, which is the Cosmos DB account you are connecting to with this new private endpoint. Then there’s the target sub-resource, which is just the API that you’ve chosen for the account, and that’s the SQL API in this case.

Then click Next for the Virtual Network tab, where you can choose the VNet and its subnet for the new private endpoint.

Notice that the Private DNS Integration option is turned on by default; this is what transparently maps your account’s fully qualified domain name to a private IP address. So, without you having to do anything different in terms of how you work with your Cosmos DB account, the account becomes accessible only from the private endpoint in this VNet, which itself is reachable only through private IP addresses, with no access to the public internet.

And that’s pretty much it. Of course, you can assign tags to the private endpoint just like with any Azure resource, and then just click Create. It takes a few moments for Azure to create and deploy the new private endpoint, and then you’re done.

Summary

This post examined three different ways to secure network access to your Cosmos DB account. You can approve IP addresses and IP address ranges with the IP firewall, or approve specific virtual networks using service endpoints. Both of these options carry the risk of exfiltration, since the Cosmos DB account is still accessible via its public endpoint. However, you can eliminate that risk using your third option, which is to use private endpoints with virtual networks that are completely isolated from the public internet.

Backup and Restore in Azure Cosmos DB

Overview

Protecting your data is critical, and Cosmos DB goes a long way to provide redundancy so you get continuous uptime in the face of hardware failures, or even if an entire region suffers an outage. But there are other bad things that can happen too – notably, any form of accidental corruption or deletion of data – and that’s why Cosmos DB has comprehensive backup and restore features.

There are actually two versions of this feature you can choose from: periodic backups and continuous backups.

Periodic backups are the default, and they are both automatic and free. At regular intervals, Cosmos DB takes a backup of your entire account and saves it to Azure Blob Storage, always retaining at least the two most recent backups. This occurs with absolutely zero impact on database performance, and with no additional charges to your Azure bill.

Backups are retained for 8 hours by default, but you can request retention for up to 30 days if you need. However, this is where you begin to incur charges for periodic backups, since the longer you retain backups, the more snapshot copies need to be kept in Azure Blob Storage – and only the first two are free. So depending on how frequently you set the backup interval, and how long you request backup retention – Cosmos DB may keep anywhere from 2 to 720 snapshots in Azure Blob Storage available for restore.
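To put some numbers on that: the default 4-hour interval with 8-hour retention keeps just 2 snapshots, while a 1-hour interval with the maximum 720-hour (30-day) retention keeps 720 / 1 = 720 snapshots.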

In the unfortunate event that you do need a restore, you do that by opening a support ticket in the portal, and I’ll show you what that looks like in a moment.

Your other option is to use continuous backup, which does cost extra, but lets you perform a point-in-time restore – meaning, you can restore your data as it was at any point in time within the past 30 days, and you don’t deal with intervals, snapshots, and retention.

Continuous backup carries a few limitations – at least right now, you can use it only with the SQL and MongoDB APIs. And while it does work with geo-replicated accounts, it doesn’t work if you’ve enabled multi-master (that is, multi-region writes) – it needs to be a single-write-region account.

If you need to restore data to an account using continuous backups, you can do it yourself right inside the Azure portal. You don’t need to open a ticket with the support team, like you do with periodic backups.

Periodic Backups

As I said, periodic backup is the default backup mode, and it can be configured at the same time you create a new Cosmos DB account. Here in the Backup Policy tab for a new account, we see periodic backup is selected by default, to automatically take backups every 4 hours (that’s 240 minutes) and retain them for 8 hours. This results in 2 copies always being retained at any given time.

Also notice that, by default, you get geo-redundant backup storage, which adds yet another layer of data protection. So – even for single-region accounts – your backups will be written to two regions; the region where the account resides, and the regional pair associated with that region. All backups are encrypted at rest, and in-flight as well, as they get transferred for geo-redundancy to the regional pair, using a secure non-public network.

Once the account is created, you can navigate over to the Backup and Restore blade and reconfigure the periodic backups. For example, I can go ahead and change the interval from four hours, to be every hour.

Increasing the frequency for the same retention period of 8 hours results in 8 copies being retained, and so we see a message telling us that we’re incurring an extra charge for storage, since only the first 2 copies are free. Currently, that charge is fifteen cents a month per gigabyte of backup storage, though charges can vary by region.

Now if I bump up the retention period from 8 hours to 720 hours, which is 30 days, I’m retaining 720 backup copies, since we’re taking one backup an hour and retaining each one for 720 hours.

To restore from a periodic backup, you need to open a support ticket by scrolling all the way down on the left and choosing New Support Request. For basic information, you need to type a brief summary description, and indicate the type of problem you need help with. For example, I’ll just say Restore from backup for the summary. Then when I choose Backup and Restore as the Problem Type, I can choose Restore Data For My Account as the Problem Subtype.

Then click Next to move on to the Solutions tab.

There’s no input on this tab; it just has a lot of useful information to read about restoring your data. For example, it recommends increasing your backup retention within 8 hours of the accidental deletion or corruption, in order to give the Cosmos DB team enough time to restore your account – among other things to consider for the restore. It also explains how long you can expect to wait for the restore, which – while there’s no guarantee – is roughly four hours to restore about 500 GB of data.

Next you’re on the Details tab, where you supply the specifics, like when the problem started and whether you need to restore deleted or corrupted data. As you fill in this form, you can indicate whether you need to restore the entire account, or only certain databases and containers. Then, scrolling down some, you can put in your contact info and how you prefer to be contacted.

Then click Next to review your support request.

Finally, click Create to open the ticket to restore your data.

Continuous Backup

Now let’s have a look at continuous backups, which work quite a bit differently from periodic backups.

Here on the Backup Policy tab, when you’re creating a new account, you can just click Continuous, and you’ll get continuous backups for the new account.

And there again we get a message about the additional charges that we’ll incur for this feature. Currently, the charge for continuous backup storage is 25 cents a month per gigabyte, and for geo-replicated accounts, that gets multiplied by the number of regions in the account. There’s also a charge if and when you need to perform a point-in-time restore, and that’s currently 19 cents per gigabyte of restored data. And again, this can vary by region, so it’s always best to check the Cosmos DB pricing page for accurate charges.
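As a rough worked example using those rates, 100 GB of backup storage in an account replicated across three regions would run about 100 × $0.25 × 3 = $75 per month for continuous backup storage.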

If you don’t enable continuous backups at account creation time, you’ll get periodic backups instead. You can then head over to the Features blade, which shows that Continuous Backup is Off.

Simply click to enable it; though note that once continuous backups are turned on for the account, you cannot switch back to using periodic backups.

Now that continuous backups are enabled, the Backup and Restore blade gets replaced by the Point-In-Time Restore blade.

This blade lets you restore from the continuous backup yourself, without opening a support ticket with the Cosmos DB team. Just plug in any desired point in time for the restore within the past 30 days. You can also click the Need help with identifying restore point? link to open the event feed, which shows a chronological display of all operations to help you determine what the exact restore time should be.

Then you pick the location where the account gets restored, which needs to be a location where the account existed at the specified point-in-time. Then you get to choose whether to restore the entire account, or only specific databases and containers, and finally, the resource group and name for a new account where all the data gets restored to.

Then click Submit, and you’ve kicked off the restore.

Summary

Recovering from backups is something we all like to avoid. But when it becomes necessary, it’s good to know that Cosmos DB has you covered with your choice of periodic or continuous backups. So you can sleep easy knowing that you’re protected, if you ever need to recover from accidental data corruption or deletion.

Implementing Time Zone Support in Angular and ASP.NET Core Applications

In this blog post, I’ll show you how to easily and effectively implement time zone support, with Daylight Savings Time (DST) awareness, in your web applications (Angular with ASP.NET Core).

Historically, time zones have been a thorn in the side of application development. Even if all your users are in the same time zone, Daylight Savings Time (DST) still poses a difficult challenge (if the time zone observes DST). Even within a single time zone, the difference between 8:00am one day and 8:00am the next day will actually be 23 hours or 25 hours (depending on whether DST started or ended overnight in between), and not 24 hours like it is on every other day of the year.
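To see that gap in action in .NET, here’s a quick sketch, assuming a Windows host where US Eastern Time uses the Windows time zone ID "Eastern Standard Time" (DST began on March 14, 2021 in that zone):

// Illustrates the 23-hour "day" when DST begins (US Eastern Time, spring 2021)
var eastern = TimeZoneInfo.FindSystemTimeZoneById("Eastern Standard Time");

var before = TimeZoneInfo.ConvertTimeToUtc(new DateTime(2021, 3, 13, 8, 0, 0), eastern); // 2021-03-13 13:00 UTC
var after = TimeZoneInfo.ConvertTimeToUtc(new DateTime(2021, 3, 14, 8, 0, 0), eastern);  // 2021-03-14 12:00 UTC

Console.WriteLine(after - before); // 23:00:00, not 24 hours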

When your application is storing and retrieving historical date/time values, absolute precision is critical. There can be no ambiguity around the exact moment an entry was recorded in the system, regardless of the local time zone or DST status at any tier; the client (browser), application (web service), or database server.

The general consensus, therefore, is to always store date/time values in the database as UTC (Coordinated Universal Time), a zero-offset value that never uses DST. Likewise, all date/time manipulation performed by the application tier in the web service is done in UTC. By treating all date/time values as UTC on the back end (from the API endpoint through the database persistence layer), your application accurately records the precise moment that an event occurs, regardless of the time zone or DST status at the various tiers of the running application.

Note: UTC is often conflated with GMT (Greenwich Mean Time), since they both represent a +00:00 hour/minute offset. However, GMT is a time zone that happens to align with UTC, while UTC is a time standard for the +00:00 offset that never uses DST. Unlike UTC, some of the countries that use GMT switch to different time zones during their DST period.

Note: SQL Server 2008 introduced the datetimeoffset data type (also supported in Azure SQL Database). This data type stores date/time values as UTC internally, but also embeds the local time zone offset to UTC in each datetimeoffset instance. Thus, date/time values appear to get stored and retrieved as local times with different UTC offsets, while under the covers, it’s still all UTC (so sorting and comparisons work as you’d expect). This sounds great, but unfortunately, the datetimeoffset data type is not recommended for two reasons: it lacks .NET support (it is not a CLS-compliant data type), and it is not DST-aware. This is why, for example, the SQL Server temporal feature (offering point-in-time access to any table) uses the datetime2 data type to persist date/time values that are all stored in UTC.

Once you store all your date/time values as UTC, you have accurate data on the back end. But now, how do you adapt to the local user’s time zone and DST status? Users will want to enter and view data in their local time zone, with DST adjusted based on that time zone. Or, you might want to configure your application with a designated “site” time zone for all users regardless of their location. In either scenario, you need to translate all date/time values that are received in client requests from a non-UTC time zone that may or may not be respecting DST at the moment, into UTC for the database. Likewise, you need to translate all date/time values in the responses that are returned to the client from UTC to their local time zone and current DST status.

Of course, it’s possible to perform these conversions in your application, by including the current local time zone offset appended as part of the ISO-formatted value in each API request; for example, 2021-02-05T08:00:00-05:00, where -05:00 represents the five-hour offset for Eastern Standard Time (EST) here in New York. Unfortunately, there are three complications with this approach.

The first problem is that it only works for current points in time. For example, in July, EST respects DST, and so the offset is only four hours from UTC, not five; for example, 2021-07-05T08:00:00-04:00. This translates fine, since the current offset in July is actually four hours and not five. But if, in July, the user is searching for 8:00am on some past date in February, then the value 2021-02-05T08:00:00-04:00 is off by an hour, which of course yields incorrect results. This means that you really need to know the actual time zone itself, not just the current offset.

Second, resolving DST remains a problem that’s not easily solved on the client, because some time zones don’t observe DST at all, and those that do switch to and from DST on different dates from year to year.

And last, the onus is then on every developer to include code that converts from local to UTC in the request, and from UTC to local in the response. Every date/time value instance must be accounted for in both directions, so it is all too easy for a developer to miss a spot here and there. The result, of course, is a system that’s both difficult to maintain and error prone.

The solution explained here addresses all these concerns by leveraging middleware in ASP.NET Core. This is a feature that allows you to intercept all incoming requests and modify the request content after it leaves the client, but before it reaches your endpoint. Likewise, you can intercept all outgoing responses and modify the response content before it gets returned to the client, but after you have finished processing the request.

Thus, we will write some middleware that discovers every date/time value found in the JSON body (or query string parameters in the URI) associated with the request, and converts it to UTC. Likewise, it discovers every date/time value found in the JSON body associated with the response, and converts it to the user’s local time zone. Of course, to properly perform the conversions with DST awareness, the server needs to know the local time zone (not just the current time zone offset). This is the only responsibility that falls on the client; it needs to inform the server what the local time zone is with every request. All the rest of the heavy processing happens in the middleware on the server.

Write an Angular Interceptor

This is fairly straightforward; we want to inject the local time zone into the HTTP header of every API request issued by our Angular client:

import { HttpEvent, HttpHandler, HttpInterceptor, HttpRequest } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { Observable } from "rxjs";

@Injectable()
export class TimeZoneInterceptorService implements HttpInterceptor {
  public intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    const modifiedReq = req.clone({
      headers: req.headers.set('MyApp-Local-Time-Zone-Iana', Intl.DateTimeFormat().resolvedOptions().timeZone),
    });
    return next.handle(modifiedReq);
  }
}

This class implements the HttpInterceptor interface provided by Angular, which requires that you supply an intercept method. It receives the incoming request in req and modifies it by cloning it while adding a new header named MyApp-Local-Time-Zone-Iana with the value returned by Intl.DateTimeFormat().resolvedOptions().timeZone. That mouthful simply returns the client’s local time zone in IANA format, one of several different standards for expressing time zones. For example, here in New York, the IANA-formatted time zone is America/New_York.

Configure the Middleware Class

In Startup.cs, add code to the Configure method to plug in a class that we’ll call RequestResponseTimeZoneConverter. As the name implies, this class will convert incoming and outgoing date/time values between UTC and the client’s local time zone:

public void Configure(IApplicationBuilder app)
{
  // :
  app.UseMiddleware<RequestResponseTimeZoneConverter>();
  // :
}

Implement the Invoke method

Our middleware class needs to implement an Invoke method that fires for each request, where it can process the incoming request and the outgoing response. Start with the namespace imports and class definition like so:

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Extensions;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using OregonLC.Shared;
using OregonLC.Shared.Extensions;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using TimeZoneConverter;

namespace TimeZoneDemo
{
  public class RequestResponseTimeZoneConverter
  {
    private readonly AppConfig _appConfig;
    private readonly RequestDelegate _next;

    public RequestResponseTimeZoneConverter(
        IOptions<AppConfig> appConfig,
        RequestDelegate next)
    {
      this._appConfig = appConfig.Value;
      this._next = next;
    }

  }
}

Notice that we’re also injecting IOptions<AppConfig> for a strongly typed configuration object based on appsettings.json, where we define settings that can influence the time zone conversion behavior of our middleware.
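The AppConfig class itself is just a plain settings object. A minimal sketch, assuming only the two settings the middleware reads later (SupportMultipleTimeZones and SiteTimeZoneId), might look like this:

public class AppConfig
{
  // When false, every user is treated as being in the single "site" time zone below
  public bool SupportMultipleTimeZones { get; set; }

  // Windows time zone ID used as the site-wide time zone when
  // SupportMultipleTimeZones is false; for example, "Eastern Standard Time"
  public string SiteTimeZoneId { get; set; }
}

It would typically be bound in ConfigureServices with something like services.Configure&lt;AppConfig&gt;(Configuration.GetSection("AppConfig")); the "AppConfig" section name here is just an assumption for this sketch.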

Now implement the Invoke method:

public async Task Invoke(HttpContext context)
{
  // Request content and parameters won't be modified if disableTimeZoneConversion=true is specified
  // as a query string parameter in the URI
  var disableConversion =
    context.Request.Query.ContainsKey("disableTimeZoneConversion") &&
    context.Request.Query["disableTimeZoneConversion"] == "true";

  // Get the local time zone for UTC conversion
  var localTimeZone = this.GetLocalTimeZone(context);

  // If conversion isn't disabled, and the local time zone can be detected (and isn't UTC),
  // modify the request content (convert local to UTC)
  if (!disableConversion && localTimeZone != null && localTimeZone.Id != "UTC")
  {
    // Modify the date/time request parameters in the URI
    this.ModifyRequestParameters(context, localTimeZone);

    // Don't modify the request content unless the Content-Type is application/json
    var isJsonContent =
      context.Request.Headers.ContainsKey("Content-Type") &&
      context.Request.Headers["Content-Type"] == "application/json";

    if (isJsonContent)
    {
      // Modify the date/time properties in the request content
      await this.ModifyRequestContent(context, localTimeZone);
    }
  }

  // Prepare for modifying the response body
  var responseStream = context.Response.Body;
  var modifiedResponseStream = new MemoryStream();
  context.Response.Body = modifiedResponseStream;

  try
  {
    await this._next(context).ConfigureAwait(false);
  }
  finally
  {
    context.Response.Body = responseStream;
  }

  // Modify the response content (convert UTC to local)
  modifiedResponseStream = this.ModifyResponseContent(context, disableConversion, localTimeZone, modifiedResponseStream);
  await modifiedResponseStream.CopyToAsync(responseStream).ConfigureAwait(false);
}

private TimeZoneInfo GetLocalTimeZone(HttpContext context)
{
  // If the app config doesn't permit multiple time zones, then treat every user as if
  // they were in the same "site" time zone
  if (!this._appConfig.SupportMultipleTimeZones)
  {
    return TimeZoneInfo.FindSystemTimeZoneById(this._appConfig.SiteTimeZoneId);
  }

  // If the request headers include the user's local time zone (IANA name, injected by client-side HTTP interceptor),
  // use that time zone
  if (context.Request.Headers.TryGetValue("MyApp-Local-Time-Zone-Iana", out StringValues localTimeZoneIana))
  {
    return TZConvert.GetTimeZoneInfo(localTimeZoneIana);
  }

  // The app config permits multiple time zones, but the user request doesn't specify the time zone
  return null;
}

The code is heavily commented to make it self-describing, so I’ll just call out the main aspects. We’ll allow the client to disable time zone conversion on a per-request basis, by supplying disableTimeZoneConversion=true as a query string parameter. We then call GetLocalTimeZone, which first checks our app configuration to see if the server application wants to support multiple time zones, or a consistent “site” time zone for all clients. So if we see that SupportMultipleTimeZones is false, then we ignore the client’s local time zone injected by our Angular interceptor, and use the same “site” time zone for all users instead, as defined by SiteTimeZoneId in the app configuration.

Otherwise, we get the client’s local time zone by retrieving the MyApp-Local-Time-Zone-Iana custom HTTP header injected by the interceptor. Because .NET requires the time zone in Windows format, we use the TZConvert.GetTimeZoneInfo method (available by installing the TimeZoneConverter NuGet package) to convert from the IANA format supplied by the client into a Windows TimeZoneInfo object. For example, it will convert America/New_York to (UTC-05:00) Eastern Time (US & Canada).

Then, as long as this client request hasn’t explicitly disabled time conversion, and the client’s local time zone itself isn’t UTC (meaning no conversion is necessary anyway), we call ModifyRequestParameters to convert all date/time values supplied as query string parameters in the request URI to UTC. And then, after verifying that the request content being posted to our API is, in fact, a JSON payload, we call ModifyRequestContent to convert all date/time values inside that JSON content to UTC as well.

Before allowing the request to process, we capture the current response stream in case of exception, and set the response body to a new stream for our modified response. We then call this._next to invoke the request, with exception handling in place to use the original response stream in case an error occurs processing the request. Otherwise, we call ModifyResponseContent to convert all date/time values inside the JSON response payload from UTC back to the client’s local time zone.

Modify the Request Query String Parameters (local to UTC)

Now add the ModifyRequestParameters method to handle query string parameters:

private void ModifyRequestParameters(HttpContext context, TimeZoneInfo localTimeZone)
{
  // Get all the query parameters from the URI
  var queryParameters = context.Request.Query
    .SelectMany(kvp =>
      kvp.Value, (col, value) =>
        new KeyValuePair<string, string>(col.Key, value))
    .ToList();

  // Nothing to do if there aren't any
  if (queryParameters.Count == 0)
  {
    return;
  }

  // Build a new list of query parameters, converting date/time values
  var modifiedQueryParameters = new List<KeyValuePair<string, string>>();

  var modified = false;
  foreach (var item in queryParameters)
  {
    var value = item.Value;
    if (value.FromDateTimeIsoString(out DateTime local))
    {
      var utc = TimeZoneInfo.ConvertTimeToUtc(local, localTimeZone);
      value = utc.ToDateTimeIsoString();
      var modifiedQueryParameter = new KeyValuePair<string, string>(item.Key, value);
      modifiedQueryParameters.Add(modifiedQueryParameter);
      modified = true;
    }
    else
    {
      var unmodifiedQueryParameter = new KeyValuePair<string, string>(item.Key, value);
      modifiedQueryParameters.Add(unmodifiedQueryParameter);
    }
  }

  if (modified)
  {
    var qb = new QueryBuilder(modifiedQueryParameters);
    context.Request.QueryString = qb.ToQueryString();
  }
}

We first obtain all the query parameters from the request URI, from which we build a new list of parameters with any date/time values converted from local to UTC. This code relies on the two extension methods FromDateTimeIsoString and ToDateTimeIsoString to parse to and from ISO-formatted date/time strings and native .NET DateTime types:

public static bool FromDateTimeIsoString(this string value, out DateTime dateTime)
{
  if (
      (value.Length == 16 || (value.Length == 19 && value[16] == ':')) &&
      value[4] == '-' &&
      value[7] == '-' &&
      value[10] == 'T' &&
      value[13] == ':' &&
      DateTime.TryParse(value, out DateTime parsedDateTime)  // calls DateTime.TryParse only after passing the smell test
     )
  {
    dateTime = parsedDateTime;
    return true;
  }

  dateTime = DateTime.MinValue;
  return false;
}

public static string ToDateTimeIsoString(this DateTime value) =>
  value.ToString("yyyy-MM-ddTHH:mm:ss");

Modify the Request Body Content (local to UTC)

Next add the ModifyRequestContent method to handle the JSON request payload:

private async Task<TimeZoneInfo> ModifyRequestContent(HttpContext context, TimeZoneInfo localTimeZone)
{
  // Read the request content from the request body stream; if it's a JSON object, we'll process it
  var requestStream = context.Request.Body;
  var originalRequestContent = await new StreamReader(requestStream).ReadToEndAsync();

  // Try to get the JSON object from the request content
  var jobj = originalRequestContent.TryDeserializeToJToken();

  // If the request content is a JSON object, convert all of its date/time properties from local time to UTC
  var modified = false;
  if (jobj != null)
  {
    modified = jobj.ConvertLocalToUtc(localTimeZone);
  }

  if (modified)
  {
    // Replace the stream with the updated request content
    var json = JsonConvert.SerializeObject(jobj);
    var requestContent = new StringContent(json, Encoding.UTF8, "application/json");
    requestStream = await requestContent.ReadAsStreamAsync();
  }
  else
  {
    // Replace the stream with the original request content
    requestStream = new MemoryStream(Encoding.UTF8.GetBytes(originalRequestContent));
  }

  // Replace the request body stream
  context.Request.Body = requestStream;

  // Return the time zone info for the reverse conversion on the response
  return localTimeZone;
}

We first try to deserialize the JSON, and if that succeeds, we convert all of its date/time properties from local to UTC. We then replace the original stream with the updated content, unless we detect that there were no date/time properties at all in the request, in which case we continue with the original stream.

Again, we have some extension methods here to help out, which are TryDeserializeToJToken and ConvertLocalToUtc:

public static JToken TryDeserializeToJToken(this string json)
{
  if (json == null || (!json.StartsWith("[") && !json.StartsWith("{")))
  {
    return null;
  }

  // Try to get the JSON object from the request content
  var jToken = default(JToken);
  try
  {
    jToken = JsonConvert.DeserializeObject<JToken>(json);
  }
  catch
  {
    // Ignore the exception, returning null to indicate bad JSON
  }

  return jToken;
}

public static bool ConvertLocalToUtc(this JToken token, TimeZoneInfo localTimeZone, bool wasModified = false)
{
  var modified = wasModified;
  if (token.Type == JTokenType.Object)
  {
    modified = ConvertLocalToUtcForObject(token, localTimeZone, wasModified, modified);
  }
  else if (token.Type == JTokenType.Array)
  {
    modified = ConvertLocalToUtcForArray(token, localTimeZone, wasModified, modified);
  }
  return modified;
}

private static bool ConvertLocalToUtcForObject(JToken token, TimeZoneInfo localTimeZone, bool wasModified, bool modified)
{
  foreach (var prop in token.Children<JProperty>())
  {
    var child = prop.Value;
    if (child is JValue jValue)
    {
      var value = ParseJsonValueForDateTime(jValue.Value);
      if (value is DateTime)
      {
        var local = (DateTime)value;
        var utc = TimeZoneInfo.ConvertTimeToUtc(local, localTimeZone);
        jValue.Value = utc;
        modified = true;
      }
    }
    else if (child.HasValues)
    {
      modified = child.ConvertLocalToUtc(localTimeZone, wasModified) || modified;
    }
  }

  return modified;
}

private static bool ConvertLocalToUtcForArray(JToken token, TimeZoneInfo localTimeZone, bool wasModified, bool modified)
{
  foreach (var item in token.Children())
  {
    var child = item;
    if (child.HasValues)
    {
      modified = child.ConvertLocalToUtc(localTimeZone, wasModified) || modified;
    }
  }

  return modified;
}

Modify the Response Body Content (UTC to local)

The last part is the ModifyResponseContent to handle the response, which converts date/time values from UTC back to the client’s local time zone:

private MemoryStream ModifyResponseContent(
  HttpContext context,
  bool disableConversion,
  TimeZoneInfo localTimeZone,
  MemoryStream responseStream)
{
  // Rewind the unmodified response stream
  responseStream.Position = 0;
  var modified = false;

  // Will capture the unmodified response for time zone conversion
  var responseContent = default(string);

  // Only attempt to modify the response if time zone conversion is not disabled
  // and we have a local time zone that was used to modify the request
  if (!disableConversion && localTimeZone != null)
  {
    // Capture the unmodified response
    responseContent = new StreamReader(responseStream).ReadToEnd();

    // Try to get the JSON object from the response content
    var jobj = responseContent.TryDeserializeToJToken();

    // If the response content is a JSON object, convert all of its date/time properties from UTC to local time
    if (jobj != null && jobj.ConvertUtcToLocal(localTimeZone))
    {
      responseContent = JsonConvert.SerializeObject(jobj);
      modified = true;
    }
  }

  // If no changes were made (i.e., there were no converted date/time properties),
  // use the original unmodified response
  if (!modified)
  {
    responseStream.Position = 0;
    context.Response.ContentLength = responseStream.Length;
    return responseStream;
  }

  // Write the changed response content to a new modified response stream
  var modifiedResponseStream = new MemoryStream();
  var sw = new StreamWriter(modifiedResponseStream);
  sw.Write(responseContent);
  sw.Flush();
  modifiedResponseStream.Position = 0;

  // Use the new modified response
  context.Response.ContentLength = modifiedResponseStream.Length;
  return modifiedResponseStream;
}

Here, we capture the unmodified response, attempt to deserialize its JSON, and if that succeeds, we convert all of its date/time values from UTC to local. We then return the modified response stream, unless no date/time values at all were present in the response, in which case we return the original unmodified response stream.

The actual time zone conversion happens inside the ConvertUtcToLocal extension method, which dispatches to object and array helpers.
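Assuming ConvertUtcToLocal mirrors the ConvertLocalToUtc dispatcher shown earlier for the request side, a minimal sketch of it looks like this:

public static bool ConvertUtcToLocal(this JToken token, TimeZoneInfo localTimeZone, bool wasModified = false)
{
  // Dispatch by token type, mirroring ConvertLocalToUtc
  var modified = wasModified;
  if (token.Type == JTokenType.Object)
  {
    modified = ConvertUtcToLocalForObject(token, localTimeZone, wasModified, modified);
  }
  else if (token.Type == JTokenType.Array)
  {
    modified = ConvertUtcToLocalForArray(token, localTimeZone, wasModified, modified);
  }
  return modified;
}

The object and array helpers it dispatches to then do the per-property work: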

private static bool ConvertUtcToLocalForObject(
  JToken token,
  TimeZoneInfo localTimeZone,
  bool wasModified,
  bool modified)
{
  foreach (var prop in token.Children<JProperty>())
  {
    var child = prop.Value;
    if (child is JValue jValue)
    {
      var value = ParseJsonValueForDateTime(jValue.Value);
      if (value is DateTime)
      {
        var utc = (DateTime)value;
        // Only convert if Kind is unspecified and the property name
        // does not end in "Date" (i.e., it's a date/time, not just a date)
        if (utc.Kind == DateTimeKind.Unspecified && !prop.Name.EndsWith("Date"))
        {
          var tz = TimeZoneInfo.FindSystemTimeZoneById(localTimeZone.Id);
          var local = TimeZoneInfo.ConvertTimeFromUtc(utc, tz);
          jValue.Value = local;
          modified = true;
        }
      }
      else if (prop.Name.EndsWith("Json") && value is string)
      {
        // Also handle JSON "embedded" in the response; i.e., string properties that contain JSON
        var stringValue = value.ToString();
        var embeddedJObject = stringValue.TryDeserializeToJToken();
        if (embeddedJObject != null)
        {
          if (embeddedJObject.ConvertUtcToLocal(localTimeZone))
          {
            jValue.Value = JsonConvert.SerializeObject(embeddedJObject);
            modified = true;
          }
        }
      }
    }
    else if (child.HasValues)
    {
      modified = child.ConvertUtcToLocal(localTimeZone, wasModified) || modified;
    }
  }

  return modified;
}

private static bool ConvertUtcToLocalForArray(JToken token, TimeZoneInfo localTimeZone, bool wasModified, bool modified)
{
  foreach (var item in token.Children())
  {
    var child = item;
    if (child.HasValues)
    {
      modified = child.ConvertUtcToLocal(localTimeZone, wasModified) || modified;
    }
  }

  return modified;
}

private static object ParseJsonValueForDateTime(object value)
{
  // If a date/time value includes seconds, it will be cast as a DateTime automatically
  // But if it's missing seconds, it will be treated as a string that we'll have to convert to a DateTime

  if (value is string)
  {
    var stringValue = value.ToString();

    if (stringValue.FromDateTimeIsoString(out DateTime dateTimeValue))
    {
      value = dateTimeValue;
    }
  }

  return value;
}

And there you have it. With this framework in place, you can enjoy automatic time zone conversion between local client time zones and UTC built right into your API, complete with proper Daylight Savings Time adjustments when needed.

I hope this helps you cope with time zone concerns in your own applications, so you can focus on the things that really matter, like core business logic.

As always, happy coding!

Transactions in Azure Cosmos DB with the .NET SDK

Introduction

Azure Cosmos DB supports transactions, which means you can run a set of insert, update, and delete operations as one single operation, and they’re all guaranteed to either succeed or fail together. This is, of course, quite different from bulk execution (see my previous post), where each individual operation succeeds or fails independently. The traditional technique for implementing transactions in Cosmos DB is to write a server-side stored procedure in JavaScript to perform the updates. But with transactional batch in the .NET SDK, you can implement transactions in C# right inside your client application.

Transactions are supported per partition key, which means they are scoped to a single logical partition. A transaction cannot span multiple partition keys. So you need to supply the partition key for the transaction, and then you can insert, update, and delete documents within that partition key, inside the transaction. You just batch up your operations in a single TransactionalBatch object, and then the SDK ships it off in a single request to Cosmos DB, where it runs within a transaction that succeeds or fails as a whole.

Demo: Transactional Batch

In this example, we want to update a customer document and a customer order document, as a single transaction. In our project, we have a shared class that provides us with a CosmosClient from the .NET SDK:

public static class Shared
{
    public static CosmosClient Client { get; private set; }

    static Shared()
    {
        var config = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
        var endpoint = config["CosmosEndpoint"];
        var masterKey = config["CosmosMasterKey"];

        Client = new CosmosClient(endpoint, masterKey);
    }
}

First let’s create a container with 400 RUs of throughput, which I’ll partition on customer ID:

var containerDef = new ContainerProperties
{
    Id = "batchDemo",
    PartitionKeyPath = "/customerId",
};

var database = Shared.Client.GetDatabase("myDatabase");
await database.CreateContainerAsync(containerDef, 400);

So we’re storing a customer document and all that customer’s order documents, in the same logical partition, using customer ID. And so that means we can update all those documents together, in one transaction. This illustration shows the grouping of documents within a single logical partition (dashed blue lines), with logical partitions spread across two physical partitions:

Next, let’s create a customer with some orders. We’ll give this customer an ID of cust1, and then create a Customer object and two Order objects.

var newCustomerDoc = new Customer { Id = "cust1", CustomerId = "cust1", Name = "John Doe", OrderCount = 2 };
var newOrderDoc1 = new Order { Id = "order1", CustomerId = "cust1", Item = "Surface Pro", Quantity = 1 };
var newOrderDoc2 = new Order { Id = "order2", CustomerId = "cust1", Item = "Surface Book", Quantity = 4 };

The Customer and Order classes are simple POCOs that I’ve defined for this demo, with JsonProperty attributes to convert our properties to camelCase for the documents stored in Cosmos DB:

public class Customer
{
    [JsonProperty("id")]
    public string Id { get; set; }
    [JsonProperty("customerId")]
    public string CustomerId { get; set; }
    [JsonProperty("name")]
    public string Name { get; set; }
    [JsonProperty("orderCount")]
    public int OrderCount { get; set; }
}

public class Order
{
    [JsonProperty("id")]
    public string Id { get; set; }
    [JsonProperty("customerId")]
    public string CustomerId { get; set; }
    [JsonProperty("item")]
    public string Item { get; set; }
    [JsonProperty("quantity")]
    public int Quantity { get; set; }
}

Both objects have an ID of course, as well as the customer ID which is the partition key. The new customer document has the same value for both its ID and customer ID, while the two new order documents have IDs that are unique to each order belonging to the same customer (order1 and order2 in this case). And of course, all three documents have the same partition key in the customer ID property, cust1.

Now that we’ve got our three objects, we call CreateTransactionalBatch on the container, supply the partition key, and chain one CreateItem method after another for each of the three documents:

var container = Shared.Client.GetContainer("myDatabase", "batchDemo");

var batch = container.CreateTransactionalBatch(new PartitionKey("cust1"))
    .CreateItem(newCustomerDoc)
    .CreateItem(newOrderDoc1)
    .CreateItem(newOrderDoc2);

Alright, that gives us a transactional batch, which we can now execute by passing it to ExecuteBatch:

await ExecuteBatch(batch);

Here’s the code for ExecuteBatch, and you can see that it’s pretty simple:

private static async Task ExecuteBatch(TransactionalBatch batch)
{
    var batchResponse = await batch.ExecuteAsync();

    using (batchResponse)
    {
        if (batchResponse.IsSuccessStatusCode)
        {
            Console.WriteLine("Transcational batch succeeded");
            for (var i = 0; i < batchResponse.Count; i++)
            {
                var result = batchResponse.GetOperationResultAtIndex<dynamic>(i);
                Console.WriteLine($"Document {i + 1}:");
                Console.WriteLine(result.Resource);
            }
        }
        else
        {
            Console.WriteLine("Transcational batch failed");
            for (var i = 0; i < batchResponse.Count; i++)
            {
                var result = batchResponse.GetOperationResultAtIndex<dynamic>(i);
                Console.WriteLine($"Document {i + 1}: {result.StatusCode}");
            }
        }
    }
}

We just call ExecuteAsync on the batch, and check the response for success. Remember, it’s an all or nothing transaction, so we’ll either have three new documents, or no new documents. In this case, the transaction succeeds, so we can iterate the response and call GetOperationResultAtIndex to get back each new document. Here in the container, you can see the three new documents.

Notice that we’ve got an orderCount property in the customer document, showing two orders for John Doe. This right here is a bit of denormalization, where we always know how many orders a customer has without having to run a separate count query on their order documents. We’ll always increment this orderCount in the customer document, any time we create a new order for that customer, so it’s really important to make sure that those operations are always performed using a transaction.

Let’s create a third order for this customer. First, we’ll read the customer document by calling ReadItemAsync, which needs the ID and partition key for the customer – and again, for a customer document, that’s the same value for both, cust1. Then we’ll increment that orderCount property (we know the current value is 2 right now, so this increments it to 3):

var container = Shared.Client.GetContainer("myDatabase", "batchdemo");
var result = await container.ReadItemAsync<Customer>("cust1", new PartitionKey("cust1"));
var existingCustomerDoc = result.Resource;
existingCustomerDoc.OrderCount++;

Finally, we’ll create the new order document for three Surface mice.

var newOrderDoc = new Order { Id = "order3", CustomerId = "cust1", Item = "Surface Mouse", Quantity = 3 };

OK, let’s create another transactional batch. This time we’re doing a ReplaceItem for the existing customer document, along with a CreateItem for the new order document.

var batch = container.CreateTransactionalBatch(new PartitionKey("cust1"))
    .ReplaceItem(existingCustomerDoc.Id, existingCustomerDoc)
    .CreateItem(newOrderDoc);

Now let’s call ExecuteBatch once again to run this transaction.

await ExecuteBatch(batch);

This batch also succeeds. And back in the portal, sure enough, the order count in the customer document is now three, to match the three order documents:

Alright, now let’s make the transaction fail with a bad order.

Once again, we get the customer document by calling ReadItemAsync with the same customer ID for the document’s ID and partition key. And then, once again, increment the orderCount, this time changing it from 3 to 4.

var container = Shared.Client.GetContainer("myDatabase", "batchdemo");
var result = await container.ReadItemAsync<Customer>("cust1", new PartitionKey("cust1"));
var existingCustomerDoc = result.Resource;
existingCustomerDoc.OrderCount++;

Now for the new order:

var newOrderDoc = new Order { Id = "order3", CustomerId = "cust1", Item = "Surface Keyboard", Quantity = 2 };

It’s for two Surface keyboards, but the order ID order3 is a duplicate of an existing order document in the container. Order IDs must be unique within each customer’s logical partition, so this is a bad order. Meanwhile, we’ve already incremented the customer’s order count to four, so let’s see what happens now when we wrap this up inside another transactional batch. This is the same code we used to create the previous order, but this time we’re expecting the transaction to fail:

var batch = container.CreateTransactionalBatch(new PartitionKey("cust1"))
    .ReplaceItem(existingCustomerDoc.Id, existingCustomerDoc)
    .CreateItem(newOrderDoc);

await ExecuteBatch(batch);

Now when we execute the batch, the response tells us that the transaction was unsuccessful, and returns the StatusCode to tell us what went wrong:

Transactional batch failed
Document 1: FailedDependency
Document 2: Conflict

This shows us that there was really nothing wrong with the first operation to replace the customer document with an incremented order count. But the second operation to create the new order failed with a conflict, as we expected, and so that first operation also failed as a dependency.

Jumping over once more to the Data Explorer, you can see that there’s been no change at all to the container – the customer’s order count is still three, and there are still only three orders.

That’s it for transactional batch. As you can see, it’s pretty simple to implement transactions in your client application using the .NET SDK, which provides a nice alternative to using JavaScript stored procedures to achieve the same functionality.

Enjoy, and happy coding!

Azure Cosmos DB Bulk Execution with the .NET SDK

Introduction

Using the .NET SDK, it’s fast and easy to store individual documents in an Azure Cosmos DB container, where typically, each write operation completes in under 10 milliseconds.

However, the .NET SDK can also support bulk operations, for scenarios where you need to load large volumes of data, with as much throughput as possible. Like, imagine you need to dump two million documents into a container – that’s bulk. And while bulk inserts are most common, bulk updates and deletes are also supported. On the other hand, if you’re running individual point operations that need to complete as quickly as possible, then it would not be appropriate to issue those using bulk execution.

It’s very easy to enable bulk execution in your applications. First, set AllowBulkExecution to true in your Cosmos client constructor. Then, populate a list of tasks, one for each operation. Finally, you just run Task.WhenAll on the list, and the rest just happens like magic.

For example, you create a single document by calling await CreateItemAsync. Well, if you’ve got a thousand documents to create, you just create a list of a thousand tasks. Then you call the same CreateItemAsync method on each, only without the await keyword – and that returns a task for the operation, without running it. Finally, calling await Task.WhenAll on the entire list executes the bulk operation much more efficiently than doing it one at a time, and as you’ll see in this post, yields dramatic performance gains.

You’ll also need to handle exceptions of course, since each task is still an individual operation that can succeed or fail on its own. For a single insert, we’d use a typical try/catch block, but there’s a different pattern for bulk execution. When you add each task to the list, tack on a call to ContinueWith, which lets you run any code you want after each task completes. Instead of a try/catch, you get handed back the same task object that just ran. And that object has a Status property, which you can test for Faulted, meaning that an unhandled exception occurred causing the task to fail. In that case, you can get the exception from the task’s Exception property, and handle it any way that you need to.

Again, it all just happens like magic, thanks to the .NET SDK, which – internally – groups concurrent operations on the client, and then dispatches a single request for each group. As a result, the client issues far fewer requests than actual documents – which yields far greater performance for bulk operations. Let’s see bulk execution in action.

Generating Documents

To demonstrate, we’ll use a test container that has throughput provisioned at 10,000 RUs (request units) a second, which is fairly high. Issuing individual write operations one at a time on a single thread, we’d never come anywhere near utilizing all that throughput. But with bulk execution, the .NET SDK will attempt to saturate all that available throughput.

First, let’s write a GenerateItems method to create an array of documents in memory for writing to the container:

private static Item[] GenerateItems(int count)
{
    var items = new Item[count];
    for (var i = 0; i < count; i++)
    {
        var id = Guid.NewGuid().ToString();
        items[i] = new Item
        {
            id = id,
            pk = id,
            username = $"user{i}"
        };
    }

    // Simulate a duplicate to cause an error inserting a document
    items[1].id = items[0].id;
    items[1].pk = items[0].pk;

    return items;
}

All we’re doing here is creating an array of Item objects, which have ID, PK (partition key), and Username properties. And to help test our exception handling, we’ll simulate a duplicate by copying the ID and PK values from the first element into the second element. This means the second document will fail, because you can’t have two documents with the same ID and partition key in one container. So if we generate 100 documents for example, we should really expect only 99 documents to get created successfully.
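
The Item class itself isn’t shown here; a minimal version consistent with the properties used above might look like this (an assumed shape, with lowercase property names so they serialize directly to the JSON id and pk fields):

public class Item
{
    public string id { get; set; }        // document id
    public string pk { get; set; }        // partition key value
    public string username { get; set; }  // sample payload property
}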

Without Bulk Execution

We’ll load documents into the container twice – first, one at a time, and then again using bulk execution. Here’s the code to perform the writes one at a time (not using bulk execution):

var items = GenerateItems(count);
var cost = 0D;
var errors = 0;
var started = DateTime.Now;
var container = Shared.Client.GetContainer("adventure-works", "bulkdemo");

foreach (var item in items)
{
    try
    {
        var result = await container.CreateItemAsync(item, new PartitionKey(item.pk));
        cost += result.RequestCharge;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error creating document: {ex.Message}");
        errors++;
    }
}

Console.WriteLine($"Created {count - errors} documents (non-bulk): {cost:0.##} RUs in {DateTime.Now.Subtract(started)}");

This code loops through the items array and creates a document in Cosmos DB for each item. We simply await each call to CreateItemAsync, and check for errors using a try/catch. So let’s run that for 100 documents.

We see the Conflict (409) error caused by the duplicate and displayed by our catch block. And when it’s done, we can see that we successfully inserted only 99 documents, in about 10 seconds, at a cost of 622 RUs.

With Bulk Execution

OK, now let’s do the same thing using bulk execution. First, remember we need to enable bulk execution in our Cosmos client, which is defined by a shared Client property:

public static class Shared
{
    public static CosmosClient Client { get; private set; }

    static Shared()
    {
        var config = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
        var endpoint = config["CosmosEndpoint"];
        var masterKey = config["CosmosMasterKey"];

        Client = new CosmosClient(endpoint, masterKey,
            new CosmosClientOptions { AllowBulkExecution = true });
    }
}

Here you can see that, inside the Cosmos client constructor, we’re setting AllowBulkExecution to true.

With bulk execution enabled, we now instantiate and populate a list of tasks:

var items = GenerateItems(count);
var cost = 0D;
var errors = 0;
var started = DateTime.Now;
var container = Shared.Client.GetContainer("adventure-works", "bulkdemo");
var tasks = new List<Task>(count);

foreach (var item in items)
{
    var task = container.CreateItemAsync(item, new PartitionKey(item.pk));
    tasks.Add(task
        .ContinueWith(t =>
        {
            if (t.Status == TaskStatus.RanToCompletion)
            {
                cost += t.Result.RequestCharge;
            }
            else
            {
                Console.WriteLine($"Error creating document: {t.Exception.Message}");
                errors++;
            }
        }));
}

await Task.WhenAll(tasks);

Console.WriteLine($"Created {count - errors} documents (bulk): {cost:0.##} RUs in {DateTime.Now.Subtract(started)}");

For each document, we call CreateItemAsync as before, only without the await. This gives us the task for the document, which we add to the list, and call ContinueWith so we can check on the success of each individual operation. If the task isn’t faulted, we track the RU charge; otherwise, an exception occurred. So the else block inside ContinueWith is kind of your catch block when you’re doing bulk execution. The faulted task gives us the actual exception that occurred in its Exception property, which we just write out to the console.

Running this code, you can see how the bulk execution dramatically boosts performance:

We see the same single failure for the duplicate document, with 99 documents being created in just 0.14 seconds. Notice that bulk execution also lowered the throughput cost slightly, down from 622 RUs to 591.

That’s 0.14 seconds using bulk execution, compared to 10 seconds without.

Now let’s run the demo again, this time with 1,000 documents:

Well, there’s the same error for that duplicate, as the documents get created one at a time, until finally, we see that 999 documents got created in a minute and 40 seconds, at a cost of 6,283 RUs.

Finally, let’s compare that to bulk execution for another 1,000 documents:

Hard to believe, but we just did the same thing in about half a second, again at a slightly lower cost of about 6,088 RUs.

As you can see, the performance gains are dramatic, so make sure to leverage bulk execution when you need to load large amounts of data into Cosmos DB from your .NET applications.

Happy coding!

 

Data Modeling and Partitioning Patterns in Azure Cosmos DB

Introduction

Many of us are familiar with relational databases like SQL Server and Oracle. But Azure Cosmos DB is a NoSQL (non-relational) database – which is very different, and there are new ways to think about data modeling. In this post, we’ll use a familiar real-world relational data model and refactor it as a non-relational data model for Azure Cosmos DB.

First, there are many ways to describe Azure Cosmos DB, but for our purposes, we can define it as having two primary characteristics: it is horizontally scalable, and it is non-relational.

Horizontally scalable

In Azure Cosmos DB, you store data in a single logical container. But behind the container, Azure Cosmos DB manages a cluster of servers, and distributes the workload across multiple physical machines. This is transparent to us – we never worry about these back-end servers – we just work with the one container. Azure Cosmos DB, meanwhile, automatically maintains the cluster, and dynamically adds more and more servers as needed, to accommodate your growth. And this is the essence of horizontal scale.

Diagram of a database container distributing data across many machines, enabling horizontal scale

Each server has its own disk storage and CPU, just like any machine. And that means that – effectively – you get unlimited storage and unlimited throughput. There’s no practical limit to the number of servers in the cluster, meaning no limit on disk space or processing power for your Cosmos DB container.

Non-relational

Think about how things work in the relational world, where we store data in rows. We focus on how those rows get joined along primary and foreign keys, and we rely on the database to enforce constraints on those keys.

But in the non-relational world, we store data in documents – that is, JSON documents. Now there’s certainly nothing you can store in a row that you can’t store in a JSON document, so you could absolutely design a highly normalized data model with documents that reference other documents on some key, like so:

multiple documents in a row with arrows connecting them

Unfortunately, this results in a very inefficient design for Azure Cosmos DB. Why? Because again, Azure Cosmos DB is horizontally scalable, where documents that you write to a container are very likely to be stored across multiple physical servers behind the scenes:

A database container with multiple documents and servers underneath it

Although it is technically possible to enforce relational constraints across a cluster of servers, doing so would have an enormous negative impact on performance. And well, “speed and performance” is the name of the game in Azure Cosmos DB, with comprehensive SLAs on availability, throughput, latency, and consistency. Reads and writes have extremely low, single-digit millisecond latency – meaning that in most cases they complete in under 10 milliseconds. So, in order to deliver on these performance guarantees, Azure Cosmos DB simply doesn’t support the concept of joins, and can’t enforce relational constraints across documents.

Suitable for relational workloads?

With no joins and no relational constraints, the obvious question becomes: “Is Azure Cosmos DB suitable for relational workloads?” And the answer is, yes, of course it is. Otherwise, the post would end right here.

And when you think about it, most real-world use cases are relational. But because Azure Cosmos DB is horizontally scalable and non-relational, we need to use different techniques to materialize relationships between entities. And this means a whole new approach to designing your data model. In some cases, the new methods are radically different, and run contrary to best practices that some of us have been living by for decades.

Fortunately, while it is very different, it’s not really very difficult. It’s just that, again, you need to think about things differently when designing your data model for Azure Cosmos DB.

WebStore relational model

Our sample database is for an e-commerce web site that we’re calling WebStore. Here is the relational data model for WebStore in SQL Server:

Example of a relational database model in SQL Server

This data model is relatively small, but still representative of a typical production model. It has one-to-many relationships, like the ones from Customer to CustomerAddress and SalesOrder. There is also a one-to-one relationship from Customer to CustomerPassword, and the ProductTags table implements a many-to-many relationship between Product and ProductTag.

Container per table?

It’s very natural at first to think of a container in Azure Cosmos DB like a table. Your first instinct may be to say, OK, we have nine tables in our data model, let’s create nine containers in Azure Cosmos DB.

Now you can certainly do this, but because Azure Cosmos DB is horizontally scalable and non-relational, this would be the worst possible design. Azure Cosmos DB exposes no way to join documents, or to enforce relational constraints between them. Therefore, this approach will not only perform poorly, but it will also be very difficult to maintain and program against.

What’s the answer? Let’s get there one step at a time.

Embed vs. reference

Let’s start with customers and their related entities. JSON is hierarchical, so we don’t need separate documents for every type. So now think about the distinction between one-to-many and one-to-few. It’s reasonable to impose an upper limit on the number of addresses a customer can have, and there’s only one password per customer, so we could combine all of those into a single document:

Example data document with a user's information including ID, title, first name, and more

This has immediately solved the problem of joining between customers and their addresses, and passwords, because that’s all “pre-joined” by embedding the one-to-few relationship for addresses as a nested array, and the one-to-one relationship for the password as an embedded object. Simply by embedding, we’ve reduced three relational tables to a single customer document.
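
To make this concrete, a combined customer document along these lines might look something like the following (the property names and values are illustrative, not taken from the actual sample):

{
  "id": "cust1",
  "title": "Mr.",
  "firstName": "John",
  "lastName": "Doe",
  "emailAddress": "john.doe@example.com",
  "addresses": [
    {
      "addressLine1": "123 Main St.",
      "city": "Seattle",
      "state": "WA",
      "zipCode": "98101"
    }
  ],
  "password": {
    "hash": "<hashed-password>",
    "salt": "<salt>"
  }
}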

On the other hand, we would certainly not want an upper limit on the number of sales orders per customer – ideally, that relationship is unbounded (and, in any case, a single document can’t exceed 2 MB). So the orders will be stored as separate documents, referenced by customer ID.

The rules for when to embed and when to reference are simple, as we’re demonstrating. One-to-few and one-to-one relationships often benefit from embedding, while one-to-many (particularly unbounded) and many-to-many relationships require that you reference. Embedding is also useful when all the entities are typically queried and/or updated together (for example, a customer profile with addresses and password), while it’s usually better to separate entities that are most often queried and updated separately (such as individual sales orders).

The next – and arguably most important – step is to choose a partition key for the customer document. Making the right choice requires that you understand how partitioning works.

Understanding partitioning

When you create a container, you supply a partition key. This is a value in your documents that Azure Cosmos DB uses to group documents together into logical partitions. Each server in the cluster is a physical partition that can host any number of logical partitions, each of which in turn stores any number of documents with the same partition key value. Again, we don’t think about the physical partitions; we’re concerned primarily with the logical partitions that are based on the partition key that we select.

All the documents in a logical partition will always be stored on the same physical partition; a logical partition will never be spread across multiple servers in the cluster. Ideally, therefore, you want to choose a partition key whose value will be known for most of your typical queries. When the partition key is known, then Azure Cosmos DB can route your query directly to the physical partition where it knows all the documents that can possibly satisfy the query are stored. This is called a single-partition query.

If the partition key is not known, then it’s a cross-partition query (also often called a fan-out query). In this case, Azure Cosmos DB needs to visit every physical partition and aggregate their results into a single result set for the query. This is fine for occasional queries, but adds unacceptable overhead for common queries in a heavy workload.

You also want a partition key that results in a uniform distribution of both storage and throughput. A logical partition can’t exceed 20 GB, but regardless, you don’t want some logical partitions to be huge and others very small. And from a throughput perspective, you don’t want some logical partitions to be heavily accessed for reads and writes, and not others. These “hot partition” situations should always be avoided.

Choosing a partition key

With this understanding, we can select a partition key for the customer documents that we’ll store in a customer container. The question is always the same: “What’s the most common query?” For customers, we most often want to query a customer by its ID, like so:

SELECT * FROM c WHERE c.id = '<custId>'

In this case, we want to choose the id property itself as the partition key. This means you’ll get only one document in every logical partition, which is fine. It’s desirable to use a partition key that yields a large spectrum of distinct values. You may have thousands of logical partitions for thousands of customers, but with only one document each, you will achieve a highly uniform distribution across the physical partitions.
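
As a quick illustration, creating a customer container partitioned on the id property itself could look like this sketch using the v3 .NET SDK (the client variable, database name, container name, and throughput are all assumptions):

// Sketch only: create a customer container partitioned on /id
var database = cosmosClient.GetDatabase("webstore");
var response = await database.CreateContainerIfNotExistsAsync(
    id: "customer",
    partitionKeyPath: "/id",
    throughput: 400);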

We’ll take a very different approach for product categories. Users visiting the website will typically want to view the complete list of product categories, and then query for all the products that belong to a category that interests them. Retrieving the full list of categories is essentially a query on the product category container with no WHERE clause. The problem, though, is that would be a cross-partition query, and we want to get all our category documents using a single-partition query.

The trick here is to add another property called type to each product category document, and set its value to “category” in every document. Then we can partition the product category container on the type property. This would store all the category documents in a single logical partition, and the following query could retrieve them as a single-partition query:

SELECT * FROM c WHERE c.type = 'category'

The same is true of tags; users will typically want a full list of tags and then drill in to view the products associated with interesting tags. This is a typical pattern for short lookup lists that are often retrieved all at once. So that would be another container for product tags, partitioned on a type property where all the documents have the same value “tag” in that property, and then queried with:

SELECT * FROM c WHERE c.type = 'tag'

Many-to-many relationships

Now for the many-to-many relationship between products and tags. This can be modeled by embedding an array of IDs on one side or the other; we could either store a list of tag IDs in each product, or a list of product IDs in each tag. Since there will be far fewer tags per product than products per tag, we’ll store tag IDs in each product document, like so:

Example of a document with item details including ID, category ID, description, price, and more

Once the user chooses a category, the next typical query would be to retrieve all the products in a given category by category ID, like so:

SELECT * FROM c WHERE c.categoryId = '<catId>'

To make this a single-partition query, we want to partition the product container on the category ID, and that will store all the products for the same category in the same logical partition.

Introducing denormalization

Now we have a new challenge, because each product document holds just the category ID and an array of tag IDs – it doesn’t have the category and tag names themselves. And we already know that Azure Cosmos DB won’t join related documents together for us. So, if we want to display the category and tag names on the web page – which we do – then we need to run additional queries to get that information. First, we need to query the product category container to get the category name, and then – for each product in the category – we need to query the product tag container to get all the tag names.

We definitely need to avoid this, and we’ll solve the problem using denormalization. And that just means that – unlike in a normalized data model – we will duplicate information as necessary in order to make it more readily available for queries that need it. That means that we’ll store a copy of the category name, and copies of the tag names, in each related product document:

Example of a document for an item that includes multiple attributes and embedded details in the Tags attribute
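
Concretely, a denormalized product document could look something like this (again, illustrative values):

{
  "id": "prod1",
  "categoryId": "cat1",
  "categoryName": "Bikes",
  "name": "Touring Bike",
  "price": 559.99,
  "tags": [
    { "id": "tag1", "name": "Lightweight" },
    { "id": "tag2", "name": "Aluminum Frame" }
  ]
}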

Now we have everything we need to display about a product self-contained inside a single product document. And that will work great until, of course, a category name or tag name changes. Now we need a way to cascade that name change to all the related copies in order to keep our data consistent.

Denormalizing with the change feed

This is a perfect situation for the Azure Cosmos DB change feed, which is a persistent log of all changes made to any container. By subscribing to the change feed on the category and tag containers, we can respond to updates and then propagate the change out to all related product documents so that everything remains in sync.

This can be achieved with a minimal amount of code, and implemented out-of-band with the main application by deploying the change feed code to run as an Azure function:

image of three database containers

Any change to a category or tag name triggers an Azure function to update all related product documents. This lets us maintain a denormalized model that’s optimized to retrieve all relevant information about a product with one single-partition query.
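
For a rough idea of the shape this takes, here’s a hypothetical sketch of such a function, written against the same Cosmos DB trigger binding and SDK used later in this document; the database, container, and property names are all assumptions:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CategoryNameSync
{
  // Endpoint and key placeholders; in practice these would come from configuration
  private static readonly DocumentClient _client =
    new DocumentClient(new Uri("https://<account-name>.documents.azure.com:443/"), "<account-key>");

  [FunctionName("CategoryNameSync")]
  public static async Task Run(
    [CosmosDBTrigger(
      databaseName: "webstore",
      collectionName: "productMeta",
      ConnectionStringSetting = "CosmosDbConnectionString",
      LeaseCollectionName = "lease")]
    IReadOnlyList<Document> changedDocs,
    ILogger log)
  {
    foreach (var doc in changedDocs)
    {
      // Only react to category documents (the metadata container also holds tags)
      if (doc.GetPropertyValue<string>("type") != "category") continue;

      var productCollUri = UriFactory.CreateDocumentCollectionUri("webstore", "product");

      // Single-partition query: the product container is partitioned on categoryId
      var products = _client.CreateDocumentQuery<Document>(
          productCollUri,
          new SqlQuerySpec(
            "SELECT * FROM c WHERE c.categoryId = @catId",
            new SqlParameterCollection { new SqlParameter("@catId", doc.Id) }),
          new FeedOptions { PartitionKey = new PartitionKey(doc.Id) })
        .AsEnumerable();

      foreach (var product in products)
      {
        // Copy the new category name into the related product document
        product.SetPropertyValue("categoryName", doc.GetPropertyValue<string>("name"));
        await _client.ReplaceDocumentAsync(product.SelfLink, product,
          new RequestOptions { PartitionKey = new PartitionKey(doc.Id) });
        log.LogInformation($"Updated categoryName on product {product.Id}");
      }
    }
  }
}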

Combining different types

The last part of our schema is the customer orders and order details. First, we’ll embed the order details into each order as a single document for the sales order container, because that’s another one-to-few relationship between entities that are typically retrieved and updated together.

It will be very common for customers to retrieve their orders using the following query:

SELECT * FROM c WHERE c.customerId = '<custId>'

That makes the customer ID the best choice for the partition key. But before we jump to create another container for sales orders, remember that we’re also partitioning customers on the customer ID in the customer container. And unlike a relational database where tables have defined schemas, Azure Cosmos DB lets you mix different types of documents in the same container. And it makes sense to do that when those different types share the same partition key.

We’ll combine customer and sales order documents in the same customer container, which will require just a minor tweak to the customer document. We’ll need to add a customerId property that holds a copy of the customer ID already stored in the id property. Then we can partition on customerId, which will be present in both document types:

A database container called Customer with two documents showing customer details

Notice that we’ve also added a type property to distinguish between the two types of documents. So now, there is still only one customer document per logical partition, but each logical partition also includes the related orders for that customer. And this kind of gets us our joins back, because now we can retrieve a customer and all their related orders with the following single-partition query:

SELECT * FROM c WHERE c.id = '<custId>'

Denormalizing with a stored procedure

Let’s wrap up with one more query to retrieve our top customers; essentially, a list of customers sorted descending by order count. In the relational world, we would just run a SELECT COUNT(*) on the Order table with a GROUP BY on the customer ID, and then sort descending on that count.

But in Azure Cosmos DB, the answer is once again to denormalize. We’ll just add a salesOrderCount property to each customer document. Then our query becomes as simple as:

SELECT * FROM c WHERE c.type = 'customer' ORDER BY c.salesOrderCount DESC

Of course, we need to keep that salesOrderCount property in sync; each time we create a new sales order document, we also need to increment the salesOrderCount property in the related customer document. We could use the change feed like before, but stored procedures are a better choice when your updates are contained to a single logical partition.

In this case, the new sales order document is being written to the same logical partition as the related customer document. We can write a stored procedure in JavaScript that runs within the Azure Cosmos DB service which creates the new sales order document and updates the customer document with the incremented sales order count.

The big advantage here is that stored procedures in Azure Cosmos DB run as a transaction that succeeds or fails as a whole. Both write operations will need to complete successfully or they both roll back. This guarantees consistency between the salesOrderCount property in the customer document and the true number of related sales order documents in the same logical partition.
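
As a hedged sketch of what invoking such a stored procedure might look like from the v3 .NET SDK (the stored procedure name, and the container, customerId, and newSalesOrderDoc variables, are hypothetical):

// Execute a hypothetical "createSalesOrder" stored procedure scoped to the
// customer's logical partition, so both writes share one transaction
var result = await container.Scripts.ExecuteStoredProcedureAsync<string>(
    storedProcedureId: "createSalesOrder",
    partitionKey: new PartitionKey(customerId),
    parameters: new dynamic[] { newSalesOrderDoc });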

One last thing to mention is that this top customers query is a cross-partition query, unlike our previous examples, which were all single-partition queries. Remember again that cross-partition queries aren’t necessarily evil, as long as they aren’t very common. In our case, this last query won’t run routinely on the website; it’s more like a “back office” query that an executive runs every now and again to find the top customers.

Summary

This post has walked you through the steps to refactor a relational data model as non-relational for Azure Cosmos DB. We collapsed multiple entities by embedding, and we supported denormalization through the use of the change feed and stored procedures.

We also combined customer and sales order documents in the same container, because they are both partitioned on the same value (customer ID). To wrap up the design, we can also combine the product category and product tag documents in a single “product metadata” container, since they are both partitioned on the same type property.

That brings us to our final design:

three database containers: customer, product, and product metadata

Using a combination of non-relational modeling techniques, we’ve reduced nine tables into just three containers, where the majority of queries run by the application are scoped to a single logical partition.

Multiple Partition Keys in Azure Cosmos DB (Part 3) – Azure Functions with Change Feed Trigger

Welcome to part 3 in this three-part series on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In part 2, I showed you how to use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach in part 1.

This post concludes the three-part series, and shows how to deploy the solution to execute in a serverless cloud environment using Azure Functions.

Building the Azure Functions Project

As you learned in part 2, the benefits of using the CFP library are clear. But we still need a way to deploy our host to run in Azure. We could certainly refactor the CfpLibraryHost console application as a web job. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

If you’re not already familiar with Azure Functions, they let you write individual methods (functions), which you deploy for execution in a serverless environment hosted on Azure. The term “serverless” here means without also having to create an Azure app service to deploy your host.

In the current ChangeFeedDemos solution, create a new Azure Functions project and name it CfpLibraryFunction. Visual Studio will offer up a selection of “trigger” templates to choose from, including the Cosmos DB Trigger that we’ll be using. However, we’ll just choose the Empty template and configure the project manually:

Next, add the Cosmos DB WebJob Extensions to the new project from the NuGet package Microsoft.Azure.WebJobs.Extensions.CosmosDB:

Now create a new class named MultiPkCollectionFunction in the CfpLibraryFunction project with the following code:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;

namespace CosmosDbDemos.MultiPkCollection.ChangeFeedFunction
{
  public static class MultiPkFunction
  {
    private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    private const string CosmosMasterKey = "<account-key>";

    private static readonly DocumentClient _client;

    static MultiPkFunction()
    {
      _client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey);
    }

    [FunctionName("MultiPkCollectionFunction")]
    public static void Run(
      [CosmosDBTrigger(
        databaseName: "multipk",
        collectionName: "byCity",
        ConnectionStringSetting = "CosmosDbConnectionString",
        LeaseCollectionName = "lease")]
      IReadOnlyList<Document> documents,
      ILogger log)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          _client.UpsertDocumentAsync(stateCollUri, document).Wait();
          log.LogInformation($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
          {
            PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
          }).Wait();
          log.LogInformation($"Deleted document id {document.Id} in byState collection");
        }
      }
    }
  }
}

Note that this is all the code we need. While the CFP library host project required less code than querying the change feed directly, it still required additional code to build and host the processor – plus you need a full Azure app service to deploy in the cloud.

Using an Azure Function, it’s super lean. We have the same minimal business logic as before, and still leverage all the CFP library goodness as before, but being “serverless,” we’re ready to deploy this as a standalone function in the cloud, without creating a full Azure app service. Plus, you can test the Azure Function locally using Visual Studio and the debugger, so that you can deploy to the cloud with confidence. Folks, it just doesn’t get much better than this!

The CosmosDBTrigger binding in the signature of the Run method causes this code to fire on every notification that gets raised out of the CFP library that’s watching the byCity container for changes. The binding tells the trigger to monitor the byCity collection in the multipk database, and to persist leases to the container named lease. It also references a connection string setting named CosmosDbConnectionString to obtain the necessary endpoint and master key, which we’ll configure in a moment.

The actual implementation of the Run method is virtually identical to the ProcessChangesAsync method in our CFP library host project from part 2. The only difference is that we log output using the ILogger instance passed into our Azure Function, rather than using Console.WriteLine to log output in our CFP library host project.

Setting the connection string

Next we need to set the connection string for the trigger that we are referencing in the signature of the Run method. Azure Functions support configuration files similar to the way appsettings.json, app.config, or web.config is used for typical .NET applications. For Azure Functions, this file is named local.settings.json. Open this file and add the CosmosDbConnectionString setting that the trigger will use to monitor the byCity collection for changes:

{
  "IsEncrypted": false,
  "Values": {
    "CosmosDbConnectionString": "AccountEndpoint=https://<account-name>.documents.azure.com:443/;AccountKey=<account-key>;",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet"
  }
} 

Testing locally

It’s very easy to test and debug our code locally, before deploying it to Azure.

Let’s see it work by running the two projects ChangeFeedDirect and CfpLibraryFunction at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryFunction app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that they can be synchronized to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryFunction). To do this, right-click the CfpLibraryFunction project in Solution Explorer and choose Debug, Start New Instance. You will see a console app open up which hosts the local webjob for our Azure function:

Wait a few moments until the local Azure Functions host indicates that the application has started:

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library under control of the local Azure Functions webjob host:

Before we deploy to Azure, let’s re-initialize the database. Back in the first console app (ChangeFeedDirect), run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Deploying to Azure

Satisfied that our code is working properly, we can now deploy the Azure function to run in the cloud.

Right-click the CfpLibraryFunction project and choose Publish.

Select Create New and then click Publish.

Assign a name for the deployed function, and select the desired subscription, resource group, hosting plan, and storage account (or just accept the defaults):

Now click Create and wait for the deployment to complete.

Over in the Azure portal, navigate to All Resources, filter on types for App Services, and locate your new Azure Functions service:

Click on the App Service, and then click Configuration:

Next, click New Application Setting:

This is the cloud equivalent of the local.settings.json file, where you need to plug in the same CosmosDbConnectionString setting that you provided earlier when testing locally:

Now click Update, Save, and close the application settings blade.

You’re all done! To watch it work, click on MultiPkCollectionFunction, scroll down, and expand the Logs panel:

Back in the ChangeFeedDirect console app, run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library running in our Azure function. Within a few moments, the Azure function trigger fires, and our code synchronizes them to the byState container:

What’s Next?

A few more pointers before concluding:

Conclusion

This concludes the three-post series on using the Cosmos DB change feed to synchronize two containers with different partition keys over the same data. Thanks for reading, and happy coding!

Multiple Partition Keys in Azure Cosmos DB (Part 2) – Using the Change Feed Processor Library

Welcome to part 2 in this series of blog posts on using the Azure Cosmos DB change feed to implement a synchronized solution between multiple containers that store the same documents, each with a different partition key.

Start with part 1 for an overview of the problem we’re trying to solve. In a nutshell, we’re using the change feed to synchronize two containers, so that changes made to one container are replicated to the other. Thus the two containers have the same data in them, but each one is partitioned by a different partition key to best suit one of two common data access patterns. Part 1 presented a solution that queries the change feed directly. In this post, we’ll use the Change Feed Processor (CFP) library instead, which provides numerous advantages over the low-level approach we took in part 1.

What is the CFP Library?

For the solution we built in part 1 to consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. In most cases, unless you have very custom requirements, the CFP library is the way to go over directly querying the change feed yourself.

The CFP library automatically discovers the container’s partition key ranges and parallelizes change feed consumption across each of them internally. This means that you only consume one logical change feed for the entire container, and don’t worry at all about the individual change feeds behind the individual partition key ranges. The library relies on a dedicated “lease” container to persist lease documents that maintain state for each partition key range. Each lease represents a checkpoint in time which tracks continuation tokens for chunking long change feeds across multiple clients. Thus, as you increase the number of clients, the overall change feed processing gets divided across them evenly.

Using the lease container also means we won’t need to rely on our own “sync” document that we were using to track the last time we queried the change feed directly. Therefore, our code will be simpler than the previous solution from part 1, yet we will simultaneously inherit all the aforementioned capabilities that the CFP library provides.

Building on the ChangeFeedDemos solution we created in part 1, open the Program.cs file in the ChangeFeedDirect project. In the Run method, add a line to display a menu item for creating the lease container:

Console.WriteLine("CL - Create lease collection"); 

Further down in the method, add an “else if” condition for the new menu item that calls CreateLeaseCollection:

else if (input == "CL") await CreateLeaseCollection();

Now add the CreateLeaseCollection method:

private static async Task CreateLeaseCollection()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/id");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "lease",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });
  }

  Console.WriteLine("Created lease collection");
} 

This code creates a container named lease provisioned for 400 RUs per second. Also notice that the lease container uses the id property itself as its partition key.

Next, create the host project, which is essentially the client. When an instance of this host runs, it will listen for and consume changes across the entire container. Then, as you spin up additional instances of the host, the work to consume changes across the entire container will be distributed uniformly across all the host instances by the CFP library.

In the current ChangeFeedDemos solution, create a new .NET Core console application and name the project CfpLibraryHost. Next, add the CFP library to the new project from the NuGet package Microsoft.Azure.DocumentDB.ChangeFeedProcessor:

Replace all the code in Program.cs as follows:

using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  class Program
  {
    static void Main(string[] args)
    {
      var host = new ChangeFeedProcessorHost();
      Task.Run(host.RunProcessor).Wait();

      Console.ReadKey();
    }
  }
} 

This startup code initializes and runs the host. It then enters a wait state during which the host will be notified automatically by the CFP library whenever changes are available for consumption.

Next, to implement the host, create a new class named ChangeFeedProcessorHost with the following code:

using Microsoft.Azure.Documents.ChangeFeedProcessor;
using System;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class ChangeFeedProcessorHost
  {
    public async Task RunProcessor()
    {
      var monitoredCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "byCity",
      };

      var leaseCollection = new DocumentCollectionInfo
      {
        Uri = new Uri(Constants.CosmosEndpoint),
        MasterKey = Constants.CosmosMasterKey,
        DatabaseName = "multipk",
        CollectionName = "lease",
      };

      var builder = new ChangeFeedProcessorBuilder();

      builder
        .WithHostName($"Change Feed Processor Host - {Guid.NewGuid()}")
        .WithFeedCollection(monitoredCollection)
        .WithLeaseCollection(leaseCollection)
        .WithObserverFactory(new MultiPkCollectionObserverFactory());

      var processor = await builder.BuildAsync();
      await processor.StartAsync();

      Console.WriteLine("Started change feed processor - press any key to stop");
      Console.ReadKey();

      await processor.StopAsync();
    }
  }

} 

The host builds a processor configured to monitor the byCity container, using the lease container to track internal state. The processor also references an observer factory which, in turn, supplies an observer to consume the changes as they happen.

This code relies on defined constants for the URI and master key of the Cosmos DB account. Define them as follows in a new Constants class (in Constants.cs):

namespace CfpLibraryHost
{
  public class Constants
  {
    public const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
    public const string CosmosMasterKey = "<account-key>";
  }
} 

The observer factory is super-simple. Just create a class that implements IChangeFeedObserverFactory, and return an observer instance (where the real logic will go) that implements IChangeFeedObserver. Name the class MultiPkCollectionObserverFactory, as referenced by WithObserverFactory in the host.

using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserverFactory : IChangeFeedObserverFactory
  {
    public IChangeFeedObserver CreateObserver() =>
      new MultiPkCollectionObserver();
  }
} 

Finally, create the observer class itself. Name this class MultiPkCollectionObserver, as referenced by the CreateObserver method in the factory class:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace CfpLibraryHost
{
  public class MultiPkCollectionObserver : IChangeFeedObserver
  {
    private static DocumentClient _client;

    static MultiPkCollectionObserver() =>
      _client = new DocumentClient(new Uri(Constants.CosmosEndpoint), Constants.CosmosMasterKey);

    public async Task OpenAsync(IChangeFeedObserverContext context)
    {
      Console.WriteLine($"Start observing partition key range {context.PartitionKeyRangeId}");
    }

    public async Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
      Console.WriteLine($"Stop observing partition key range {context.PartitionKeyRangeId} because {reason}");
    }

    public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> documents, CancellationToken cancellationToken)
    {
      foreach (var document in documents)
      {
        var jDocument = JsonConvert.DeserializeObject<JObject>(document.ToString());

        if (jDocument["ttl"] == null)
        {
          var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
          await _client.UpsertDocumentAsync(stateCollUri, document);
          Console.WriteLine($"Upserted document id {document.Id} in byState collection");
        }
        else
        {
          var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", document.Id);
          await _client.DeleteDocumentAsync(stateDocUri, new RequestOptions
          {
            PartitionKey = new PartitionKey(jDocument["state"].Value<string>())
          });
          Console.WriteLine($"Deleted document id {document.Id} in byState collection");
        }
      }
    }
  }
} 

Our synchronization logic is embedded inside the ProcessChangesAsync method. The benefits of using the CFP library should really stand out now, compared to our solution from part 1. This code is significantly simpler yet dramatically more robust than the SyncCollections method we wrote in the ChangeFeedDirect project. We are no longer concerned with discovering and iterating partition key ranges, nor do we need to track the last sync time ourselves. Instead, the CFP library handles those details, plus it parallelizes the work and uniformly distributes large-scale processing across multiple client instances. Our code is now limited to just the business logic we require, which is the same logic as before: Upsert new or changed documents into the byState container as they occur in the byCity container, or delete them if their TTL property is set.

Let’s see it work. We’ll want to run both projects at the same time. The ChangeFeedDirect console app provides an interactive menu for creating the database with the byCity, byState, and lease containers, and also for inserting, updating, and deleting documents in the byCity container. The CfpLibraryHost app, meanwhile, sits and waits to be notified of changes as they occur in the byCity container, so that they can be synchronized to the byState container.

Start the first console app (ChangeFeedDirect) and run the DB command to drop and recreate the database with empty byCity and byState containers. Then run CL to create the lease container.

Without closing the first console app, start the second one (CfpLibraryHost). To do this, right-click the CfpLibraryHost project in Solution Explorer and choose Debug, Start New Instance. The host fires up, and now just sits and waits for changes from the CFP library:

Started change feed processor - press any key to stop
Start observing partition key range 0

Back in the first console app (ChangeFeedDirect), run the CD command to create three documents in the byCity container. Then run the UD command to update a document, and the DD command to delete another document (by setting its ttl property). Do not run SC to manually sync with the direct change feed queries from part 1. Instead, sit back and watch it happen automatically with the CFP library. Within a few moments, the host is notified of the changes, and our observer class synchronizes them to the byState container:

Started change feed processor - press any key to stop
Start observing partition key range 0
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Upserted document id 06296937-de8b-4d72-805d-f3120bf06403 in byState collection
Upserted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection
Upserted document id 63135c76-d5c6-44a8-acd3-67dfc54faae6 in byState collection
Deleted document id 0b222279-06df-487e-b746-7cb347acdf18 in byState collection 

The beauty here is that the client does not need to manually poll the change feed for updates. And we can just spin up as many clients as needed to scale out the processing for very large containers with many changes.

What’s Next?

The benefits of using the CFP library are clear, but we still need to deploy our host to run in Azure. We could certainly deploy the CfpLibraryHost console application to a web app in App Service as an Azure WebJob. But the simplest way to achieve this is by using Azure Functions with a Cosmos DB trigger.

So tune in to part 3, where I’ll conclude this three-part series by showing you how to deploy our solution to Azure using Azure Functions.

Multiple Partition Keys in Azure Cosmos DB (Part 1) – Querying the Change Feed Directly

To begin, let’s be clear that an Azure Cosmos DB container can have only one partition key. I say this from the start in case “multiple partition keys” in the title is somehow misinterpreted to imply otherwise. You always need to come up with a single property to serve as the partition key for every container, and choosing the best property for this can sometimes be difficult.

Understanding the Problem

Making the right choice requires intimate knowledge about the data access patterns of your users and applications. You also need to understand how horizontal partitioning works behind the scenes in terms of storage and throughput, how queries and stored procedures are scoped by partition key, and the performance implications of running cross-partition queries and fan-out queries. So there are many considerations to take into account before settling on the one property to partition a container on. I discuss all this at length in my previous blog post Horizontal Partitioning in Azure Cosmos DB.

But here’s the rub. What if, after all the analysis, you come to realize that you simply cannot settle on a single property that serves as an ideal partition key for all scenarios? Let’s say for example, from a write perspective, you find one property will best distribute writes uniformly across all the partitions in the container. But from a query perspective, you find that using the same partition key results in too much fanning out. Or, you might identify two categories of common queries, where it’s roughly 50/50; meaning, about half of all the queries are of one type, and half are of the other. What do you do if the two query categories would each benefit from different partition keys?

Your brain can get caught in an infinite loop over this until you wind up in that state of “analysis paralysis,” where you recognize that there’s just no single best property to choose as the partition key. To break free, you need to think outside the box. Or, let’s say, think outside the container. Because the solution here is to simply create another container that’s a complete “replica” of the first. This second container holds the exact same set of documents as the first but defines a different partition key.

I placed quotes around the word “replica” because this second container is not technically a replica in the true Cosmos DB sense of the word (where, internally, Cosmos DB automatically maintains replicas of the physical partitions in every container). Rather, it’s a manual replica that you maintain yourself. Thus, it’s your job to keep it in sync with changes when they happen in the first container, which is partitioned by a property that’s optimized for writes. As those writes occur in real time, you need to respond by updating the second collection, which is partitioned by a property that’s optimized for queries.

Enter Change Feed

Fortunately, change feed comes to the rescue here. Cosmos DB maintains a persistent record of changes for every container that can be consumed using the change feed. This gives you a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. For an introduction to change feed, have a look at my previous blog post Change Feed – Unsung Hero of Azure Cosmos DB.

In this three-part series of blog posts, I’ll dive into different techniques you can use for consuming the change feed to synchronize containers:

Part 1 – Querying the change feed directly (this post)
Part 2 – Using the Change Feed Processor (CFP) library
Part 3 – Azure Functions with a Cosmos DB change feed trigger

Let’s get started. Assume that we’ve done our analysis and established that city is the ideal partition key for writes, as well as roughly half of the most common queries our users will be running. But we’ve also determined that state is the ideal partition key for the other (roughly half) commonly executed queries. This means we’ll want one container partitioned by city, and another partitioned by state. And we’ll want to consume the city-partitioned container’s change feed to keep the state-partitioned container in sync with changes as they occur. We’ll then be able to direct our city-based queries to the first container, and our state-based queries to the second container, which then eliminates fan-out queries in both cases.

Setting Up

If you’d like to follow along, you’ll need to be sure your environment is set up properly. First, of course, you’ll need to have a Cosmos DB account. The good news here is that you can get a free 30-day account with the “try cosmos” offering, which doesn’t even require a credit card or Azure subscription (just a free Microsoft account). Even better, there’s no limit to the number of times you can start a new 30-day trial. Create your free account at http://azure.microsoft.com/try/cosmosdb.

You’ll need your account’s endpoint URI and master key to connect to Cosmos DB from C#. To obtain them, head over to your Cosmos DB account in the Azure portal, open the Keys blade, and keep it open so that you can handily copy/paste them into the project.

You’ll also need Visual Studio. I’ll be using Visual Studio 2019, but the latest version of Visual Studio 2017 is fine as well. You can download the free community edition at https://visualstudio.microsoft.com/downloads.

Querying the Change Feed Directly

We’ll begin with the raw approach, which is to query the change feed directly using the SDK. The reality is that you’ll almost never want to go this route, except for the simplest small-scale scenarios. Still, it’s worth taking some time to examine this approach first, as I think you’ll benefit from learning how the change feed operates at a low level, and it will enhance your appreciation of the Change Feed Processor (CFP) library which I’ll cover in the next blog post of this series.

Fire up Visual Studio, create a new .NET Core console application, name the project ChangeFeedDirect, and name the solution ChangeFeedDemos (we’ll be adding more projects to this solution in parts 2 and 3 of this blog series). Next, add the SDK to the ChangeFeedDirect project from the NuGet package Microsoft.Azure.DocumentDB.Core:

We’ll write some basic code to create a database with the two containers, with additional methods to create, update, and delete documents in the first container (partitioned by city). Then we’ll write our “sync” method that directly queries the change feed on the first container, in order to update the second container (partitioned by state) and reflect all the changes made.

Note: Our code (and the SDK) refers to containers as collections.

We’ll write all our code inside the Program.cs file. First, update the using statements at the very top of the file to get the right namespaces imported:

using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

Next, at the very top of the Program class, set two private string constants for your Cosmos DB account’s endpoint and master key. You can simply copy them from the Keys blade in the Azure portal, and paste them right into the code:

private const string CosmosEndpoint = "https://<account-name>.documents.azure.com:443/";
private const string CosmosMasterKey = "<account-key>";

Now edit the Main method with one line to call the Run method:

static void Main(string[] args)
{
  Task.Run(Run).Wait();
}

This one-liner invokes the Run method and waits for it to complete. Add the Run method next, which displays a menu and calls the various methods defined for each menu option:

private static async Task Run()
{
  while (true)
  {
    Console.WriteLine("Menu");
    Console.WriteLine();
    Console.WriteLine("DB - Create database");
    Console.WriteLine("CD - Create documents");
    Console.WriteLine("UD - Update document");
    Console.WriteLine("DD - Delete document");
    Console.WriteLine("SC - Sync collections");
    Console.WriteLine("Q - Quit");
    Console.WriteLine();
    Console.Write("Selection: ");

    var input = Console.ReadLine().ToUpper().Trim();
    if (input == "Q") break;
    else if (input == "DB") await CreateDatabase();
    else if (input == "CD") await CreateDocuments();
    else if (input == "UD") await UpdateDocument();
    else if (input == "DD") await DeleteDocument();
    else if (input == "SC") await SyncCollections();
    else Console.WriteLine("\nInvalid selection; try again\n");
  }
}

We’ll add the menu option methods one at a time, starting with CreateDatabase:

private static async Task CreateDatabase()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Create the database

    var dbUri = UriFactory.CreateDatabaseUri("multipk");

    try { await client.DeleteDatabaseAsync(dbUri); } catch { }  // ignore the error if the database doesn't exist

    await client.CreateDatabaseAsync(new Database { Id = "multipk" });

    // Create the two collections

    var partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/city");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byCity",
      PartitionKey = partitionKeyDefinition,
      DefaultTimeToLive = -1
    }, new RequestOptions { OfferThroughput = 400 });

    partitionKeyDefinition = new PartitionKeyDefinition();
    partitionKeyDefinition.Paths.Add("/state");
    await client.CreateDocumentCollectionAsync(dbUri, new DocumentCollection
    {
      Id = "byState",
      PartitionKey = partitionKeyDefinition
    }, new RequestOptions { OfferThroughput = 400 });

    // Load the sync document into the first collection

    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");
    var syncDocDef = new
    {
      city = "sync",
      state = "sync",
      lastSync = default(DateTime?)
    };
    await client.CreateDocumentAsync(collUri, syncDocDef);
  }

  Console.WriteLine("Created database for change feed demo");
}

Most of this code is intuitive, even if you’ve never done any Cosmos DB programming before. We first use our endpoint and master key to create a DocumentClient, and then we use the client to create a database named multipk with the two containers in it.

This works by first calling DeleteDatabaseAsync wrapped in a try block with an empty catch block. This effectively results in “delete if exists” behavior to ensure that the multipk database does not exist when we call CreateDatabaseAsync to create it.

Next, we call CreateDocumentCollectionAsync twice to create the two containers (again, a collection is a container). We name the first container byCity and assign it a partition key of /city, and we name the second container byState and assign /state as the partition key. Both containers reserve 400 request units (RUs) per second, which is the lowest throughput you can provision.

Notice the DefaultTimeToLive = -1 option applied to the first container. Setting the container-level default to -1 enables TTL (time to live) without expiring any documents on its own; a document will only expire if it carries its own ttl property. We need this because, at the time of this writing, the change feed does not support deletes. That is, if you delete a document from a container, the deletion does not get picked up by the change feed. This may be supported in the future, but for now, the TTL feature provides a very simple way to cope with deletions. Rather than physically deleting documents from the first container, we’ll just update them with a ttl of 60 seconds. That gives us 60 seconds to detect the update in the change feed, so that we can physically delete the corresponding document in the second container. Then, 60 seconds later, Cosmos DB automatically deletes the document from the first container by virtue of its ttl setting. You’ll see all this work in a moment when we run the code.

The other point to call out is the creation of our sync document, which is a special metadata document that won’t get copied over to the second container. Instead, we’ll use it to persist a timestamp to keep track of the last time we synchronized the containers. This way, each time we sync, we can request the correct point in time from which to consume changes that have occurred since the previous sync. The document is initialized with a lastSync value of null so that our first sync will consume the change feed from the beginning of time. Then lastSync is updated so that the next sync picks up precisely where the first one left off.

Now let’s implement CreateDocuments. This method simply populates three documents in the first container:

private static async Task CreateDocuments()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    var dataDocDef = new
    {
      city = "Brooklyn",
      state = "NY",
      slogan = "Kings County"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Los Angeles",
      state = "CA",
      slogan = "Golden"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);

    dataDocDef = new
    {
      city = "Orlando",
      state = "FL",
      slogan = "Sunshine"
    };
    await client.CreateDocumentAsync(collUri, dataDocDef);
  }

  Console.WriteLine("Created 3 documents in city collection");
}

Notice that all three documents have city and state properties, where the city property is the partition key for the container that we’re creating these documents in. The state property is the partition key for the second container, where our sync method will create copies of these documents as it picks them up from the change feed. The slogan property is just an ordinary document property. And although we aren’t explicitly supplying an id property, the SDK will automatically generate one for each document with a GUID as the id value.

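Incidentally, if you’d rather control the ids yourself (say, to make specific documents easier to reference later), you can simply include an id property in the document definition. Here’s a quick sketch, where the id value is just a made-up example:

var dataDocDef = new
{
  id = "nyc-brooklyn",    // explicitly supplied id; otherwise the SDK generates a GUID
  city = "Brooklyn",
  state = "NY",
  slogan = "Kings County"
};
await client.CreateDocumentAsync(collUri, dataDocDef);

Just remember that the id must be unique within its partition key value, or Cosmos DB will reject the document as a conflict.
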
We’ll also have an UpdateDocument method to perform a change on one of the documents:

private static async Task UpdateDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Update a document
    var brooklynDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Brooklyn'")
      .AsEnumerable()
      .FirstOrDefault();

    brooklynDoc.slogan = "Fuhgettaboutit! " + Guid.NewGuid().ToString();
    await client.ReplaceDocumentAsync(brooklynDoc._self, brooklynDoc);
  }

  Console.WriteLine("Updated Brooklyn document in city collection");
}

This code retrieves the Brooklyn document, updates the slogan property, and calls ReplaceDocumentAsync to persist the change back to the container.

Next comes the DeleteDocument method:

private static async Task DeleteDocument()
{
  var collUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    // Delete a document (set time-to-live at 60 seconds)
    var orlandoDoc = client
      .CreateDocumentQuery(collUri, "SELECT * FROM c WHERE c.city = 'Orlando'")
      .AsEnumerable()
      .FirstOrDefault();

    orlandoDoc.ttl = 60;
    await client.ReplaceDocumentAsync(orlandoDoc._self, orlandoDoc);
  }

  Console.WriteLine("Deleted Orlando document in city collection");
}

Remember that (currently) the change feed doesn’t capture deleted documents, so we’re using the TTL (time to live) technique to keep our deletions in sync. Rather than calling DeleteDocumentAsync to physically delete the Orlando document, we’re simply updating it with a ttl property set to 60 and saving it back to the container with ReplaceDocumentAsync. To the change feed, this is just another update, so our sync method will pick it up normally as you’ll see in a moment. Meanwhile, Cosmos DB will physically delete the Orlando document from the first container in 60 seconds, giving our sync method up to one minute to pick it up from the change feed and delete it from the second container.

And finally, the sync method, which is what this whole discussion is all about. Here’s the code for SyncCollections:

private static async Task SyncCollections()
{
  using (var client = new DocumentClient(new Uri(CosmosEndpoint), CosmosMasterKey))
  {
    var cityCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byCity");

    // Obtain last sync time

    var syncDoc = client
      .CreateDocumentQuery(cityCollUri, "SELECT * FROM c WHERE c.city = 'sync'")
      .AsEnumerable()
      .FirstOrDefault();

    var lastSync = (DateTime?)syncDoc.lastSync;

    // Step 1: Gather all the partition key ranges (physical partitions)

    var continuationToken = default(string);
    var partitionKeyRanges = new List<PartitionKeyRange>();
    var loop = true;

    while (loop)
    {
      var partitionKeyRange = await client
        .ReadPartitionKeyRangeFeedAsync(
          cityCollUri,
          new FeedOptions { RequestContinuation = continuationToken });

      partitionKeyRanges.AddRange(partitionKeyRange);
      continuationToken = partitionKeyRange.ResponseContinuation;
      loop = continuationToken != null;
    }

    // Step 2: Consume the change feed for each partition key range

    // (note: this simple demo doesn't handle continuation tokens, so it won't scale when they're needed)
    foreach (var partitionKeyRange in partitionKeyRanges)
    {
      var options = new ChangeFeedOptions
      {
        PartitionKeyRangeId = partitionKeyRange.Id,
        StartFromBeginning = (lastSync == null),
        StartTime = (lastSync == null ? null : lastSync),
      };

      var query = client.CreateDocumentChangeFeedQuery(cityCollUri, options);

      while (query.HasMoreResults)
      {
        var readChangesResponse = await query.ExecuteNextAsync();
        foreach (var changedDocument in readChangesResponse)
        {
          if (changedDocument.city != "sync")   // skip the sync metadata document
          {
            // no ttl property means this is a create or update; a ttl marks a soft delete
            if (JsonConvert.DeserializeObject(changedDocument.ToString())["ttl"] == null)
            {
              var stateCollUri = UriFactory.CreateDocumentCollectionUri("multipk", "byState");
              await client.UpsertDocumentAsync(stateCollUri, changedDocument);
              Console.WriteLine($"Upserted document id {changedDocument.id} in byState collection");
            }
            else
            {
              var stateDocUri = UriFactory.CreateDocumentUri("multipk", "byState", changedDocument.id);
              await client.DeleteDocumentAsync(stateDocUri, new RequestOptions
              {
                PartitionKey = new PartitionKey(changedDocument.state)
              });
              Console.WriteLine($"Deleted document id {changedDocument.id} in byState collection");
            }
          }
        }
      }
    }

    // Update last sync time
    syncDoc.lastSync = DateTime.UtcNow;
    await client.ReplaceDocumentAsync(syncDoc._self, syncDoc);
  }

}

Let’s break this down. First, we grab the last sync time from the sync document in the first container. Remember, this will be null the very first time we run this method. Then, we’re ready to query the change feed, which is a two-step process.

For step 1, we need to discover all the partition key ranges in the container. A partition key range is essentially a set of partition keys. In our small demo, where we have only one document each across three distinct partition keys (cities), Cosmos DB will host all three of these documents inside a single partition key range.

Although there is conceptually only one change feed per container, there is actually one change feed for each partition key range in the container. So step 1 calls ReadPartitionKeyRangeFeedAsync to discover the partition key ranges, with a loop that utilizes a continuation token from the response so that we retrieve all of the partition key ranges into a list.

Then, in step 2, we iterate the list to consume the change feed on each partition key range. Notice the ChangeFeedOptions object that we set on each iteration, which identifies the partition key range in PartitionKeyRangeId, and then sets either StartFromBeginning or StartTime, depending on whether lastSync is null or not. If it’s null (which will be true only on the very first sync), then StartFromBeginning will be set to true and StartTime will be set to null. Otherwise, StartFromBeginning gets set to false, and StartTime gets set to the timestamp from the last sync.

After preparing the options, we call CreateDocumentChangeFeedQuery that returns an iterator. As long as the iterator’s HasMoreResults property is true, we call ExecuteNextAsync on it to fetch the next set of results from the change feed. And here, ultimately, is where we plug in our sync logic.

Each result is a changed document. We know this will always include the sync document, because we’ll be updating it after every sync. This is metadata that we don’t need copied over to the second container each time, so we filter out the sync document by testing the city property for “sync.”

For all other changed documents, it now becomes a matter of performing the appropriate create, update, or delete operation on the second container. First, we check to see if there is a ttl property on the document. Remember that this is our indication of whether this is a delete or not. If the ttl property isn’t present, then it’s either a create or an update. In either case, we handle the change by calling UpsertDocumentAsync on the second container (upsert means “update or insert”).

Otherwise, if we detect the ttl property, then we call DeleteDocumentAsync to delete the document from the second container, knowing that Cosmos DB will delete its counterpart from the first container when the ttl expires.

Let’s test it out. Start the console app and run the DB (create database) and CD (create documents) commands. Then navigate to the Data Explorer in the Azure portal to verify that the database exists with the two containers, and that the byCity container has three documents in it, plus the sync document with a lastSync value of null, indicating that no sync has yet occurred.

The byState container should be empty at this point, because we haven’t run our first sync yet.

Back in the console app, run the SC command to sync the containers. This copies all three documents from the first container’s change feed over to the second container, skipping the sync document which we excluded in our code:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection
Upserted document id fb8d41ae-3aae-4892-bfe9-8c34bc8138d2 in byState collection
Upserted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

Returning to the portal, refresh the data explorer to confirm that the second container now has the same three documents as the first, although here they are partitioned by state rather than city.

Both containers are now in sync. Refresh the first container view, and you’ll see that the lastSync property has been changed from null to a timestamp from when the previous sync ran.

Run the UD command in the console app to update the first container with a change to the slogan property of the Brooklyn, NY document. Now run SC to sync the containers again:

Upserted document id 01501a99-e8df-4c3b-9892-ed2eadb81180 in byState collection

Refresh the second container view, and you’ll see that the slogan property has now been updated there as well.

Finally, run the DD command in the console app to delete the Orlando, FL document from the first container. Remember that this doesn’t actually delete the document, but rather updates it with a ttl property set to 60 seconds. Then run SC to sync the containers again:

Deleted document id 5f5600f3-9f34-4a4d-bdb4-28061a5ab35a in byState collection

You can now confirm that the Orlando, FL document is deleted from the second container, and within a minute (upon ttl expiration), you’ll see that it gets deleted from the first container as well.

However, don’t wait longer than a minute after setting the ttl before running the sync. Once the ttl expires, Cosmos DB deletes the document from the first container, it disappears from the change feed, and you lose your chance to delete it from the second container.

What’s Next?

It didn’t take that much effort to consume the change feed, but that’s only because we have a tiny container with just a handful of changes, and we’re manually invoking each sync. To consume the change feed at scale, much more work needs to be done. For example, the change feed on each partition key range of the container can be consumed concurrently, so we could add multithreading logic to parallelize those queries. Long change feeds can also be consumed in chunks, using continuation tokens that we could persist as a “lease,” so that new clients can resume consumption where previous clients left off. We also want the sync automated, so that we don’t need to poll manually.
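
To make that a bit more concrete, here’s a rough sketch (not production code) of what parallelizing the per-range change feed queries could look like. It assumes the same client, cityCollUri, partitionKeyRanges, and lastSync variables we set up in SyncCollections, and it still leaves the lease bookkeeping and error handling to you:

// Rough sketch: consume each partition key range's change feed concurrently.
// Assumes client, cityCollUri, partitionKeyRanges, and lastSync exist as in SyncCollections.
var tasks = partitionKeyRanges.Select(async pkRange =>
{
  var options = new ChangeFeedOptions
  {
    PartitionKeyRangeId = pkRange.Id,
    StartFromBeginning = (lastSync == null),
    StartTime = lastSync
  };

  var query = client.CreateDocumentChangeFeedQuery(cityCollUri, options);
  while (query.HasMoreResults)
  {
    var response = await query.ExecuteNextAsync();

    // ... process the changed documents for this range, as in SyncCollections ...

    // A real implementation would also persist response.ResponseContinuation for this
    // range (the "lease"), so that another client could resume where this one left off.
  }
});

await Task.WhenAll(tasks);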

Fortunately, the Change Feed Processor (CFP) library handles all these details for you. It was certainly beneficial to start by querying the change feed directly, since exploring that option first is a great way to learn how change feed works internally. However, unless you have very custom requirements, the CFP library is the way to go.

So tune in to part 2, where we’ll use the CFP library to implement our multiple partition key solution much more easily and robustly.