In a typical CQRS/ES system events processed by projections have At-least-once delivery guarantee. Because of that it is usually necessary to implement deduplication in order to achieve (perceived) idempotency.
Although some of the message queues / streaming platforms claim that they can deliver exactly once semantics the answer to the deduplication problem is nuanced and we can't get away without solving it.
Naturally idempotent projections
Before we dive into specific deduplication strategies let's consider a simplified example of current account domain:
Our domain is made of two events: AccountCredited
and AccountDebited
. Both of them are then used to create/update data in two repositories: AccountBalanceRepository
and AccountTransactionsRepository
. These repositories are then used to serve relevant read models.
A naive implementation of handlers for aforementioned projections:
As we can see in the AccountBalanceProjectionHandler
first we need to read current balance and then update it accordingly. Because there is no deduplication it's possible that in case of redelivered event we will add or subtract the same amount more then once, which will result in incorrect balance.
On the other hand AccountTransactionsProjectionHandler
is only saving new transactions into the database backend and in case that the same event is delivered twice it will overwrite existing data with exactly the same values. This means that the projection is idempotent by design and does not require any extra logic / code to handle deduplication of events.
Deduplication Strategies
Event id based
Deduplication table per projection
Most naive fix to our AccountBalanceProjectionHandler
could store handled event ids in a separate table and transactionally read current balance and update it only if event id is not present in the table. A simple schema that would support it:
CREATE TABLE handled_events (projection_id TEXT, event_id TEXT, UNIQUE(projection_id(36), event_id(36)));
Deduplication table per stream per projection
Alternatively the same could be done with a table that would store event ids for each stream (not for whole projection). That would make queries and indexes slightly more efficient:
CREATE TABLE handled_events (projection_id TEXT, stream_id TEXT, event_id TEXT, UNIQUE(projection_id(36), stream_id(36), event_id(36)));
Deduplication column per read model row
Last evolution of this approach is to store event ids as a part of the read model itself (part of the account_balance row) and fetch all event ids as during the current balance read. Then deduplication can be done in memory and if event id is already present then handling will be skipped:
CREATE TABLE account_balance (stream_id TEXT, balance DOUBLE, handled_event_ids TEXT, UNIQUE(stream_id(36)));
All of these approaches will work, but have one major problem, which is that the size of the deduplication table / column will grow linearly with the number of events handled. Even worse the last option will result in more and more of data to be transferred back and forth just to achieve deduplication of messages.
There are some ways of optimising the linear growth of the event ids storage, such as only storing latest N or only storing events younger than Y. This will surely work and help to mitigate the problem, but is a workaround / accidental complexity caused by suboptimal design. As a rule of thumb we should try to avoid this strategy as in most cases we can do much better and this is what the next two approaches describe.
Global sequence number based
Second approach relies on the fact that some of the event stores provide global ordering for all events written to it. If your event store does provide you with such sequence number it can be a better alternative to using explicit event ids. Given that we can guarantee ordered processing in the event handler and that sequence number is strictly increasing then we will process the event only if the stored sequence number is smaller than the number received with the event.
Deduplication table per projection
Simplest approach would assume only one deduplication table with following schema:
CREATE TABLE projection_checkpoint (projection_id TEXT, sequence_number BIGINT, UNIQUE(projection_id(36)));
Assuming that you are ok with extra queries that will be made to read and update the state of this table the strategy will work fine for systems with small to medium number of events. For systems with high throughput having only one consumer (which is imposed by the design of this table) will prevent us from parallelising queries to this table. If that's the case we can apply similar optimisation as we did in the previous example - introduce stream id to the mix:
Deduplication table per stream per projection
CREATE TABLE projection_checkpoint (projection_id TEXT, stream_id TEXT, sequence_number BIGINT, UNIQUE(projection_id(36), stream_id(36)));
Although nearly optimal this solution still suffers from the problem that every time we need to update balance table will always have to do two extra queries to read the state of the checkpoint table and then update it after handling. If that's the problem then similarly to the example we had earlier with storing event_ids in the account_balance table we can store there the sequence number.
Deduplication column per read model row
CREATE TABLE account_balance (stream_id TEXT, balance DOUBLE, sequence_number BIGINT, UNIQUE(stream_id(36)));
This way we've ended up with a solution that only adds a little bit of data to the table (and queries) and allows to avoid extra queries required by earlier solutions.
Stream version number based
Solutions described in the previous section rely on the fact that the event store will provide global ordering. In most cases this shouldn't be an issue but sometimes either chosen db backend or scale might force you to give up this constraint.
If the event storage is implemented using stream_version based optimistic lock we can use the stream_version as an alternative to using global sequence number. It will not be possible to implement one global deduplication table per projection, but the other two approaches discussed earlier will still work.
Deduplication table per stream per projection
Single projection table for all projections and stream ids will work as expected assuming you are fine with extra two queries needed on every update:
CREATE TABLE projection_checkpoint (projection_id TEXT, stream_id TEXT, stream_version BIGINT, UNIQUE(projection_id(36), stream_id(36)));
Deduplication column per read model row
Alternatively the stream_version
can be set on the read model table itself which will reduce number of queries:
CREATE TABLE account_balance (stream_id TEXT, balance DOUBLE, strem_version BIGINT, UNIQUE(stream_id(36)));
Summary
Choosing a deduplication strategy might not be something that is discussed often when considering a CQRS/ES implementation, but it's a necessity that will sooner or later surface itself in a correctly designed system.
Final note / warning - the analysis in this blog post is done assuming a single read model row saved per stream and that updates can be done atomically which might not be the case. Before using any of the strategies please analyse the impact of underlying architecture on your choice. One thing that wasn't discussed in this post is the complexity of implementation of each strategy which will vary between bd backends, programming languages, frameworks and teams.
Below table summarises the article in a more concise form and hopefully will help you make the right decision given various factors such as size of storage or number of extra queries per event.