Event Sourcing: Projections

Introduction

Projections are one of the core patterns used in Event Sourcing (ES). In ES, we persist the changes happening in the application as a sequence of events. This sequence (also called a stream) can then be used to reconstruct the current state, so that subsequent requests can be handled.

In theory we could stop here and do everything using only the event stream. Unfortunately, that quickly becomes very inefficient, because reads (queries) usually happen much more often than writes (commands). Take the well-worn banking example: we can query all the transactions that have ever happened to an account and derive the current balance from that history of changes. However, reading hundreds or thousands of events means spending a long time on IO, and then more time calculating the current balance.

If instead we pre-calculate the current balance and store the value somewhere, that query can be answered much more quickly. You can think of it as a form of materialised view or cache. This approach leads us to CQRS (Command Query Responsibility Segregation), a pattern built around the idea that you can use two different models to read (the read model) and write (the write model) information.

Definition of projection

With this context in mind we can define a projection as an answer to a question: what is the current state derived from the event stream?

As Greg Young explains it, a projection is nothing more than a left fold over the sequence of events, which is a functional way of expressing the same definition. In Scala, a left fold is provided by foldLeft, a higher-order function which, in simplified form, works as follows.

Given a traversable of events of type A, foldLeft takes an initial state of type B and a function op that updates the current state B with an event A and then returns a new state B. The op function is called on each element of the traversable from left to right, and the updated state is passed along to the next call.
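For reference, the simplified signature is `def foldLeft[B](z: B)(op: (B, A) => B): B`, where `z` is the initial state. The sketch below is a hand-written equivalent under the hypothetical name `myFoldLeft`. It exists only to make the left-to-right threading of state visible; in practice you would call the built-in `foldLeft`.

```scala
// Hypothetical hand-written left fold, equivalent in behaviour to the built-in foldLeft.
// events: the sequence to fold, from left (oldest) to right (newest)
// state:  the state accumulated so far (initially the "zero" state B)
// op:     takes the current state and the next event, returns the new state
def myFoldLeft[A, B](events: List[A], state: B)(op: (B, A) => B): B =
  events match {
    case Nil          => state                                 // no events left: final state
    case head :: tail => myFoldLeft(tail, op(state, head))(op) // apply one event, then recurse
  }
```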

Simple examples

Let's consider the following example: we have a sequence of credits and debits and would like to know the current balance.
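A minimal sketch, assuming transactions are plain integer amounts (for example in cents), with credits positive and debits negative. The names `applyTransaction` and `balance` are illustrative.

```scala
// Hypothetical data: credits are positive amounts, debits are negative.
val transactions: List[Int] = List(100, -30, 50, -20)

// op: take the current balance and one transaction, return the new balance.
val applyTransaction: (Int, Int) => Int = (balance, amount) => balance + amount

// Start from a zero balance and fold every transaction into it, left to right.
val balance: Int = transactions.foldLeft(0)(applyTransaction)
```

With these values the fold passes through 0, 100, 70 and 120, and ends at 100, the same result as `transactions.sum`.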

It might look like a convoluted way of calling transactions.sum, but implementing it in terms of this abstraction gives us the ability to express more complex logic. Let's imagine we need to know the total amount of credits ever received by an account. All we need to do now is change how op is defined.
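Under the same assumption (positive amounts are credits), only `op` changes. Debits are skipped, while the initial state and the fold itself stay the same. The name `countCredit` is illustrative.

```scala
// Hypothetical data: credits are positive amounts, debits are negative.
val transactions: List[Int] = List(100, -30, 50, -20)

// op: only credits increase the running total; debits leave it unchanged.
val countCredit: (Int, Int) => Int =
  (total, amount) => if (amount > 0) total + amount else total

// Same fold, same initial state, different op: 100 + 50 = 150.
val totalCredits: Int = transactions.foldLeft(0)(countCredit)
```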

As we can see, with these simple building blocks we can start building more complex projections of what happened in the system. However, this still doesn't address the idea we started with: pre-calculating the current balance and storing it somewhere, so that the query can be answered much more quickly.

Projection Persistence

In-memory

The simplest way of maintaining the projection state is to read the whole event stream when the service starts and then keep the current state in memory. Queries for the current state are then answered as quickly as possible. This approach works well for prototyping, when the number of events is fairly small (so replaying is quick), and where we can afford to wait for the service to replay events after a failure.
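The following is a minimal in-memory sketch, assuming a hypothetical `AccountEvent` type with `Credited` and `Debited` cases. The whole history is folded once when the projection is created, which stands in for service start-up. Later events are applied one at a time, and queries read the in-memory value. Because nothing is persisted, a restart means replaying the full stream again.

```scala
// Hypothetical account events.
sealed trait AccountEvent
final case class Credited(amount: Int) extends AccountEvent
final case class Debited(amount: Int) extends AccountEvent

// One fold step: apply a single event to the current balance.
def evolve(balance: Int, event: AccountEvent): Int = event match {
  case Credited(amount) => balance + amount
  case Debited(amount)  => balance - amount
}

// Hypothetical in-memory projection: its state lives only in this object.
final class InMemoryBalanceProjection(history: Seq[AccountEvent]) {
  // Replay the whole stream once, when the projection is created (start-up).
  private var balance: Int = history.foldLeft(0)(evolve)

  // Events written after start-up are applied one at a time.
  def handle(event: AccountEvent): Unit =
    balance = evolve(balance, event)

  // Queries are answered from memory, without any IO.
  def currentBalance: Int = balance
}
```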

SQL

This is the traditional way of storing projection state, and likely one of the most commonly used. After processing an event, we store the latest state in a table. When it's needed, it can be queried, or updated when a subsequent event is received.

One heuristic for building projections and storing them in an SQL store is that they should be designed to answer the questions we want to ask. If they are implemented that way, queries to fetch the state can be answered very quickly. When joins or aggregation queries are involved, it can indicate that the projection's current design is not optimal or that it's serving more than one use case.

One of the traps to watch out for is dependencies between projections. If two projections depend on each other, it becomes quite hard to rebuild them later if needed. Unfortunately, this trap is easy to fall into when a traditional SQL store is used, because joins are so easy to perform. It might be tempting to reach into tables created by other projections to get data not yet available in the primary projection. The main issue with this approach is that each projection has its own lifecycle, so changes propagated to one of them might not have reached the other yet. This can lead to race conditions, bugs or other hard-to-explain behaviours.

Another challenge with SQL is the temptation to build a third normal form (3NF) model of the entities. This is often driven by a wish to support ad-hoc queries that the system might need to answer in the future. Instead, we should focus our efforts on understanding the actual use cases we need to support, and then build a set of projections that help us achieve those goals.

NoSQL

The rise in the number of NoSQL solutions on the market has greatly increased developers' awareness that there is no "one size fits all" solution to querying problems. Being able to project an event stream to any backend lets us improve the user experience and simplify application code. For example, the projection state can be stored in a search engine, a time series database or a distributed key-value store, whichever supports the optimal way of querying the data.

Files

The filesystem is another option; in particular, any cloud object storage can be considered as a repository for projected data. One possible example is storing data as JSON or XML files so that application clients can consume them directly.

Keeping the projection up to date

The state of a projection needs to be updated and stored after a write to the event store. This can happen in two ways:

  • Synchronously - in the same transaction as the write to the event stream. This approach is usually very limited, as it assumes the events are stored in the same database as the projection data. It also doesn't scale easily and has other operational problems (for example, replays can be impossible or hard to synchronise). On the upside, it reduces operational complexity and lets us assume the projection state is updated immediately. As a rule of thumb, this approach can be useful for prototyping but much less so for the production version of a system.
  • Asynchronously - events are delivered to projections after they are written to the event store. This can happen in a push- or pull-based manner, depending on the available infrastructure and scaling needs. Because updates are asynchronous, we have to deal with eventual consistency of the data stored by projections, as well as with delivery guarantees. On the upside, projections are now decoupled from the main transactional write and can be scaled, replayed and monitored independently as needed.
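A pull-based asynchronous projection can be sketched as a loop that remembers a checkpoint (the position of the last event it processed) and asks the store only for newer events. Everything here is an assumption for illustration:

- `StoredEvent` is a stand-in for a stored event record.
- `readAfter` is a function standing in for an event store query.
- A single `poll()` call stands in for a scheduled polling loop.

In a real system, the state and the checkpoint would be persisted together. After a restart, the projection would then resume from where it left off instead of replaying everything.

```scala
// Hypothetical stored event: its position in the stream plus a signed amount.
final case class StoredEvent(position: Long, amount: Int)

// Hypothetical pull-based projection. readAfter(p) returns events with position > p, in order.
final class PollingBalanceProjection(readAfter: Long => Seq[StoredEvent]) {
  private var checkpoint: Long = 0L // position of the last processed event
  private var balance: Int = 0      // projected state

  // One polling cycle: fetch events written since the checkpoint and apply them.
  def poll(): Unit =
    readAfter(checkpoint).foreach { event =>
      balance += event.amount
      checkpoint = event.position // a real store would save balance and checkpoint atomically
    }

  def currentBalance: Int = balance
  def lastProcessed: Long = checkpoint
}
```

Until `poll()` runs, a query still sees the previous balance. That gap is the eventual consistency described above.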

Concrete example

In real life, a projection implementation tends to have a couple of moving parts:

  • A repository that can find or store the state
  • A projector with event handlers that know how to create or update the state

Let's have a look at our account example, where we want to build a current balance projection.
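The sketch below is one possible shape for this, not a definitive implementation. `AccountEvent` and its cases, `Balance`, `BalanceRepository`, `InMemoryBalanceRepository` and `BalanceProjector` are all hypothetical names. A production repository would be backed by one of the stores described earlier, such as an SQL table.

```scala
import scala.collection.mutable

// Hypothetical events for a single account stream.
sealed trait AccountEvent
final case class AccountOpened(accountId: String) extends AccountEvent
final case class Credited(accountId: String, amount: BigDecimal) extends AccountEvent
final case class Debited(accountId: String, amount: BigDecimal) extends AccountEvent

// The projected state.
final case class Balance(accountId: String, amount: BigDecimal)

// Repository: finds or stores the state.
trait BalanceRepository {
  def find(accountId: String): Option[Balance]
  def save(balance: Balance): Unit
}

// In-memory stand-in for a real store.
final class InMemoryBalanceRepository extends BalanceRepository {
  private val rows = mutable.Map.empty[String, Balance]
  def find(accountId: String): Option[Balance] = rows.get(accountId)
  def save(balance: Balance): Unit = rows.update(balance.accountId, balance)
}

// Projector: one handler per event type, creating or updating the state.
final class BalanceProjector(repository: BalanceRepository) {
  def handle(event: AccountEvent): Unit = event match {
    case AccountOpened(id)    => repository.save(Balance(id, BigDecimal(0)))
    case Credited(id, amount) => modify(id)(_ + amount)
    case Debited(id, amount)  => modify(id)(_ - amount)
  }

  // Load, change and store the balance; events for unknown accounts are ignored here.
  private def modify(accountId: String)(f: BigDecimal => BigDecimal): Unit =
    repository.find(accountId).foreach { current =>
      repository.save(current.copy(amount = f(current.amount)))
    }
}
```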

In the example above, we have a Balance and a repository that can read and write it. When a new event is received, we execute the appropriate logic depending on the event type being handled. This is a good start, but in the real world we will probably want to add deduplication or, in more complex use cases, trigger side effects. These topics are covered in other posts from the projection series, which are linked at the end of this post.

Use cases

The state maintained and persisted by projections can support multiple use cases:

  • To expose a read model to the users of the system
  • To support business policies, process managers and sagas
  • To accumulate and build external fat events used by downstream systems
  • To support scheduled jobs

Testing

Assuming we've followed the pattern of treating a projection as a left fold over the history of events, we can leverage it for testing too. Any update to the projection effectively boils down to calling the projector with a sequence of events that happened in the past plus a new event. The result should then equal an expected state.
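Here is a minimal sketch of such a test, written with plain `assert` rather than a specific testing framework. The projection is a pure function `project`. The call `givenEvents(...).when(...)` is a hypothetical helper that folds the past events plus the new event and returns the resulting state for comparison.

```scala
// Hypothetical account events.
sealed trait AccountEvent
final case class Credited(amount: Int) extends AccountEvent
final case class Debited(amount: Int) extends AccountEvent

// The projection as a pure fold step: current state + event => new state.
def project(balance: Int, event: AccountEvent): Int = event match {
  case Credited(amount) => balance + amount
  case Debited(amount)  => balance - amount
}

// Hypothetical test helper: past events, then a new event, yields the projected state.
final case class Scenario(history: Seq[AccountEvent]) {
  def when(event: AccountEvent): Int = (history :+ event).foldLeft(0)(project)
}
def givenEvents(history: AccountEvent*): Scenario = Scenario(history)

// Example test: given past events, when a new event arrives, then expect this state.
assert(givenEvents(Credited(100), Debited(30)).when(Credited(5)) == 75)
```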

Writing tests this way assumes a testing framework that supports such a syntax (past events, a new event and an expected state). This is certainly possible in some languages but might be a bit more verbose in others.

Projections vs Aggregates

By now you've probably asked yourself how a projection differs from applying events to the state of an aggregate. The answer is that they are very similar, and the only real difference is the intent. An aggregate's internal state exists so that we can decide whether a command should be processed and, if so, which new events should be persisted (the C in CQRS). Projections, on the other hand, always answer a question about facts that have already happened in the system (the Q in CQRS).
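To make the difference in intent concrete, here is a sketch in which an aggregate and a projection share the same fold step `evolve`. The aggregate folds the history only to decide whether a withdrawal command is allowed and which event to emit, using a hypothetical no-overdraft rule. The projection folds the same history to answer a query. All names are illustrative.

```scala
// Hypothetical account events.
sealed trait AccountEvent
final case class Credited(amount: Int) extends AccountEvent
final case class Debited(amount: Int) extends AccountEvent

// Shared fold step used by both sides.
def evolve(balance: Int, event: AccountEvent): Int = event match {
  case Credited(amount) => balance + amount
  case Debited(amount)  => balance - amount
}

// Aggregate side (C): rebuild state to decide on a command and produce new events.
// Hypothetical rule: a withdrawal may not exceed the current balance.
def decideWithdraw(history: Seq[AccountEvent], amount: Int): Either[String, List[AccountEvent]] = {
  val balance = history.foldLeft(0)(evolve)
  if (amount <= balance) Right(List(Debited(amount)))
  else Left(s"insufficient funds: balance $balance, requested $amount")
}

// Projection side (Q): answer a question about facts that already happened.
def currentBalance(history: Seq[AccountEvent]): Int = history.foldLeft(0)(evolve)
```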

Summary

Projections are a very simple yet powerful concept that allows your application to support a number of varied use cases. Their implementation and testing are simple, but updates are likely to be eventually consistent, which might be challenging in some domains. In return, we gain the ability to push our data to a number of different persistence stores, scale the processing, support changing requirements and ship bug fixes without downtime.

If you found this post useful, have a look at the other, more advanced projection topics: