Event Sourcing With Elixir - Part 3

Posts in this series
Onward
In the previous articles, we went over installing the tools we’ll need for our API project and a small introduction to Event Sourcing, DDD and CQRS and how our library of choice, Commanded, interprets them. We aim to develop a simple Banking API that allows for:
- account opening and closing
- deposits and withdrawals
- transfers between accounts
Way back in part 1 we ran the Phoenix project generator Mix task and got a basic skeleton app ready to go. Let’s now add some dependencies: Commanded, EventStore and the EventStore adapter.
# mix.exs
defp deps do
[
# ...
{:commanded, "~> 0.18"},
{:eventstore, "~> 0.16.1", runtime: Mix.env() != :test},
{:commanded_eventstore_adapter, "~> 0.5", runtime: Mix.env() != :test}
]
end
We don’t want to use EventStore in all environments, since there’s a faster in-memory event store just for tests. Next up, telling Commanded which event store adapter to use:
# config/config.exs
config :commanded,
event_store_adapter: Commanded.EventStore.Adapters.EventStore
For development, we want to use EventStore, so the database settings must be present:
# config/dev.exs
config :eventstore,
column_data_type: "jsonb"
config :eventstore, EventStore.Storage,
serializer: EventStore.JsonbSerializer,
types: EventStore.PostgresTypes,
username: "postgres",
password: "postgres",
database: "bank_api_eventstore_dev",
hostname: "localhost",
pool_size: 10,
pool_overflow: 5
You should adjust these to suit your local environment. To create the event store’s database, it’s enough to:
$ mix do event_store.create, event_store.init
Finally, for the test environment:
# config/test.exs
config :commanded,
event_store_adapter: Commanded.EventStore.Adapters.InMemory
config :commanded, Commanded.EventStore.Adapters.InMemory,
serializer: Commanded.Serialization.JsonSerializer
When you run mix test
now, you should see (after all the initial compilation
messages):
Generated bank_api app
..
Finished in 0.03 seconds
2 tests, 0 failures
Randomized with seed 825671
The current tests were auto-generated by the Phoenix project generator task.
Opening bank accounts
Alright, time for our first feature! Our accounts will be quite simple and just have an identifier and a current_balance
field on our read model. Events that will be emitted as a result of interactions with our aggregates will be subscribed
to by projectors and this balance will be updated over time. Here’s the diagram
from the previous post as a reminder:

