Event Sourcing Projections patterns: Consumer scaling

Event Sourced systems implemented in tandem with Command Query Responsibility Segregation (CQRS) can handle a very high volume of events and still deliver a great experience for users.

Because the Write Stack (command handling) is clearly separated from the Read Stack (query handling), we can prioritise which read models get updated first and which can be updated a little later in case of a spike in traffic.

As an example from the banking domain discussed in the previous post about deduplication, consider two read models:

  • Account Balance read model - accurately presenting the user with their current balance
  • Transaction History read model - displaying the full history of transactions with all details

Let's imagine that the business defined the following SLOs (Service Level Objectives) for them:

  • Account Balance - P99 of updates completed within 100ms of the transaction being handled; the MAX time it can take to update the read model is 1s
  • Transaction History - P99 of transactions visible in the history within 1s; the MAX time is set to 1m

The sliding window used to alert on both measurements is set to 5 minutes.

Having SLOs like this defined by business stakeholders is a good starting point, as they will inform and impact the design and scale of the projection consumers. One way of measuring the time it takes to update the read model is to have a custom metric calculating the difference between the event's occurrence time and the time when the read model update was completed. Because projection consumers usually run independently from command handlers, such a metric will likely not be perfect due to clock drift between the two machines, which is something to keep in mind.
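
As an illustration, here is a minimal sketch of such a lag metric in Kotlin. The event envelope shape and the metrics hook are assumptions for the sake of the example, not something prescribed by the system described here:

    import java.time.Duration
    import java.time.Instant

    // Hypothetical event envelope - the exact shape depends on your event store client.
    data class EventEnvelope(val streamId: String, val sequence: Long, val occurredAt: Instant)

    // Hypothetical metrics hook - replace with whatever metrics library you use.
    fun recordProjectionLag(projection: String, lag: Duration) {
        println("projection_lag_ms{projection=\"$projection\"} ${lag.toMillis()}")
    }

    // Called by the projection consumer right after the read model update is committed.
    fun onReadModelUpdated(projection: String, event: EventEnvelope) {
        // Lag = time between the event occurring (on the command-handling machine)
        // and the read model update completing (here) - so clock drift applies.
        val lag = Duration.between(event.occurredAt, Instant.now())
        recordProjectionLag(projection, lag)
    }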

To begin with, our projection consumers are implemented in a very straightforward way: they run as two independent processes, consuming messages from the event stream one by one and acking each event after it has been successfully processed (see the sketch after the list below). With this implementation, the average time it takes to process an update is, respectively:

  • Account Balance - 20ms
  • Transaction History - 15ms
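
A minimal sketch of such a one-by-one consumer loop, reusing the hypothetical EventEnvelope from the earlier snippet and assuming equally hypothetical EventStream and ProjectionHandler abstractions (real client APIs will differ):

    // Hypothetical abstractions - no particular broker or client library is implied.
    interface EventStream {
        fun fetchNext(): EventEnvelope?   // next unacknowledged event, or null if none available
        fun ack(event: EventEnvelope)     // mark the event as processed
    }

    interface ProjectionHandler {
        fun handle(event: EventEnvelope)  // apply the event to the read model
    }

    fun runConsumer(stream: EventStream, handler: ProjectionHandler) {
        while (true) {
            val event = stream.fetchNext() ?: continue
            handler.handle(event)   // update the read model
            stream.ack(event)       // ack only after successful processing
        }
    }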

Our system handles on average 20 events per second and, assuming a fairly uniform distribution of events, the values reported for the metrics are well within the tolerance set for them. Occasional spikes to 60 events per second do not set off the alerts, as the reported values are smoothed out by the sliding window of the alert.

As these spikes start to happen more often, the Account Balance alert starts to fire. The consumer is no longer keeping up with the number of messages being sent to it and needs to be optimised or scaled out.

Event batching

Because our consumers fetch and ack messages from the event stream one by one, the simplest thing we can do to save some IO time is to fetch a batch of messages at once (i.e. all messages since the last fetch, up to some limit), process them, and then ack them all at the end of processing. This saves a roughly constant amount of time per message and also eases the pressure on the network and the broker.
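
A sketch of the batched variant, under the same assumptions as the earlier snippets, with the stream interface extended to fetch and ack in batches:

    // Hypothetical batch-capable stream API - real clients differ in the details.
    interface BatchingEventStream {
        fun fetchBatch(maxSize: Int): List<EventEnvelope>  // all events since the last ack, up to a limit
        fun ackUpTo(last: EventEnvelope)                   // ack everything up to and including this event
    }

    fun runBatchingConsumer(stream: BatchingEventStream, handler: ProjectionHandler, maxBatchSize: Int = 500) {
        while (true) {
            val batch = stream.fetchBatch(maxBatchSize)
            if (batch.isEmpty()) continue
            batch.forEach(handler::handle)  // process the whole batch...
            stream.ackUpTo(batch.last())    // ...and ack once, saving a round trip per message
            // If we crash before the ack, the whole batch is redelivered and must be deduplicated.
        }
    }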

The downside of this approach is that if the consumer crashes before acking the batch, it will have to reprocess and deduplicate events that were already processed. This will cause a delay in processing, which might not be welcome, but hopefully it's not something that will happen very often.

Increasing throughput of a single consumer

Depending on the nature of the projection, we might be able to speed up the process further by introducing a degree of parallelism: spawning extra threads within the consumer that processes the events.

In the simplest case we might be able to parallelise event processing without putting any extra logic in place. An example of such a case is AccountTransactionsProjectionHandler. Because it doesn't need to read any state before the insert, it doesn't really matter if two inserts happen at the same time.

Things get more complicated when we need to read the current state of the read model first and execute updates sequentially. Fortunately, this sequential processing usually applies only to a group of messages - in the case of AccountBalanceProjectionHandler the events are always handled in the context of a single account id, which in our case is the same as the stream id.

Let's consider an example of a batch of messages fetched from the event stream - the letter represents the stream id and the number is the sequence number of the event within that stream:

A1 - A2 - B1 - A3 - B2 - C1 - A4 - C2 - C3 - C4 - C5 - C6

If we then group the messages by stream id, we end up with 3 sub-streams:

A1 - A2 - A3 - A4
B1 - B2
C1 - C2 - C3 - C4 - C5 - C6

Now we can parallelise processing across the groups, while within each group the messages are processed sequentially, which guarantees ordering (assuming events were not shuffled during grouping).
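
A sketch of this grouping step, building on the previous hypothetical types; here a fixed thread pool provides the parallelism and each group is submitted as one sequential task (the handler is assumed to be safe to call concurrently for different streams):

    import java.util.concurrent.Executors

    fun processBatchGroupedByStream(batch: List<EventEnvelope>, handler: ProjectionHandler, parallelism: Int = 4) {
        // groupBy preserves the original order of events within each stream.
        val groups = batch.groupBy { it.streamId }.values

        val pool = Executors.newFixedThreadPool(parallelism)
        try {
            val futures = groups.map { group ->
                pool.submit {
                    // Events of a single stream are handled sequentially, keeping the ordering guarantee.
                    group.forEach(handler::handle)
                }
            }
            futures.forEach { it.get() }  // wait for every group before acking the batch
        } finally {
            pool.shutdown()
        }
    }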

Because a lot of the projection handling time is usually spent on IO (communicating with database backends and the like), if we can make sure that the IO is multithreaded and non-blocking we should end up with fairly reasonable throughput improvements.

That doesn't mean our job is done here, as we can still do better. Looking at our previous example of 3 grouped streams, you probably noticed that the largest group is last on the list of groups to be processed. In such a case, if the degree of processing parallelism is smaller than the number of groups, we will end up with suboptimal results.

If the degree of parallelism is set to 2, messages will be consumed in the following order:

Thread 1: A1 - A2 - A3 - A4
Thread 2: B1 - B2 - C1 - C2 - C3 - C4 - C5 - C6

Assuming that each message takes 20ms to process, this reduces the processing time from 240ms (12 messages) to 160ms (the 8 messages handled by Thread 2).

If instead we sort the groups by size:

C1 - C2 - C3 - C4 - C5 - C6
A1 - A2 - A3 - A4
B1 - B2

And then handle them in parallel:

Thread 1: C1 - C2 - C3 - C4 - C5 - C6
Thread 2: A1 - A2 - A3 - A4 - B1 - B2

The final processing time is reduced to 120ms (the 6 messages handled by Thread 1).
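
The previous grouping sketch only needs a small change to apply this: sort the groups by size in descending order before submitting them, so the largest group is scheduled first:

    fun groupAndSortBySize(batch: List<EventEnvelope>): List<List<EventEnvelope>> =
        batch.groupBy { it.streamId }       // per-stream groups, internal order preserved
            .values
            .sortedByDescending { it.size } // largest group first, so it starts as early as possible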

To further improve the throughput we can scale the consumer vertically and add extra CPU, threads or memory. Based on my previous experience with this approach, a single consumer should be able to handle thousands of events per second or more.

As you can see from the above, this strategy will shine in cases where the batch contains events spread across a large number of streams, with a small number of events per stream. On the other hand, if the handler receives a large batch of messages from a single stream (which might be the case during a replay), the parallelism will not improve throughput, as all the events will be handled sequentially.

Increasing number of consumers

In a very large system, or during replays of a large number of events, the previous strategy might not be enough to handle all events in a timely manner. In such cases we can tackle the problem from a different angle and scale the consumers horizontally.

This can be achieved by partitioning messages into 2 or more partitions using the stream id (or any other partition key that suits your use case). Some brokers support partitioning natively, others might need more setup, but the critical requirement is that all events with the same stream id are always assigned to the same partition.
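
If the chosen broker leaves partition assignment to the producer, a minimal sketch of such a key-based assignment could look like this (many brokers, for example Kafka when given a message key, do the equivalent for you):

    // All events of a given stream hash to the same partition,
    // so a single consumer always sees that stream's events in order.
    fun partitionFor(streamId: String, partitionCount: Int): Int =
        Math.floorMod(streamId.hashCode(), partitionCount)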

With partitioning in place, each consumer can independently fetch messages from the partitions assigned to it. Theoretically we can increase throughput linearly as we increase the number of consumers, or even autoscale them using metrics. The reason the linear increase is only theoretical is that we usually can't easily autoscale our database backends, so at some point we will start seeing diminishing returns as the database becomes the bottleneck.

We should also keep that in mind when deciding which consumers should be scaled: if they use the same infrastructure, scaling one can negatively impact the other. For that reason it's beneficial to understand which projections can lag behind during spikes.

Another problem that partitioning introduces is consumer rebalancing (when a consumer joins or leaves), as depending on the chosen technology the rebalance might pause processing. If processing is not paused until the group stabilises, we risk violating the Single Writer Principle and sending the same message to multiple consumers at the same time.

Summary

Thanks to the separation of the Read and Write Stacks, CQRS/ES systems allow us to design and implement highly scalable solutions. The throughput of consumers can be tuned in multiple ways depending on the use case and the available technologies. It is important to stress that any optimisation decisions should be made based on the predefined metrics, which helps avoid putting too much effort into premature optimisation of the projection handling.