Change Feed – Unsung Hero of Azure Cosmos DB

Introduction

Azure Cosmos DB is rapidly growing in popularity, and for good reason. Microsoft’s globally distributed, multi-model database service has massively scalable storage and throughput, provides a sliding scale for consistency, is fully and automatically indexed, and it exposes multiple APIs. Throw in a server-side programming model for ACID transactions with stored procedures, triggers, and user-defined functions, along with 99.999% SLAs on availability, throughput, latency, and consistency, and it’s easy to see why Cosmos DB is fast winning the hearts of developers and solution architects alike.

Yet still today, one of the most overlooked capabilities in Cosmos DB is its change feed. This little gem sits quietly behind every container in your database, watches for changes, and maintains a persistent record of them in the order they occur. This provides you with a reliable mechanism to consume a continuous and incremental feed of changes, as documents are actively written or modified in any container.

There are numerous use cases for this, and I’ll call out a few of the most common ones in a moment. But all of them share a need to respond to changes made to a Cosmos DB container. And the first thought that comes to the mind of a relational database developer is to use a trigger for this. Cosmos DB supports triggers as part of its server-side programming model, so it might seem natural to reach for that feature when you need to consume changes in real time.

Unfortunately, though, triggers in Cosmos DB do not fire automatically as they do in the relational world. They need to be explicitly referenced with each write operation in order to run, so they cannot be relied upon for capturing all changes made to a container. Furthermore, triggers are JavaScript-only, and they run in a bounded execution environment within Cosmos DB that is scoped to a single partition key. These characteristics further limit what triggers can practically accomplish in response to a change.

But with change feed, you’ve got a reliable mechanism for retrieving changes made to any container, all the way back to the beginning of time. You can write code (in your preferred language) that consumes the change feed to process it as needed and deploy that code to run on Azure. This paves an easy path for you to build many different solutions for many different scenarios.

Scenarios for Change Feed

Some of the more common use cases for change feed include:

  • Replicating containers for multiple partition keys
  • Denormalizing a document data model across containers
  • Triggering API calls for an event-driven architecture
  • Real-time stream processing and materialized view patterns
  • Moving or archiving data to secondary data stores

Each of these deserves its own focused blog post (and will hopefully get one). For the broader context of this overview post, however, I’ll discuss each of them at a high level.

Replicating containers for multiple partition keys

One of the most (perhaps the most) important things you need to do when creating a container is to decide on an appropriate partition key – a single property in your data that the container will be partitioned by.

Now sometimes this is easy, and sometimes it is not. In order to settle on the correct choice, you need a clear understanding of how your data is used (written to and queried), and how horizontal partitioning works. You can read all about this in my previous blog post, Horizontal Partitioning in Azure Cosmos DB.

But what do you do when you can’t decide? What if there are two properties that make good choices, one for write performance, and another that’s better for query performance? This can lead to “analysis paralysis,” a scary condition that is fortunately easy to remedy using the change feed.

All you do is create two containers, each partitioned by a different partition key. The first container uses a partition key that’s optimized for writes (it may also suit certain types of queries), while the second one uses a partition key optimized for most typical queries. Simply use the change feed to monitor changes made to the first container as the writes occur, and replicate them out to the second container.

Your application then writes to the first container and queries from the second container, simple as that! I’ll show you a detailed walkthrough of exactly how to implement this using C# and the Change Feed Processor Library with Azure Functions in my next post.
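To give you a rough idea of the shape that solution takes, here’s a minimal sketch of an Azure Function that replicates changed documents from the write-optimized container into the query-optimized container, using the Cosmos DB trigger and output binding from the Microsoft.Azure.WebJobs.Extensions.CosmosDB package (both are explained later in this post). The database, container, and connection setting names are placeholders, not anything you need to match:

    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.WebJobs;

    public static class ReplicateChanges
    {
        // Watches the write-optimized container and copies every changed
        // document into the query-optimized container.
        [FunctionName("ReplicateChanges")]
        public static async Task Run(
            [CosmosDBTrigger(
                databaseName: "mydb",                          // placeholder names
                collectionName: "ordersByCustomer",            // write-optimized container
                ConnectionStringSetting = "CosmosDbConnection",
                LeaseCollectionName = "leases",
                CreateLeaseCollectionIfNotExists = true)]
            IReadOnlyList<Document> changes,
            [CosmosDB(
                databaseName: "mydb",
                collectionName: "ordersByProduct",             // query-optimized container
                ConnectionStringSetting = "CosmosDbConnection")]
            IAsyncCollector<Document> replica)
        {
            foreach (Document doc in changes)
            {
                await replica.AddAsync(doc);  // the output binding upserts the document
            }
        }
    }

The output binding performs an upsert for each document it receives, so reprocessing a batch of changes simply overwrites the copies in the second container rather than failing.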

Denormalizing a document data model across containers

Developers with a background in relational database design often struggle initially with the denormalized approach to data modeling in the NoSQL world of JSON documents. I personally empathize; from my own experience, I know that it can be difficult at first to embrace concepts that run contrary to deeply engrained practices that span decades of experience in the field.

Data duplication is a case in point: it’s considered a big no-no in the normalized world of relational databases with operational workloads. But with NoSQL, we often deliberately duplicate data in order to avoid expensive additional lookups. There is no concept of a join across documents or containers in Cosmos DB, and we can avoid having to perform our own “manual” joins if we simply duplicate the same information across documents in different containers.

This is a somewhat finer-grained version of the previous scenario, which replicates entire documents between two containers. In this case, we have different documents in each container, with data fragments from changed documents in one container being replicated into other (related) documents in another container.

But how do you ensure that the duplicated data remains in sync as changes occur in the source container? Why, change feed of course! Just monitor the source container and update the target container. I’ll show you exactly how to do this in a future post.
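As a purely hypothetical illustration of that update step (the database, container, and property names below are invented for the example), a change feed consumer could take each changed customer document and push its new name into the related order documents that duplicate it:

    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.Client;
    using Microsoft.Azure.Documents.Linq;

    public static class CustomerNameSync
    {
        // Hypothetical helper: when a customer document changes, copy its new
        // name into every related order document in a second container.
        public static async Task SyncCustomerNameAsync(DocumentClient client, Document changedCustomer)
        {
            string customerId = changedCustomer.GetPropertyValue<string>("id");
            string newName = changedCustomer.GetPropertyValue<string>("name");

            var ordersUri = UriFactory.CreateDocumentCollectionUri("mydb", "orders");

            var query = client.CreateDocumentQuery<Document>(
                    ordersUri,
                    new SqlQuerySpec(
                        "SELECT * FROM c WHERE c.customerId = @id",
                        new SqlParameterCollection { new SqlParameter("@id", customerId) }),
                    new FeedOptions { EnableCrossPartitionQuery = true })
                .AsDocumentQuery();

            while (query.HasMoreResults)
            {
                foreach (Document order in await query.ExecuteNextAsync<Document>())
                {
                    // Overwrite the duplicated fragment and save the document.
                    // (On a partitioned container you may also need to supply the
                    // partition key via RequestOptions.)
                    order.SetPropertyValue("customerName", newName);
                    await client.ReplaceDocumentAsync(order.SelfLink, order);
                }
            }
        }
    }

In a real solution you’d also think about batching, retries, and conflicts, but the essential pattern is a query-then-replace driven by the change feed.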

Triggering API calls for an event-driven architecture

In this scenario, you source events to a set of microservices, each with a single responsibility. Consider, for instance, an ecommerce website with a large-scale order processing pipeline. The pipeline is broken up into a set of smaller microservices, each of which can be scaled out independently. Each microservice is responsible for a single task in the pipeline, such as calculating tax on each order, generating tax audit records, processing each order payment, sending orders off to a fulfillment center, and generating shipping notifications.

Thus, you potentially have N microservices communicating with up to N-1 other microservices, which adds significant complexity to the larger architecture. The design can be greatly simplified if, instead, all these microservices communicate through a bus; that is, a persistent event store. And Cosmos DB serves as an excellent persistent event store, because the change feed makes it easy to broker and source these events to each microservice. Furthermore, because the events themselves are persisted, the order processing pipeline itself is very robust and incredibly resilient to failures. You can also query and navigate the individual events, so that this data can be surfaced out through a customer care API.

Check out this link for a real-world case study of how Jet.com implemented an event-sourced architecture using the Cosmos DB change feed with hundreds of microservices: https://customers.microsoft.com/en-us/story/jet-com-powers-innovative-e-commerce-engine-on-azure-in-less-than-12-months. There is also a video on this from Microsoft Build 2018 here: https://channel9.msdn.com/Events/Build/2018/BRK3602

Real-time stream processing and materialized view patterns

The change feed can also be used for performing real time stream processing and analytics. In the order processing pipeline scenario, for example, this would enable you to take all the events and materialize a single view for tracking the order status. You could then easily and efficiently present the order status through a user-facing API.

Other examples of so-called “lambda architectures” include performing real-time analytics on IoT telemetry or building a scoring leaderboard for a massively multiplayer online video game.

Moving or archiving data to secondary data stores

Another common scenario for using the change feed involves replicating data from Cosmos DB as your primary (hot) store to some other secondary (cold) data store. Cosmos DB is a superb hot store because it can sustain heavy write ingestion, and then immediately serve the ingested records back out to a user-facing API.

Over time as the volume of data mounts, however, you may want to offload older data to cold storage for archival. Once again, change feed is a wonderful mechanism to implement a replication strategy that does just that.

Consuming the Change Feed

So how do you actually work with the change feed? There are several different ways, and I’ll conclude this blog post by briefly explaining three of them. (Don’t worry, I’ll drill deeper into all three next post!)

Direct Access

First, you can query the change feed directly using the SDK. This raw approach works but is the hardest to implement at large scale. Essentially, you first need to discover all the container’s partitions, and then you query each of them for their changes. You also need to persist state metadata; for example, a timestamp for when the change feed was last queried, and a continuation token for each partition. Plus, you’ll want to optimize performance by spawning multiple tasks across different partitions so that they get processed in parallel.
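To make that concrete, here’s a rough sketch of the raw pattern against the v2 .NET SDK (the DocumentClient API). It reads each partition’s change feed from the beginning and leaves checkpointing and parallelism as comments:

    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.Client;

    public static class DirectChangeFeedReader
    {
        // Reads the change feed of every physical partition once, from the
        // beginning. In a real solution you'd persist the continuation token
        // per partition key range and process ranges in parallel.
        public static async Task ReadChangesAsync(DocumentClient client, Uri collectionUri)
        {
            // 1. Discover the container's partitions (partition key ranges).
            FeedResponse<PartitionKeyRange> ranges =
                await client.ReadPartitionKeyRangeFeedAsync(collectionUri);

            foreach (PartitionKeyRange range in ranges)
            {
                // 2. Query the change feed of each partition.
                var query = client.CreateDocumentChangeFeedQuery(collectionUri,
                    new ChangeFeedOptions
                    {
                        PartitionKeyRangeId = range.Id,
                        StartFromBeginning = true
                        // RequestContinuation = <checkpoint saved from a previous run>
                    });

                while (query.HasMoreResults)
                {
                    FeedResponse<Document> changes = await query.ExecuteNextAsync<Document>();
                    foreach (Document doc in changes)
                    {
                        // 3. Process each changed document as needed.
                    }
                    // 4. Persist changes.ResponseContinuation as this range's checkpoint.
                }
            }
        }
    }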

If all this sounds like a lot of work, it is. Which is why you’ll almost certainly want to leverage the Change Feed Processor (CFP) library instead.

Change Feed Processor (CFP) Library

The CFP library provides a high-level abstraction over direct access that greatly simplifies the process of reading the change feed from all the different partitions of a container. This is a separate NuGet package that you pull into your project, and it handles all the aforementioned complexity for you. It will automatically persist state, track all the partitions of the container, and acquire leases so that you can scale out across many consumers.

To make this work, the CFP library persists a set of leases as documents in another dedicated Cosmos DB container, one lease per partition of the monitored container. Then, when you spin up consumers, each one acquires leases on a share of the partitions, and picks up any leases that expire when another consumer fails. That’s how the work stays balanced across consumers.

All you do is write an observer class that implements IChangeFeedObserver. The primary method of this interface that you need to implement is ProcessChangesAsync, which receives the change feed as a list of documents that you can process as needed. There are no partitions to worry about, no timestamps or continuation tokens to persist, and no scale-out concerns to deal with.
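Here’s a minimal sketch of what such an observer might look like with version 2 of the CFP library (the Microsoft.Azure.DocumentDB.ChangeFeedProcessor NuGet package); the exact signatures vary a bit between library versions, so treat this as illustrative:

    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

    public class MyChangeFeedObserver : IChangeFeedObserver
    {
        public Task OpenAsync(IChangeFeedObserverContext context) => Task.CompletedTask;

        public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
            => Task.CompletedTask;

        // Called with each batch of changed documents from the monitored container.
        public Task ProcessChangesAsync(
            IChangeFeedObserverContext context,
            IReadOnlyList<Document> docs,
            CancellationToken cancellationToken)
        {
            foreach (Document doc in docs)
            {
                // process each changed document as needed
            }
            return Task.CompletedTask;
        }
    }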

However, you still need to write your own host, and deploy the DLL with your observer class to an Azure app service. Although the process is straightforward, going with Azure Functions instead provides an even easier deployment model.
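Before moving on, here’s roughly what that self-hosted wiring looks like with the CFP library’s builder API; the account endpoint, key, and database/container names are placeholders:

    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents.ChangeFeedProcessor;

    public static class ChangeFeedHost
    {
        // Wires the observer up to the monitored container and the lease
        // container, then starts listening. Endpoint, key, and names are
        // placeholders.
        public static async Task StartAsync()
        {
            var monitoredCollection = new DocumentCollectionInfo
            {
                Uri = new Uri("https://myaccount.documents.azure.com:443/"),
                MasterKey = "<account-key>",
                DatabaseName = "mydb",
                CollectionName = "source"
            };

            var leaseCollection = new DocumentCollectionInfo
            {
                Uri = monitoredCollection.Uri,
                MasterKey = monitoredCollection.MasterKey,
                DatabaseName = "mydb",
                CollectionName = "leases"
            };

            var processor = await new ChangeFeedProcessorBuilder()
                .WithHostName("myConsoleHost")
                .WithFeedCollection(monitoredCollection)
                .WithLeaseCollection(leaseCollection)
                .WithObserver<MyChangeFeedObserver>()
                .BuildAsync();

            await processor.StartAsync();
            // ... later, await processor.StopAsync() to shut down gracefully.
        }
    }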

Azure Functions

The simplest way to consume the change feed is by using Azure Functions with a Cosmos DB trigger. If you’re not already familiar with Azure Functions, they let you write individual methods (functions), which you deploy for execution in a serverless environment hosted on Azure. The term “serverless” here means that you don’t have to write a host or deploy and manage an Azure app service just to run your code.

Azure Functions are invoked by any one of a number of triggers. Yes, Azure Functions also uses the term trigger, but unlike triggers in Cosmos DB, an Azure Functions trigger always fires when its conditions are met. There are several different triggers available, including the one we care about here: the Cosmos DB trigger. This trigger binds to configuration that points to the container you want to monitor for changes, and to the lease collection that the CFP library manages under the covers.

From there, the process is identical to using the CFP library, only the deployment model is dramatically simpler. The Azure Functions method bound with the Cosmos DB trigger receives a parameter with the same list of documents that a CFP library observer class receives in its ProcessChangesAsync method, and you process the change feed data in just the same way.
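Stripped down to just the trigger binding, such a function reduces to something like this (again, the database, container, and connection setting names are placeholders):

    using System.Collections.Generic;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class ChangeFeedFunction
    {
        [FunctionName("ChangeFeedFunction")]
        public static void Run(
            [CosmosDBTrigger(
                databaseName: "mydb",
                collectionName: "source",
                ConnectionStringSetting = "CosmosDbConnection",
                LeaseCollectionName = "leases",
                CreateLeaseCollectionIfNotExists = true)]
            IReadOnlyList<Document> changes,
            ILogger log)
        {
            // Same batch of changed documents that ProcessChangesAsync receives.
            log.LogInformation($"Received {changes.Count} changed document(s)");
        }
    }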

What’s Next?

Hopefully, this blog post opened your eyes to the change feed in Azure Cosmos DB. This powerful feature creates all sorts of exciting possibilities across a wide range of use cases. In the next installment, I’ll focus on container replication for multiple partition keys, and walk you step by step through the process of building a change feed solution using direct access, CFP library, and Azure Functions – all with working code samples. So, stay tuned!


Understanding Consistency Levels in Azure Cosmos DB

Developers with a background in relational databases are accustomed to achieving data integrity using transactions. Once a writer updates a bank balance and commits the transaction, it’s entirely unacceptable for a reader to ever be served the previous value, and a relational database ensures that this never happens. In the NoSQL world, this is referred to as strong consistency. Achieving strong consistency in a NoSQL database is more challenging, because NoSQL databases by design write to multiple replicas. And in the case of Azure Cosmos DB, these replicas can be geographically spread across multiple Microsoft data centers throughout the world.

First though, let’s understand consistency within the context of a single data center.

In one Azure data center, your data is written out to multiple replicas (at least four). Consistency is all about whether or not you can be sure that the data you’re reading at any point in time is, in fact, the latest version of that data, because any given read can be served by any one of those replicas. And if what you read isn’t the latest version, that’s known as a “dirty read.”

Cosmos DB supports five consistency levels, ranging from strong to eventual. I’ll discuss each of these levels in a moment, but you can already see that there’s a sliding scale here, one that trades consistency for latency and performance.

On the one extreme, strong consistency guarantees you’ll never experience a dirty read, but of course, this guarantee comes with a cost, because Cosmos DB can’t permit reads until all the replicas are updated. This results in higher latency, which degrades performance.

On the other end of the spectrum, there’s eventual consistency, which offers no guarantees whatsoever, and you never know with certainty whether or not you’re reading the latest version of data. This gives Cosmos DB the freedom to serve your reads from any available replica, without having to wait for that replica to receive the latest updates from other writes. In turn, this results in the lowest latency, and delivers the best performance, at the cost of potentially experiencing dirty reads.

Replication within a region

Consistency is always a concern, but practically, it only becomes really important once you start geo-replicating your data. Because the reality is that, despite these varying consistency levels, it’s extremely rare to ever experience a dirty read within a data center, even when using eventual consistency, which is the weakest consistency level. And that’s because the replicas in a data center are located very close to one another, and data moves extremely fast between them… typically within one millisecond.

So in fact, there is little difference in consistency, latency, performance, and availability within a data center, where you’ll almost never encounter a dirty read, but it’s a completely different matter once you start replicating globally.

Global replication

Once you globally distribute your data, it takes hundreds of milliseconds to replicate changes across continents. This greatly increases the chances for dirty reads, which is why consistency becomes a much more pressing concern once you globally distribute your data across multiple data centers.

Five Consistency Levels

Strong

In the case of strong consistency, you can be certain that you’re always reading the latest data. But you pay for that certainty in performance, because when somebody writes to the database, everybody waits for Cosmos DB to acknowledge that the write was successfully saved to all the replicas. Only then can Cosmos DB serve up consistent reads, guaranteed. Cosmos DB actually does support strong consistency across regions, assuming that you can tolerate the high latency that results from replicating globally.

Bounded Staleness

Next there’s bounded staleness, which is kind of one step away from strong. As the name implies, this means that you can tolerate stale data, but only up to a point. So you can actually configure how much out-of-date data you’re willing to allow for stale reads. Think of it as controlling the level of freshness, where you might get dirty reads, but only if the data isn’t too far out of date. Otherwise, for data that’s too stale, Cosmos DB switches to strong consistency. In terms of defining how stale is stale, you can specify the lag in terms of time (no stale data older than X) or update operations (no stale data that’s been updated more than Y times).

Session

And then there’s session consistency, which is actually the default consistency level. With session consistency, you maintain a session for your writer – or writers – by defining a unique session key that all writers include in their requests. Cosmos DB then uses the session key to ensure strong consistency for the writer, meaning that a writer is always guaranteed to read the same data that they wrote, and will never read a stale version from an out-of-date replica – while everyone else may experience dirty reads.

When you create a DocumentClient instance with the .NET SDK, it automatically establishes a session key and includes it with every request. This means that within the lifetime of a DocumentClient instance, you are guaranteed strong consistency for reading any data that you’ve written, when using session consistency. This is a perfect balance for many scenarios, which is why session consistency is the default. Very often, you want to be sure immediately that the data you’ve written has, in fact, been written, but it’s perfectly OK if it takes a little extra time for everyone else to see what you’ve written, once all the replicas have caught up with your write.
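As a small illustrative sketch with the v2 .NET SDK (the endpoint, key, database, and container names are placeholders, and the container is assumed to be partitioned on /id), read-your-own-writes under session consistency looks like this:

    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.Client;

    public static class SessionConsistencyDemo
    {
        public static async Task ReadYourOwnWriteAsync()
        {
            // The client establishes and maintains the session token for you,
            // sending it with every request it issues.
            var client = new DocumentClient(
                new Uri("https://myaccount.documents.azure.com:443/"),   // placeholder endpoint
                "<account-key>",
                new ConnectionPolicy(),
                ConsistencyLevel.Session);

            var collectionUri = UriFactory.CreateDocumentCollectionUri("mydb", "accounts");

            await client.CreateDocumentAsync(collectionUri,
                new { id = "account-1", balance = 100 });

            // Read-your-own-write: this read reflects the document just written,
            // never an older replica's copy.
            var response = await client.ReadDocumentAsync(
                UriFactory.CreateDocumentUri("mydb", "accounts", "account-1"),
                new RequestOptions { PartitionKey = new PartitionKey("account-1") });
        }
    }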

Consistent Prefix

With consistent prefix, dirty reads are always possible. However, you do get some guarantees. First, when you do get stale data from a dirty read, you can at least be sure that the data you get has in fact been updated on all the replicas, even though it’s not the most up-to-date version of that data, which has yet to be replicated. And second, you’ll never experience reads out of order. For example, say the same data gets updated four times, so there are versions A, B, C, and D, where D is the most up-to-date version, and A, B, and C are stale versions. Now assume A and B have both been replicated, but C and D haven’t yet. You can be sure that, until C and D do get replicated, you’ll never read versions A and B out of order. That is, you might first get version A when only A has been replicated, then get version B once version B has been replicated, but you’ll never again get version A. You can only get the stale versions in order, so you’ll continue getting version B until a later version gets replicated.

Eventual

Eventual consistency is the weakest consistency level. It offers no guarantees whatsoever, in exchange for the lowest latency and best performance compared to all the other consistency levels. With eventual, you never wait for any acknowledgement that data has been fully replicated before reading. This means that read requests can be serviced by replicas that have already been updated, or by those that haven’t. So you can get inconsistent results in a seemingly random fashion, until eventually, all the replicas get updated and then all the reads become consistent – until the next write operation, of course. So this really is the polar opposite of strong consistency, because now there’s no guarantee that you’re ever reading the latest data, but there’s also never any waiting, which delivers the fastest performance.

One more thing…

It’s also worth mentioning that when the bounded staleness and session consistency levels don’t apply strong consistency, they fall back to consistent prefix, not all the way down to eventual consistency. So for bounded staleness, which gives you strong consistency when the data gets too stale, you’ll still get consistent prefix guarantees when the data isn’t too stale. Likewise, with session, which gives you strong consistency when reading your own writes, you’ll get consistent prefix when reading the writes of others.