For the “opening a bank account” initial feature we will need:
- a controller action to receive the request (and a route matching to it)
- a command to be dispatched by this controller action
- a router to route this command to an aggregate
- an aggregate that will handle this command, mutate its internal state and emit an event as a result
- the event emitted by this aggregate
- a projector to subscribe to this event and create the record on the read model
This might seem like a lot, but with this first feature we’ll get a lot of pieces in place, and subsequent features will be much quicker to implement. Let’s start things off with a test for the API endpoint:
# test/bank_api_web/controllers/account_controller_test.exs
defmodule BankAPIWeb.AccountControllerTest do
use BankAPIWeb.ConnCase
@create_attrs %{
initial_balance: 42_00
}
@invalid_attrs %{
initial_balance: nil
}
setup %{conn: conn} do
{:ok, conn: put_req_header(conn, "accept", "application/json")}
end
describe "create account" do
test "renders account when data is valid", %{conn: conn} do
conn =
post(
conn,
Routes.account_path(conn, :create),
account: @create_attrs
)
assert %{
"uuid" => _uuid,
"current_balance" => 4200
} = json_response(conn, 201)["data"]
end
test "renders errors when data is invalid", %{conn: conn} do
conn =
post(
conn,
Routes.account_path(conn, :create),
account: @invalid_attrs
)
assert json_response(conn, 422)["errors"] != %{}
end
end
end
When we run our tests with mix test
we should see:
..
1) test create account renders account when data is valid (BankAPIWeb.AccountControllerTest)
test/bank_api_web/controllers/account_controller_test.exs:16
** (UndefinedFunctionError) function BankAPIWeb.Router.Helpers.account_path/2 is undefined or private
code: Routes.account_path(conn, :create),
stacktrace:
(bank_api) BankAPIWeb.Router.Helpers.account_path(%Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{}, before_send: [], body_params: %Plug.Conn.Unfetched{aspect: :body_params}, cookies: %Plug.Conn.Unfetched{aspect: :cookies}, halted: false, host: "www.example.com", method: "GET", owner: #PID<0.409.0>, params: %Plug.Conn.Unfetched{aspect: :params}, path_info: [], path_params: %{}, port: 80, private: %{phoenix_recycled: true, plug_skip_csrf_protection: true}, query_params: %Plug.Conn.Unfetched{aspect: :query_params}, query_string: "", remote_ip: {127, 0, 0, 1}, req_cookies: %Plug.Conn.Unfetched{aspect: :cookies}, req_headers: [{"accept", "application/json"}], request_path: "/", resp_body: nil, resp_cookies: %{}, resp_headers: [{"cache-control", "max-age=0, private, must-revalidate"}], scheme: :http, script_name: [], secret_key_base: nil, state: :unset, status: nil}, :create)
test/bank_api_web/controllers/account_controller_test.exs:20: (test)
2) test create account renders errors when data is invalid (BankAPIWeb.AccountControllerTest)
test/bank_api_web/controllers/account_controller_test.exs:30
** (UndefinedFunctionError) function BankAPIWeb.Router.Helpers.account_path/2 is undefined or private
code: Routes.account_path(conn, :create),
stacktrace:
(bank_api) BankAPIWeb.Router.Helpers.account_path(%Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{}, before_send: [], body_params: %Plug.Conn.Unfetched{aspect: :body_params}, cookies: %Plug.Conn.Unfetched{aspect: :cookies}, halted: false, host: "www.example.com", method: "GET", owner: #PID<0.411.0>, params: %Plug.Conn.Unfetched{aspect: :params}, path_info: [], path_params: %{}, port: 80, private: %{phoenix_recycled: true, plug_skip_csrf_protection: true}, query_params: %Plug.Conn.Unfetched{aspect: :query_params}, query_string: "", remote_ip: {127, 0, 0, 1}, req_cookies: %Plug.Conn.Unfetched{aspect: :cookies}, req_headers: [{"accept", "application/json"}], request_path: "/", resp_body: nil, resp_cookies: %{}, resp_headers: [{"cache-control", "max-age=0, private, must-revalidate"}], scheme: :http, script_name: [], secret_key_base: nil, state: :unset, status: nil}, :create)
test/bank_api_web/controllers/account_controller_test.exs:34: (test)
Finished in 0.1 seconds
4 tests, 2 failures
Randomized with seed 469529
Now that we have a failing test, let’s fill in the blanks. First off, we’ll need a matching route so that the request gets picked up by Phoenix:
# lib/bank_api_web/router.ex
defmodule BankAPIWeb.Router do
use BankAPIWeb, :router
pipeline :api do
plug :accepts, ["json"]
end
scope "/api", BankAPIWeb do
pipe_through :api
resources "/accounts", AccountController, only: [:create] # ◀️◀️◀️
end
end
And a controller for that route:
# lib/bank_api_web/controllers/account_controller.ex
defmodule BankAPIWeb.AccountController do
use BankAPIWeb, :controller
alias BankAPI.Accounts
alias BankAPI.Accounts.Projections.Account
action_fallback BankAPIWeb.FallbackController
def create(conn, %{"account" => account_params}) do
with {:ok, %Account{} = account} <- Accounts.open_account(account_params) do
conn
|> put_status(:created)
|> render("show.json", account: account)
end
end
end
# lib/bank_api_web/controllers/fallback_controller.ex
defmodule BankAPIWeb.FallbackController do
use BankAPIWeb, :controller
def call(conn, {:error, :not_found}) do
conn
|> put_status(:not_found)
|> put_view(BankAPIWeb.ErrorView)
|> render(:"404")
end
def call(conn, {:validation_error, _changeset}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> render(:"422")
end
end
Pretty standard fare - notice we designate a fallback controller to handle unhappy paths. All of our interaction with accounts is done through the Accounts context:
# lib/bank_api/accounts.ex
defmodule BankAPI.Accounts do
@moduledoc """
The Accounts context.
"""
import Ecto.Query, warn: false
alias Ecto.Changeset
alias BankAPI.Repo
alias BankAPI.Router
alias BankAPI.Accounts.Commands.OpenAccount
alias BankAPI.Accounts.Projections.Account
def get_account(uuid), do: Repo.get!(Account, uuid)
def open_account(account_params) do
changeset = account_opening_changeset(account_params)
if changeset.valid? do
account_uuid = UUID.uuid4()
dispatch_result =
%OpenAccount{
initial_balance: changeset.changes.initial_balance,
account_uuid: account_uuid
}
|> Router.dispatch()
case dispatch_result do
:ok ->
{
:ok,
%Account{
uuid: account_uuid,
current_balance: changeset.changes.initial_balance
}
}
reply ->
reply
end
else
{:validation_error, changeset}
end
end
defp account_opening_changeset(params) do
{
params,
%{initial_balance: :integer}
}
|> Changeset.cast(params, [:initial_balance])
|> Changeset.validate_required([:initial_balance])
|> Changeset.validate_number(:initial_balance, greater_than: 0)
end
end
Initially, we’ll just define get_account
and open_account
as our public
interface. Getting an existing account from the read model is trivial given its identifier.
As for opening a new one, we first validate the changeset (done here
for now, we will introduce a command validation middleware later on). If the
changeset isn’t valid, we return an error tuple that will be handled by our
fallback controller. If we do have a valid changeset, an OpenAccount
command
is built and dispatched. The account identifier present in the command is created
here, using a v4 UUID. In case of a successful dispatch, we return a success
tuple to the controller with an Account object to render. Notice we do not insert
the account on the read model directly, instead relying on the command dispatch
triggering a chain of events that will ultimately result in that insertion -
routing of the command to an aggregate, which upon successful handling emits an
event, that is then subscribed to by a projector, which inserts the new account.
The command is quite simple:
# lib/bank_api/accounts/commands/open_account.ex
defmodule BankAPI.Accounts.Commands.OpenAccount do
@enforce_keys [:account_uuid]
defstruct [:account_uuid, :initial_balance]
end
As is our initial router:
# lib/bank_api/router.ex
defmodule BankAPI.Router do
use Commanded.Commands.Router
alias BankAPI.Accounts.Aggregates.Account
alias BankAPI.Accounts.Commands.OpenAccount
dispatch([OpenAccount], to: Account, identity: :account_uuid)
end
Once built by open_account/1
in our context and dispatched through this router,
the command is handled by the Account aggregate.
# lib/bank_api/accounts/aggregates/account.ex
defmodule BankAPI.Accounts.Aggregates.Account do
defstruct uuid: nil,
current_balance: nil
alias __MODULE__
alias BankAPI.Accounts.Commands.OpenAccount
alias BankAPI.Accounts.Events.AccountOpened
def execute(
%Account{uuid: nil},
%OpenAccount{
account_uuid: account_uuid,
initial_balance: initial_balance
}
)
when initial_balance > 0 do
%AccountOpened{
account_uuid: account_uuid,
initial_balance: initial_balance
}
end
def execute(
%Account{uuid: nil},
%OpenAccount{
initial_balance: initial_balance
}
)
when initial_balance <= 0 do
{:error, :initial_balance_must_be_above_zero}
end
def execute(%Account{}, %OpenAccount{}) do
{:error, :account_already_opened}
end
# state mutators
def apply(
%Account{} = account,
%AccountOpened{
account_uuid: account_uuid,
initial_balance: initial_balance
}
) do
%Account{
account
| uuid: account_uuid,
current_balance: initial_balance
}
end
end
One of our execute/2
functions matches an aggregate with no UUID (an account
not yet open) and a given command with an initial_balance
argument over 0 and emits an AccountOpened
event as a result. The apply/2
method applies this event
and changes the aggregate’s internal state to match its payload.
The event is similar to the command in its simplicity:
# lib/bank_api/accounts/events/account_opened.ex
defmodule BankAPI.Accounts.Events.AccountOpened do
@derive [Jason.Encoder]
defstruct [
:account_uuid,
:initial_balance
]
end
Remember the convention: commands in the imperative and events in the past tense. We see here why the past tense makes so much sense as the event signifies a change that has already taken place on an aggregate.
So now we have a flow: route -> controller -> context -> router ->
aggregate. The aggregates are being created, their internal state consistent with
the payload of the initial POST request, and the AccountOpened
event is being
emitted, but we have yet to reflect that on our read model by creating new records
on the database.
For that, we’ll use commanded_ecto_projections
, a library by
the same author as Commanded that makes it easy to handle events and update read
models. You’re free to handle these events without this library, and the docs will
show you how. We start by adding the new dependency:
# mix.exs
defp deps do
[
# ...
{:commanded_ecto_projections, "~> 0.8"}
]
end
Then configuring the default repo to use for the projectors:
# config/config.exs
config :commanded_ecto_projections,
repo: BankAPI.Repo
Let’s create our bank account projection module. This will encode the accounts’ schema on the read model’s database:
# lib/bank_api/accounts/projections/account.ex
defmodule BankAPI.Accounts.Projections.Account do
use Ecto.Schema
@primary_key {:uuid, :binary_id, autogenerate: false}
schema "accounts" do
field :current_balance, :integer
timestamps()
end
end
Notice we don’t want an auto-generated key since we inject a UUID in our command’s payload for that purpose. The migration for the accounts table comes next:
$ mix ecto.gen.migration create_accounts
defmodule BankAPI.Repo.Migrations.CreateAccounts do
use Ecto.Migration
def change do
create table(:accounts, primary_key: false) do
add :uuid, :uuid, primary_key: true
add :current_balance, :integer
timestamps()
end
end
end
As per the lib’s README, we also need to create a migration for projectors to keep track of the last event they’ve processed and not reprocess previously handled events. Generate it with:
$ mix ecto.gen.migration create_projection_versions
And change the generated file to:
defmodule BankAPI.Repo.Migrations.CreateProjectionVersions do
use Ecto.Migration
def change do
create table(:projection_versions, primary_key: false) do
add :projection_name, :text, primary_key: true
add :last_seen_event_number, :bigint
timestamps()
end
end
end
Time to apply the migrations:
$ mix ecto.migrate
OK, on to the projector to create the accounts as a result of the aggregate’s
emission of the AccountOpened
event. The projections library makes this easy:
# lib/bank_api/accounts/projectors/account_opened.ex
defmodule BankAPI.Accounts.Projectors.AccountOpened do
use Commanded.Projections.Ecto,
name: "Accounts.Projectors.AccountOpened"
alias BankAPI.Accounts.Events.AccountOpened
alias BankAPI.Accounts.Projections.Account
project(%AccountOpened{} = evt, _metadata, fn multi ->
Ecto.Multi.insert(multi, :account_opened, %Account{
uuid: evt.account_uuid,
current_balance: evt.initial_balance
})
end)
end
The project
macro facilitated by the library receives the event to handle,
some metadata (that we won’t be using), and a multi
1 object into which
we can add operations. In this case, we insert a new Account using the event’s
payload as attributes.
We’re almost done. For the projector to be always listening to events, we need to
supervise it. Start by changing the application.ex
file to:
# lib/bank_api/application.ex
defmodule BankAPI.Application do
@moduledoc false
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
supervisor(BankAPI.Repo, []),
supervisor(BankAPIWeb.Endpoint, []),
supervisor(BankAPI.Accounts.Supervisor, [])
]
opts = [strategy: :one_for_one, name: BankAPI.Supervisor]
Supervisor.start_link(children, opts)
end
def config_change(changed, _new, removed) do
BankAPIWeb.Endpoint.config_change(changed, removed)
:ok
end
end
Finally let’s start up the projector on our context’s supervisor:
# lib/bank_api/accounts/supervisor.ex
defmodule BankAPI.Accounts.Supervisor do
use Supervisor
alias BankAPI.Accounts
def start_link do
Supervisor.start_link(__MODULE__, nil)
end
def init(_arg) do
children = [
worker(Accounts.Projectors.AccountOpened, [], id: :account_opened)
]
supervise(children, strategy: :one_for_one)
end
end
The final, final piece is the view used to render Account objects:
# lib/bank_api_web/views/account_view.ex
defmodule BankAPIWeb.AccountView do
use BankAPIWeb, :view
alias BankAPIWeb.AccountView
def render("show.json", %{account: account}) do
%{data: render_one(account, AccountView, "account.json")}
end
def render("account.json", %{account: account}) do
%{
uuid: account.uuid,
current_balance: account.current_balance
}
end
end
If you restart Phoenix’s server now, you should see something similar to this in your console:
[debug] Attempting to start Postgrex
[debug] Successfully started Postgrex (#PID<0.297.0>)
[debug] Attempting to start Postgrex.Notifications
[debug] Successfully started Postgrex.Notifications (#PID<0.308.0>)
[debug] Attempting to start Postgrex
[debug] Successfully started Postgrex (#PID<0.311.0>)
[info] Running BankAPIWeb.Endpoint with cowboy 2.6.1 at 0.0.0.0:4000 (http)
[info] Access BankAPIWeb.Endpoint at http://localhost:4000
[debug] Subscription "Accounts.Projectors.AccountOpened"@"$all" attempting to connect subscriber #PID<0.551.0>
[debug] Subscription "Accounts.Projectors.AccountOpened"@"$all" catching-up
[debug] BankAPI.Accounts.Projectors.AccountOpened has successfully subscribed to event store
And our test for the controller should now be passing:
....
Finished in 0.1 seconds
4 tests, 0 failures
Randomized with seed 460453
POSTing to http://localhost:4000/api/accounts
with a JSON payload like:
{
"account": {
"initial_balance": 1000
}
}
Should now yield a 201
with a response body similar to:
{
"data": {
"current_balance": 1000,
"uuid": "186075de-87e2-450a-b00e-2e11f4186303"
}
}
We made it! ☺️
A note on consistency
You might have noticed that in the Accounts context, we build an object to reply with as soon as we got a successful dispatch.
# ...
def open_account(account_params) do
changeset = account_opening_changeset(account_params)
if changeset.valid? do
account_uuid = UUID.uuid4()
dispatch_result =
%OpenAccount{
initial_balance: changeset.changes.initial_balance,
account_uuid: account_uuid
}
|> Router.dispatch()
case dispatch_result do
:ok ->
{
:ok,
%Account{ # ◀️ ◀️ ◀️ HERE
uuid: account_uuid,
current_balance: changeset.changes.initial_balance
}
}
reply ->
reply
end
else
{:validation_error, changeset}
end
end
# ...
We do this because we’re confident a successful dispatch will create the account. If you instead try to get the account by its identifier after the dispatch:
case dispatch_result do
:ok ->
{
:ok,
get_account(account_uuid)
}
reply ->
reply
end
you’ll see something like this:

Why is this the case? Basically, the call to get_account
is being made
before the projector picks up the event and materializes it on the database.
The projector is quick, but not instantaneous. We say in this case that the
system behaves in an eventually consistent manner, because the account will be
present at some time in the (hopefully immediate) future.
There are two ways around this. We can either return an “optimistic” view of the system’s state like we originally did, or force the dispatch to be “strongly consistent”, as opposed to Commanded’s default stance of “eventual consistent”. When we operate with strong consistency, the dispatch call will block until all event handlers have run before returning.
We can set the default consistency to strong
throughout the whole codebase:
# config/config.exs
config :commanded, default_consistency: :strong
Or explicitly set the consistency on a per-dispatch case. In ours, it’d be:
# ...
dispatch_result =
%OpenAccount{
initial_balance: changeset.changes.initial_balance,
account_uuid: account_uuid
}
|> Router.dispatch(consistency: :strong) # ◀️◀️◀️
# ...
But this will only work if the projector is also set to strongly consistent, which is easy enough to do:
# lib/bank_api/accounts/projectors/account_opened.ex
defmodule BankAPI.Accounts.Projectors.AccountOpened do
use Commanded.Projections.Ecto,
name: "Accounts.Projectors.AccountOpened",
consistency: :strong # ◀️◀️◀️
# ...
With these changes in place, we can ditch the optimistic return and fetch the
account from the database, mindful that the endpoint will take longer to reply
since the router dispatch will block until the projector concludes its handling.
In my opinion, setting the consistency for the whole codebase is a bit limiting as
different operations on your system will probably require different guarantees.
Commanded’s default with an occasional strong
override is the best approach.
The documentation has a more detailed explanation about this subject.
Wrapping Up
I hope the avalanche of code in the article was bearable. I’m sure you’ll agree it’s all very easy to follow and logical. Some might find the structure overbearing, but I find it liberating in a sense, since I immediately know where every step of the flow of interaction is located.
I skipped some tests, namely for our aggregate (using the in-memory event store), the context’s validation logic and our projector but this post is already quite long. Next time we’ll add those tests, come up with a better approach to validation using middleware, and work on our second feature.
Finally, I’d like to thank Ben Smith, Commanded’s author, for taking the time to give me helpful feedback.
Until next time!
Cover image credit: PIXNIO
- For more info on Ecto.Multi, check this article out. [return]