Event Sourcing With Elixir - Part 2

Introduction to new concepts
Before we get the ball rolling on our BankAPI project, there are some concepts we need to be acquainted with.
In traditional web projects following the MVC paradigm, you have Models, Views and Controllers.

(MVC diagram, from MSDN)
Controllers handle requests from the user, interact with Models to query or modify the database, and use the results of those interactions to populate Views that are rendered back to the user. This paradigm is widespread and quite adequate as a solution to a number of problems, but it's almost Newtonian - at a certain scale, it stops being so effective.
With MVC, you have a set database structure, and the requests your system processes have to somehow satisfy it, or you'll have to change your data model. This is usually costly, unless your data model is extensible - and extensibility, pushed too far, introduces complexity of its own.
With Event Sourcing, the process of producing data from interactions with the system is not as immediate. Every meaningful interaction, which we call an event, is stored in a special database called the Event Store or write model, and only after that do we publish said event to its subscribers for analysis and eventual reification[1] onto one or more databases designated as the read model. Data is only ever fetched from the read model(s), which means the Event Store is walled off from queries and serves only as a log of past events.

(CQRS diagram, from the DDD Building Blocks Dictionary)
There are many ways to do Event Sourcing, but one commonly used pattern is Command Query Responsibility Segregation, or CQRS. In the diagram covering its basics, we see a command interacting with a domain model, which emits events onto the aforementioned Event Store. Those events are then fed back into the domain model for potential event cascading, but also to a projector, the proper jargon in this context for what is basically an event handler[2]. The projector is responsible for modifying the data in the read model.
CQRS stipulates a fundamental split between actions that affect the system, called commands, which return no data, and actions that query the state of the system, aptly named queries, which return data but have no effect on the system's state.
At first this might seem unintuitive or verbose, but it enables exciting features. Splitting read-heavy and write-heavy operations lets you tune the components that handle them separately - a write model that only ever appends events, for instance, can be optimized for very fast writes. More profoundly, CQRS lets you adapt far more gracefully to the data model changes that are so prevalent in fast-moving MVC codebases. Because read model data is constructed after the event is stored, we're free to change our interpretation of events in the future or add new features, going back over past events and applying those learnings to build new data structures in the read model with retroactive effect.
For example, imagine we store events like this in our Event Store:
{
  "type": "product_added_to_order",
  "payload": {
    "user_id": "ab3c0fff-58c5-491b-a7dc-98258b1dbd4e",
    "order_id": "7fe50e1b-2601-416b-bcd2-b7cb928a8e83",
    "item": {
      "id": 1701,
      "name": "USS Enterprise-D scale model",
      "price_in_cents": 1999
    }
  }
}
{
  "type": "product_removed_from_order",
  "payload": {
    "user_id": "ab3c0fff-58c5-491b-a7dc-98258b1dbd4e",
    "order_id": "7fe50e1b-2601-416b-bcd2-b7cb928a8e83",
    "item_id": 1701
  }
}
{
  "type": "product_added_to_order",
  "payload": {
    "user_id": "ab3c0fff-58c5-491b-a7dc-98258b1dbd4e",
    "order_id": "7fe50e1b-2601-416b-bcd2-b7cb928a8e83",
    "item": {
      "id": 9453,
      "name": "Klingon Bird Of Prey D-7",
      "price_in_cents": 2199
    }
  }
}
By the end of this order, the read model only holds the items that remain in the basket, but we can still go back and mine the event stream for products that were removed, and maybe notify the user when they go on sale. Or we could build a system that suggests alternatives when a user adds one of these skipped products to their cart.
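As a rough sketch of that kind of mining - assuming the stored events are available as plain maps shaped like the JSON above, and using a made-up OrderAnalytics module - finding the removed products is just a pass over the stream:
defmodule OrderAnalytics do
  # hypothetical helper: collect the ids of products removed from an order
  def removed_product_ids(events) do
    events
    |> Enum.filter(&(&1["type"] == "product_removed_from_order"))
    |> Enum.map(& &1["payload"]["item_id"])
  end
end

# for the event stream above, this returns [1701]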
Did you ever wish you could go back in time to when the user filed a bug report, to inspect the state of the database and try to replicate the issue? Now you can: just copy the Event Store and run the projectors only up to a certain timestamp. Did you accidentally delete your production database? Replay the events in your Event Store and you'll be able to rebuild a perfect copy of it[3]!
This ability to go back into the Event Store and reprocess old events with a new approach is a big plus: with MVC your current database is what it is, but with Event Sourcing you can mine your old events for new knowledge. It is simply a better place to be: you leave an open door to whatever can be built in the future from the events you save, as long as you tread carefully and design them to satisfy current business needs. You no longer feel pressured to design the perfect data model up front; you can iterate towards it with every new projector and every tweak to the read model.
Domain Models
The CQRS diagram above features a domain model - what does that mean?
A lot of Event Sourcing projects take cues from a methodology known as Domain Driven Design, or DDD. DDD is a big topic, and I recommend reading both the Blue book, for the theory behind it from its creator Eric Evans, and the Red book, for a practical approach to it. I won't go into it in much depth since I'm not an expert, and this series is meant as a head-first dive into Event Sourcing, not a detailed introduction to DDD. Still, for our purposes, and particularly for our use of the Commanded library, some super quick definitions of a few DDD concepts are in order. With that in mind, let's go over the main constructs we'll be using with Commanded.

Commands
Commands are normal Elixir structs, and they are usually named in the imperative. Example command:
defmodule OpenAccount do
  @enforce_keys [:account_number]
  defstruct [:account_number, :initial_balance]
end
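Building one is just building a struct (the values below are made up); @enforce_keys guarantees a command can't be created without its identity field:
command = %OpenAccount{account_number: "ACC-123", initial_balance: 1_000}

# %OpenAccount{initial_balance: 1_000} would not even compile,
# because :account_number is listed in @enforce_keys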
Middleware
You can define code to run before and after a command’s dispatch, and also after a dispatch failure. This is where you want to do command validation, authorization, logging and the like. Example:
defmodule NoOpMiddleware do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  import Pipeline

  # runs before the command reaches its aggregate
  def before_dispatch(%Pipeline{} = pipeline) do
    pipeline
  end

  # runs after a successful dispatch
  def after_dispatch(%Pipeline{} = pipeline) do
    pipeline
  end

  # runs when the dispatch fails
  def after_failure(%Pipeline{} = pipeline) do
    pipeline
  end
end
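As a slightly less trivial sketch (the CommandLogger name is made up), a middleware can read the command carried in the pipeline and log it before dispatch:
defmodule CommandLogger do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  require Logger

  # log the command module before it reaches its aggregate
  def before_dispatch(%Pipeline{command: command} = pipeline) do
    Logger.info("Dispatching #{inspect(command.__struct__)}")
    pipeline
  end

  def after_dispatch(%Pipeline{} = pipeline), do: pipeline
  def after_failure(%Pipeline{} = pipeline), do: pipeline
end
A middleware like this would typically be enabled with a middleware CommandLogger line in the router, which we'll meet below.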
Aggregates
Aggregates are, traditionally in DDD, collections of objects that can, or must, be treated as a single unit for business reasons. Picture an Order with an array of OrderLines and a total field - these objects are distinct, but changing one of the OrderLines without updating the parent Order's total field would lead to inconsistent state.
In Commanded they are very easy to work with. Here’s an example:
defmodule BankAccount do
  defstruct [:account_number, :balance]

  # public command API
  def execute(
        %BankAccount{account_number: nil},
        %OpenAccount{account_number: account_number, initial_balance: initial_balance}
      )
      when initial_balance > 0 do
    %BankAccountOpened{account_number: account_number, initial_balance: initial_balance}
  end

  def execute(%BankAccount{}, %OpenAccount{initial_balance: initial_balance})
      when initial_balance <= 0 do
    {:error, :initial_balance_must_be_above_zero}
  end

  def execute(%BankAccount{}, %OpenAccount{}) do
    {:error, :account_already_opened}
  end

  # state mutators
  def apply(%BankAccount{} = account, %BankAccountOpened{
        account_number: account_number,
        initial_balance: initial_balance
      }) do
    %BankAccount{account | account_number: account_number, balance: initial_balance}
  end
end
Think of aggregates as holders of state: they receive commands, emit events, and then mutate their internal state by applying those events. Each individual aggregate is an Elixir process and can be stopped and rebuilt from the Event Store on demand to address memory constraints. In the example above, the aggregate handles the OpenAccount command in its execute/2 functions and emits a BankAccountOpened event as a result. Once that event is emitted, the aggregate runs its apply/2 functions, which mutate its internal state (account_number and balance) based on the event's values.
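Conceptually, rebuilding an aggregate is just a fold of its past events through apply/2 - a rough sketch of the idea, not Commanded's actual internals:
# events previously recorded for this aggregate's stream
events = [%BankAccountOpened{account_number: "ZC7", initial_balance: 100}]

# replaying them rebuilds the in-memory state from scratch
state =
  Enum.reduce(events, %BankAccount{}, fn event, account ->
    BankAccount.apply(account, event)
  end)
# => %BankAccount{account_number: "ZC7", balance: 100}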
Routers
Routers match commands to aggregates that will handle them. Example:
defmodule BankRouter do
  use Commanded.Commands.Router

  dispatch OpenAccount, to: BankAccount, identity: :account_number
end
A command dispatch is quite simple:
:ok = BankRouter.dispatch(%OpenAccount{account_number: "ZC7", initial_balance: 100})
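Dispatch returns :ok on success. When an execute/2 clause returns an error tuple, as in the aggregate above, that tuple is what the caller gets back - a sketch reusing the account from the previous example:
# a second OpenAccount for the same account hits the
# {:error, :account_already_opened} clause in BankAccount.execute/2
{:error, :account_already_opened} =
  BankRouter.dispatch(%OpenAccount{account_number: "ZC7", initial_balance: 100})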
Events
Normal Elixir structs, just like commands. They are usually named in the past tense as they denote something that has already happened. Example:
defmodule BankAccountOpened do
  @derive Jason.Encoder
  defstruct [:account_number, :initial_balance]
end
Commanded serializes all events to JSON.
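That is what the @derive Jason.Encoder line above enables - the event struct can be turned into the JSON payload that ends up in the Event Store. A quick sketch:
Jason.encode!(%BankAccountOpened{account_number: "ZC7", initial_balance: 100})
# => a JSON string along the lines of {"account_number":"ZC7","initial_balance":100}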
Process Managers
Process Managers are the opposite of aggregates as they receive events and emit commands. They are responsible for coordinating interactions between one or more aggregates. Process managers have state that can be used to track an orchestrated flow. Example:
defmodule TransferMoneyProcessManager do
  use Commanded.ProcessManagers.ProcessManager,
    name: "TransferMoneyProcessManager",
    router: BankRouter

  @derive Jason.Encoder
  defstruct [
    :transfer_uuid,
    :debit_account,
    :credit_account,
    :amount,
    :status
  ]

  # Process routing
  def interested?(%MoneyTransferRequested{transfer_uuid: transfer_uuid}),
    do: {:start, transfer_uuid}

  def interested?(%MoneyWithdrawn{transfer_uuid: transfer_uuid}),
    do: {:continue, transfer_uuid}

  def interested?(%MoneyDeposited{transfer_uuid: transfer_uuid}),
    do: {:stop, transfer_uuid}

  def interested?(_event), do: false

  # Command dispatch
  def handle(%TransferMoneyProcessManager{}, %MoneyTransferRequested{} = event) do
    %MoneyTransferRequested{
      transfer_uuid: transfer_uuid,
      debit_account: debit_account,
      amount: amount
    } = event

    %WithdrawMoney{
      account_number: debit_account,
      transfer_uuid: transfer_uuid,
      amount: amount
    }
  end

  def handle(%TransferMoneyProcessManager{} = pm, %MoneyWithdrawn{}) do
    %TransferMoneyProcessManager{
      transfer_uuid: transfer_uuid,
      credit_account: credit_account,
      amount: amount
    } = pm

    %DepositMoney{
      account_number: credit_account,
      transfer_uuid: transfer_uuid,
      amount: amount
    }
  end

  # State mutators
  def apply(%TransferMoneyProcessManager{} = transfer, %MoneyTransferRequested{} = event) do
    %MoneyTransferRequested{
      transfer_uuid: transfer_uuid,
      debit_account: debit_account,
      credit_account: credit_account,
      amount: amount
    } = event

    %TransferMoneyProcessManager{transfer |
      transfer_uuid: transfer_uuid,
      debit_account: debit_account,
      credit_account: credit_account,
      amount: amount,
      status: :withdraw_money_from_debit_account
    }
  end

  def apply(%TransferMoneyProcessManager{} = transfer, %MoneyWithdrawn{}) do
    %TransferMoneyProcessManager{transfer |
      status: :deposit_money_in_credit_account
    }
  end
end
There's quite a lot going on here, but basically we must define the process manager's state like we do for aggregates, and then use interested?/1 functions to signal which events we want to hook into to start, continue and stop the flow. These events are fed into apply/2 functions that mutate the manager's internal state, and finally we handle/2 the events to emit commands that will be routed to aggregates. We'll go into more detail about process managers later on.
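To make that flow concrete, here is the sequence the process manager above coordinates, traced step by step (how the aggregates produce these events is assumed, not shown here):
# event received           interested?/1                  handle/2 dispatches
# MoneyTransferRequested -> {:start, transfer_uuid}     -> WithdrawMoney (debit account)
# MoneyWithdrawn         -> {:continue, transfer_uuid}  -> DepositMoney (credit account)
# MoneyDeposited         -> {:stop, transfer_uuid}      -> nothing; the instance stops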
Event Store
The specialized data store that stores the events. Commanded ships with adapters for two of them: the Commanded author's own EventStore, which is open source and built on PostgreSQL, and Greg Young's Event Store, also open source with optional paid support.
Read Model
A normal database (any supported by Ecto will do). It is this database that projectors affect, and that queries read from.
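As a sketch (the schema and field names here are assumptions chosen to line up with the projector example below), a read model table is just a regular Ecto schema:
defmodule ExampleProjection do
  use Ecto.Schema

  # a plain read model table: written to by a projector, read by queries
  schema "example_projections" do
    field :name, :string

    timestamps()
  end
end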
Projector
The components that subscribe to events and change the read model. Commanded has an extra lib to simplify working with these, including error handling. Example:
defmodule MyApp.ExampleProjector do
  use Commanded.Projections.Ecto, name: "example_projector"

  project %AnEvent{name: name}, _metadata, fn multi ->
    Ecto.Multi.insert(multi, :example_projection, %ExampleProjection{name: name})
  end
end
The projector's name must be unique throughout the codebase, and projectors are usually supervised, unless you have a one-off projector. Here's an example of how supervising a projector could work:
defmodule Bank.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Supervisor.init(
      [
        MyApp.ExampleProjector
      ],
      strategy: :one_for_one
    )
  end
end
Queries
These are just normal Ecto queries or direct SQL statements - whatever fits your needs best. The only rule is to query the read model's repo and never the Event Store.
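For example, a query might look like this - a sketch that assumes the ExampleProjection schema from above and a hypothetical read model repo named BankAPI.Repo:
import Ecto.Query

# fetch data from the read model repo, never from the Event Store
query = from p in ExampleProjection, select: p.name, order_by: p.name
names = BankAPI.Repo.all(query)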
Wrap Up
I didn't go into other DDD concepts, like Bounded Contexts, since this is already quite a long post, and as mentioned I'm aiming for a hands-on introduction to the methodology and the Commanded library, not a full course on DDD. That being said, other concepts might be introduced along the way. Hopefully, by the end of the series, you'll feel motivated to explore DDD, Event Sourcing or CQRS further on your own.
This all might seem like a lot to take in, but it feels quite natural once we start to get down to it. Next time we’ll move past the introductions and get into some real code.
See you then!
Cover image credit: PIXNIO
[1] Turning an abstract entity or construct into data.
[2] In functional terms, the event is projected onto the read model by the handler, thus they are called projectors or projections. I prefer "projector" for the handler, with the Ecto module being manipulated being the "projection".
[3] Caveat: if your read model's state depends on calls to an outside system, you have to provide mechanisms to replay those calls so they yield the same results. And remember not to send emails again!