In the previous post of the series we discussed how to scale projection consumers so that they can handle high volumes of messages. Today we will look at the design of the consumer itself and see how to make it work in a predictable and repeatable way.
For the purposes of this post we will assume the logic of a projection consumer is composed of the following steps:
- Receive the event
- (optionally) Read an existing state
- Execute projection logic
- Perform side effects
- (optionally) Save updated state
- Ack the event
I have deliberately separated the two side effects in steps 4 and 5 (writing to the DB is a side effect as well). This is to illustrate how to handle writes to two independent backends without using two-phase commits. Instead we can rely on at-least-once delivery guarantees and ask other systems to deduplicate our messages.
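The six steps above can be sketched as a single handler function. This is a minimal illustration rather than the implementation from this series: `ProjectionTemplate`, `Event`, `Decision` and every parameter name are hypothetical, and the state store, notifier and ack are passed in as plain functions so the ordering is easy to see.

```scala
object ProjectionTemplate {
  // Hypothetical event shape: `key` identifies the state row the event belongs to.
  final case class Event(id: String, key: String, payload: Int)

  // Result of the pure projection logic: the state to save and the effects to perform.
  final case class Decision[S, E](newState: S, effects: List[E])

  def handle[S, E](
      event: Event,                                // step 1: the received event
      read: String => Option[S],                   // step 2: read existing state
      logic: (Option[S], Event) => Decision[S, E], // step 3: pure projection logic
      perform: E => Unit,                          // step 4: external side effect
      save: (String, S) => Unit,                   // step 5: save updated state
      ack: Event => Unit                           // step 6: ack the event
  ): Unit = {
    val current  = read(event.key)
    val decision = logic(current, event)
    decision.effects.foreach(perform) // side effects run before the state is saved
    save(event.key, decision.newState)
    ack(event)                        // ack last, so a crash anywhere above causes redelivery
  }
}
```

Because the ack comes last, a failure in any earlier step leaves the event unacknowledged, which is what at-least-once delivery relies on.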
Fraud use case
It's always good to discuss concrete examples, as they help to contextualise concepts. We will continue exploring our banking domain and add new fraud-related functionality. The requirements from the business state that:
When 2 debit transactions are made from the same account within 5 minutes and the total debited amount is greater than £5000 then the customer should be notified about potential fraud.
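The rule above can be written as a pure function over the previous and the current debit. The sketch below makes a few assumptions that the requirement does not spell out: "within 5 minutes" is treated as inclusive, the gap is taken as an absolute value so that out-of-order events still match, and the names `FraudRule`, `Debit` and `isSuspicious` are made up for illustration.

```scala
import java.time.{Duration, Instant}

object FraudRule {
  // Hypothetical minimal view of a debit: only the fields the rule needs.
  final case class Debit(amount: BigDecimal, occurredAt: Instant)

  val Window: Duration      = Duration.ofMinutes(5)
  val Threshold: BigDecimal = BigDecimal(5000)

  // True when both debits fall within the window and their total exceeds the threshold.
  def isSuspicious(previous: Option[Debit], current: Debit): Boolean =
    previous.exists { prev =>
      val gap = Duration.between(prev.occurredAt, current.occurredAt).abs()
      gap.compareTo(Window) <= 0 && (prev.amount + current.amount) > Threshold
    }
}
```

Since the function depends only on its arguments, it can be tested without any messaging or database infrastructure.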
Example implementation
One possible way of fulfilling the requirement using the template presented in the earlier paragraph could be:
- On every AccountDebited event
- Fetch LastDebitByCustomerId read model
- Deduplicate the event, calculate and prepare the message if fraud notification requirements are met
- Use NotificationService to alert the customer if they are
- Update LastDebitByCustomerId with latest transaction
- Ack the event
So far so good. Logic seems simple and clear. Now let's consider what is going to happen when we have to deal with "power cuts".
If the process dies during steps 1 or 2 we are safe. The event will eventually be redelivered when the system is back online, and the state will be re-read.
Things start to get interesting at step 3. We can rely on the deduplication to tell us whether the event should be skipped or not. In cases where the requirements are met we will need to prepare a new alert message to be sent via the notification service. The message has the following structure:
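import java.time.ZonedDateTime
import java.util.UUID

case class FraudAlert(
  id: UUID,             // identifier the notification service can deduplicate on
  date: ZonedDateTime,  // date of the alert
  customerId: UUID,
  message: String
)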
The first field we need to populate is the id of the notification. If we simply generate a random UUID every time the code is executed, we will lose any possibility of deduplicating messages in the NotificationService in cases where the process dies shortly after the notification is sent. What we can do instead is either re-use the event id as the notification id, or generate a new name-based UUID (UUID v5) from the data received in the latest event. That way, every time we handle the event we will get exactly the same output for a given input, which also makes testing much simpler (no need to mock UUID generation code).
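To make the second option concrete, here is a minimal sketch of a name-based UUID v5 built with the JDK only. The JDK's own `UUID.nameUUIDFromBytes` produces version 3 (MD5) UUIDs, so version 5 (SHA-1) is assembled by hand here. `DeterministicIds`, `AlertNamespace`, `alertId` and the `fraud-alerts.example.com` name are assumptions made for illustration; any fixed namespace works as long as it never changes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.UUID

object DeterministicIds {
  // The DNS namespace UUID defined by the UUID specification (RFC 4122).
  val DnsNamespace: UUID = UUID.fromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

  // Name-based UUID, version 5: SHA-1 over the namespace bytes followed by the name.
  def uuidV5(namespace: UUID, name: String): UUID = {
    val sha1 = MessageDigest.getInstance("SHA-1")
    sha1.update(
      ByteBuffer
        .allocate(16)
        .putLong(namespace.getMostSignificantBits)
        .putLong(namespace.getLeastSignificantBits)
        .array()
    )
    sha1.update(name.getBytes(StandardCharsets.UTF_8))
    val hash = sha1.digest()                   // 20 bytes, only the first 16 are used
    hash(6) = ((hash(6) & 0x0f) | 0x50).toByte // set the version to 5
    hash(8) = ((hash(8) & 0x3f) | 0x80).toByte // set the RFC 4122 variant
    val buf = ByteBuffer.wrap(hash, 0, 16)
    new UUID(buf.getLong(), buf.getLong())
  }

  // Hypothetical namespace dedicated to fraud alerts.
  val AlertNamespace: UUID = uuidV5(DnsNamespace, "fraud-alerts.example.com")

  // The same event id always yields the same alert id.
  def alertId(eventId: UUID): UUID = uuidV5(AlertNamespace, eventId.toString)
}
```

Deriving the alert id from the event id, rather than re-using the event id directly, keeps the two identifier spaces separate while remaining fully deterministic.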
The second field is the date of the notification. It is important to discuss with product/business stakeholders where this field should be initialised. If the date of the alert is the date of the last transaction, then we can use the occurred time of the event (and add timezone information to it). If instead it should be generated at the time of handling the notification, then we will lose the referential transparency we are aiming for, but it might not be a big deal if the alert is deduplicated in the notification service.
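As a sketch of the first option, the alert below is built entirely from the event: the id re-uses the event id and the date comes from the event's occurred time. The `AccountDebited` fields, the UTC zone and the message text are assumptions made for illustration, and `FraudAlert` is repeated only so that the snippet compiles on its own.

```scala
import java.time.{Instant, ZoneId, ZoneOffset, ZonedDateTime}
import java.util.UUID

object AlertBuilder {
  // Hypothetical event shape; the real AccountDebited event may carry different fields.
  final case class AccountDebited(
      eventId: UUID,
      customerId: UUID,
      amount: BigDecimal,
      occurredAt: Instant
  )

  final case class FraudAlert(id: UUID, date: ZonedDateTime, customerId: UUID, message: String)

  // Assumption: alerts are stamped in UTC; a business-specific zone could be used instead.
  val AlertZone: ZoneId = ZoneOffset.UTC

  // Pure function: no random ids, no system clock.
  def toAlert(event: AccountDebited): FraudAlert =
    FraudAlert(
      id = event.eventId, // re-use the event id as the notification id
      date = ZonedDateTime.ofInstant(event.occurredAt, AlertZone),
      customerId = event.customerId,
      message = s"Unusual debit activity detected, last debit: ${event.amount} GBP"
    )
}
```

Handling the same event twice produces two equal `FraudAlert` values, which is what lets the notification service deduplicate them.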
Now that our logic generates exactly the same output for a given input we can move to step 4 and send the notification. If the process dies shortly before the notification is sent, then no harm is done and we can restart the process. If the process dies after the notification is sent, then we are also safe: when handling the redelivered event we will send exactly the same notification for the second time, and the notification service will have to deal with deduplicating it (or send the same message for the second time).
It is very important to send the notification (step 4) before step 5 (updating state in the DB) is executed. If we were to swap these two we could end up in a situation where the state (and the deduplication store) is updated before the notification is sent. Then, if the process dies (or the notification service is down), on event redelivery we would skip the processing because the deduplication store was already updated, and the notification would never be sent.
If instead the logic is implemented as suggested in this post (first send the message and then update the deduplication store), we will save a new row to the store (and use it to deduplicate messages) only if the notification succeeded.
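The difference between the two orderings can be simulated in memory. Everything in the sketch below is hypothetical: `NotificationService` here is a stand-in that deduplicates by alert id, `DedupStore` stands for the consumer's own deduplication store, and a "power cut" is modelled as an exception thrown between the two writes.

```scala
import scala.collection.mutable

object OrderingDemo {
  // Stand-in for the external service: drops alerts whose id it has already seen.
  final class NotificationService {
    private val seen = mutable.Set.empty[String]
    val delivered: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]
    var available: Boolean = true

    def send(alertId: String): Unit = {
      if (!available) throw new RuntimeException("notification service down")
      if (seen.add(alertId)) delivered += alertId
    }
  }

  // Stand-in for the consumer's deduplication store.
  final class DedupStore {
    private val processed = mutable.Set.empty[String]
    def contains(eventId: String): Boolean = processed.contains(eventId)
    def markProcessed(eventId: String): Unit = { processed += eventId; () }
  }

  // Ordering suggested in this post: side effect first, deduplication store second.
  def sendThenSave(
      eventId: String,
      service: NotificationService,
      store: DedupStore,
      crashAfterSend: Boolean = false
  ): Unit =
    if (!store.contains(eventId)) {
      service.send(eventId)
      if (crashAfterSend) throw new RuntimeException("power cut")
      store.markProcessed(eventId)
    }

  // Swapped ordering: the event is marked as processed before the alert goes out.
  def saveThenSend(eventId: String, service: NotificationService, store: DedupStore): Unit =
    if (!store.contains(eventId)) {
      store.markProcessed(eventId)
      service.send(eventId)
    }
}
```

With `sendThenSave`, a crash after the send followed by a redelivery results in a second send with the same id, which the service drops, so the customer receives the alert once. With `saveThenSend`, a failed send followed by a redelivery is skipped by the deduplication store, and the alert is never delivered.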
Summary
Running a projection doesn't have to mean that all we can do is update read models used by APIs. It is possible to handle more complex use cases that involve triggering side effects on other systems, and still keep the code concise and resilient. When designing such solutions you can apply the following two heuristics:
1. Calculate new values based on existing inputs
So that logic is referentially transparent, easy to test and other systems can deduplicate messages.
2. Perform side effects before updating the deduplication store
So that in the case of a "power cut" the system will be able to execute the logic from the beginning without being affected by premature deduplication.