Event Sourcing With Elixir - Part 5

Posts in this series
Way back in Part 3 we committed to the features we’d like to provide. This post was supposed to just add the missing features before transfers, and introduce process managers then, but it kinda ballooned out of control in terms of size, so next time we’ll tackle transfers between accounts in their own post.
Account closing
Closing an account at our bank is desirable, but past events pertaining to that
account will remain on our event store. Now that the GDPR1 is a thing, this poses
a problem but some ways around it exist (we’ll explore them in part 7). We’ll
pretend it’s pre-GDPR days and sidestep this issue for the sake of keeping things
simple. Our Account
aggregate will have a closed?
boolean field added, and
closing an account just flips it.
# lib/bank_api/accounts/aggregates/account.ex
defmodule BankAPI.Accounts.Aggregates.Account do
defstruct uuid: nil,
current_balance: nil,
closed?: false
# ...
Let’s proceed outside-in. A new route on our Web interface:
# lib/bank_api_web/router.ex
resources "/accounts", AccountController, only: [:create, :delete, :show]
We’ll add delete
but also show
to just be able to see the status of our accounts.
Next, our controller:
# lib/bank_api_web/controllers/account_controller.ex
# ...
def delete(conn, %{"id" => account_id}) do
with :ok <- Accounts.close_account(account_id) do
conn
|> send_resp(200, "")
end
end
def show(conn, %{"id" => account_id}) do
with {:ok, %Account{} = account} <- Accounts.get_account(account_id) do
conn
|> put_status(:ok)
|> render("show.json", account: account)
end
end
Deletions will just send a 200 OK
response with no body, or an error if they
fail.
Let’s add the command and event to close an account, as they’ll be mentioned in
the Accounts
context operations:
# lib/bank_api/accounts/commands/close_account.ex
defmodule BankAPI.Accounts.Commands.CloseAccount do
@enforce_keys [:account_uuid]
defstruct [:account_uuid]
alias BankAPI.Accounts
def valid?(command) do
Skooma.valid?(Map.from_struct(command), schema())
end
defp schema do
%{
account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())]
}
end
end
Notice we’ve moved the uuid_regex
from our OpenAccount
command to the Accounts
context since we have it in more than one command now. Small refactor for the
OpenAccount
command:
# lib/bank_api/accounts/commands/open_account.ex
defmodule BankAPI.Accounts.Commands.OpenAccount do
@enforce_keys [:account_uuid]
defstruct [:account_uuid, :initial_balance]
alias BankAPI.Accounts
alias BankAPI.Accounts.Commands.Validators
def valid?(command) do
Skooma.valid?(Map.from_struct(command), schema())
end
defp schema do
%{
account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
initial_balance: [:int, &Validators.positive_integer(&1)]
}
end
end
The new Validators
module:
# lib/bank_api/accounts/commands/validators.ex
defmodule BankAPI.Accounts.Commands.Validators do
def positive_integer(data, minimum \\ 0) do
if is_integer(data) do
if data > minimum do
:ok
else
{:error, "Argument must be bigger than #{minimum}"}
end
else
{:error, "Argument must be an integer"}
end
end
end
This change breaks our tests, so change all expectactions of ["Argument must be bigger than zero"]
to ["Argument must be bigger than 0"]
in test/bank_api/accounts_test.exs
.
Finally, the event emitted by the aggregate for this command:
# lib/bank_api/accounts/events/account_closed.ex
defmodule BankAPI.Accounts.Events.AccountClosed do
@derive [Jason.Encoder]
defstruct [
:account_uuid
]
end
Our context with the new operations:
# lib/bank_api/accounts.ex
alias BankAPI.Accounts.Commands.{OpenAccount, CloseAccount}
def uuid_regex do
~r/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/
end
def get_account(id) do
case Repo.get(Account, id) do
%Account{} = account ->
{:ok, account}
_reply ->
{:error, :not_found}
end
end
def close_account(id) do
%CloseAccount{
account_uuid: id
}
|> Router.dispatch()
end
def open_account(%{"initial_balance" => initial_balance}) do
account_uuid = UUID.uuid4()
dispatch_result =
%OpenAccount{
initial_balance: initial_balance,
account_uuid: account_uuid
}
|> Router.dispatch()
case dispatch_result do
:ok ->
{
:ok,
%Account{
uuid: account_uuid,
current_balance: initial_balance,
status: Account.status().open # ◀️◀️◀️
}
}
reply ->
reply
end
end
# ...
Our get_account
is a query, so no command is dispatched for it - just a normal
Ecto lookup. For close_account
we return the dispatch result, either :ok
or
an error handled by our FallbackController
. Noticed that for open_account
I’ve also added the status
field to our initial optimistic return that doesn’t
wait for the read-model projector to run (status field coming up on our projection
changes).
Let’s add this new command to our router:
# lib/bank_api/router.ex
# ...
alias BankAPI.Accounts.Commands.{OpenAccount, CloseAccount}
dispatch([OpenAccount, CloseAccount],
to: Account,
identity: :account_uuid
)
Our Account
aggregate now needs to handle this new command being dispatched to
it:
# lib/bank_api/accounts/aggregates/account.ex
# ...
alias BankAPI.Accounts.Commands.{OpenAccount, CloseAccount}
alias BankAPI.Accounts.Events.{AccountOpened, AccountClosed}
def execute(
%Account{uuid: account_uuid, closed?: true},
%CloseAccount{
account_uuid: account_uuid
}
) do
{:error, :account_already_closed}
end
def execute(
%Account{uuid: account_uuid, closed?: false},
%CloseAccount{
account_uuid: account_uuid
}
) do
%AccountClosed{
account_uuid: account_uuid
}
end
def execute(
%Account{},
%CloseAccount{}
) do
{:error, :not_found}
end
def apply(
%Account{uuid: account_uuid} = account,
%AccountClosed{
account_uuid: account_uuid
}
) do
%Account{
account
| closed?: true
}
end
# ...
Pretty simple - we first match on an account that is already closed, and return an error (can’t close a closed account). The other error is not being able to find an account. Otherwise, we emit the relevant event.
Our FallbackController
needs to handle this new error:
# lib/bank_api_web/controllers/fallback_controller.ex
def call(conn, {:error, :account_already_closed}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> render(:"422")
end
At this point, the command can be handled by our Web layer, routed to the aggregate and the corresponding process will have its state changed. That leaves just the read-model to be updated. Let’s add a status field to our accounts:
# priv/repo/migrations/20190525105154_add_status_to_accounts.exs
defmodule BankAPI.Repo.Migrations.AddStatusToAccounts do
use Ecto.Migration
def change do
alter table(:accounts) do
add :status, :text
end
end
end
With this change on the database, we can write a projector that will flip the field:
# lib/bank_api/accounts/projectors/account_closed.ex
defmodule BankAPI.Accounts.Projectors.AccountClosed do
use Commanded.Projections.Ecto,
name: "Accounts.Projectors.AccountClosed"
alias BankAPI.Accounts
alias BankAPI.Accounts.Events.AccountClosed
alias BankAPI.Accounts.Projections.Account
alias Ecto.{Changeset, Multi}
project(%AccountClosed{} = evt, _metadata, fn multi ->
with {:ok, %Account{} = account} <- Accounts.get_account(evt.account_uuid) do
Multi.update(
multi,
:account,
Changeset.change(account, status: Account.status().closed)
)
else
# ignore when this happens
_ -> multi
end
end)
end
We first lookup the task, then we update it. If the task doesn’t exist, I’m just
ignoring it but proper error handling should be in place if this is important to
your use case. I’m not a big fan of just using strings to update fields, so the
status is retrieved from the Account
projection:
# lib/bank_api/accounts/projections/account.ex
defmodule BankAPI.Accounts.Projections.Account do
use Ecto.Schema
@primary_key {:uuid, :binary_id, autogenerate: false}
schema "accounts" do
field :current_balance, :integer
field :status, :string
timestamps()
end
def status do
%{
open: "open",
closed: "closed"
}
end
end
One final touch is to have the AccountOpened
projector also populate the status
field:
# lib/bank_api/accounts/projectors/account_opened.ex
# ...
project(%AccountOpened{} = evt, _metadata, fn multi ->
Ecto.Multi.insert(multi, :account_opened, %Account{
uuid: evt.account_uuid,
current_balance: evt.initial_balance,
status: Account.status().open # ◀️◀️◀️
})
end)
Finally for this feature, let’s make sure the supervisor worker is running with our app:
# lib/bank_api/accounts/supervisor.ex
defmodule BankAPI.Accounts.Supervisor do
use Supervisor
alias BankAPI.Accounts
def start_link do
Supervisor.start_link(__MODULE__, nil)
end
def init(_arg) do
children = [
worker(Accounts.Projectors.AccountOpened, [], id: :account_opened),
worker(Accounts.Projectors.AccountClosed, [], id: :account_closed) # ◀️◀️◀️
]
supervise(children, strategy: :one_for_one)
end
end
With these pieces in place, we’re done with account closure. Let’s move on to deposits and withdrawals.
Deposits and withdrawals
As with before, let’s start with the Web routes:
# lib/bank_api_web/router.ex
post "/accounts/:id/deposit", AccountController, :deposit
post "/accounts/:id/withdraw", AccountController, :withdraw
And our controller functions:
# lib/bank_api_web/controllers/account_controller.ex
def deposit(conn, %{"id" => account_id, "deposit_amount" => amount}) do
with {:ok, %Account{} = account} <- Accounts.deposit(account_id, amount) do
conn
|> put_status(:ok)
|> render("show.json", account: account)
end
end
def withdraw(conn, %{"id" => account_id, "withdrawal_amount" => amount}) do
with {:ok, %Account{} = account} <- Accounts.withdraw(account_id, amount) do
conn
|> put_status(:ok)
|> render("show.json", account: account)
end
end
Moving on to our context:
alias BankAPI.Accounts.Commands.{
OpenAccount,
CloseAccount,
DepositIntoAccount,
WithdrawFromAccount
}
def deposit(id, amount) do
dispatch_result =
%DepositIntoAccount{
account_uuid: id,
deposit_amount: amount
}
|> Router.dispatch(consistency: :strong)
case dispatch_result do
:ok ->
{
:ok,
Repo.get!(Account, id)
}
reply ->
reply
end
end
def withdraw(id, amount) do
dispatch_result =
%WithdrawFromAccount{
account_uuid: id,
withdraw_amount: amount
}
|> Router.dispatch(consistency: :strong)
case dispatch_result do
:ok ->
{
:ok,
Repo.get!(Account, id)
}
reply ->
reply
end
end
We probably want strong consistency here, since these operations are important to
an account. Thus, we dispatch these new commands with a strong guarantee which
makes Commanded wait until all effects from the dispatch are resolved. Using
consistency: [BankAPI.Accounts.Projectors.DepositsAndWithdrawals]
is also an
option here, to specify on which event handler you’re waiting for. This cuts down
on latency by involving fewer handlers at the cost of some increased coupling.
Here are the commands:
# lib/bank_api/accounts/commands/deposit_into_account.ex
defmodule BankAPI.Accounts.Commands.DepositIntoAccount do
@enforce_keys [:account_uuid]
defstruct [:account_uuid, :deposit_amount]
alias BankAPI.Accounts
alias BankAPI.Accounts.Commands.Validators
def valid?(command) do
Skooma.valid?(Map.from_struct(command), schema())
end
defp schema do
%{
account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
deposit_amount: [:int, &Validators.positive_integer(&1, 1)]
}
end
end
# lib/bank_api/accounts/commands/withdraw_from_account.ex
defmodule BankAPI.Accounts.Commands.WithdrawFromAccount do
@enforce_keys [:account_uuid]
defstruct [:account_uuid, :withdraw_amount]
alias BankAPI.Accounts
alias BankAPI.Accounts.Commands.Validators
def valid?(command) do
Skooma.valid?(Map.from_struct(command), schema())
end
defp schema do
%{
account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
withdraw_amount: [:int, &Validators.positive_integer(&1, 1)]
}
end
end
And the events emitted after their successful handling by the aggregate:
# lib/bank_api/accounts/events/deposited_into_account.ex
defmodule BankAPI.Accounts.Events.DepositedIntoAccount do
@derive [Jason.Encoder]
defstruct [
:account_uuid,
:new_current_balance
]
end
# lib/bank_api/accounts/events/withdrawn_from_account.ex
defmodule BankAPI.Accounts.Events.WithdrawnFromAccount do
@derive [Jason.Encoder]
defstruct [
:account_uuid,
:new_current_balance
]
end
The new commands must be added to be router:
# lib/bank_api/router.ex
# ...
alias BankAPI.Accounts.Commands.{
OpenAccount,
CloseAccount,
DepositIntoAccount,
WithdrawFromAccount
}
dispatch([OpenAccount, CloseAccount, DepositIntoAccount, WithdrawFromAccount],
to: Account,
identity: :account_uuid
)
# ...
And the aggregate needs to handle them:
# lib/bank_api/accounts/aggregates/account.ex
# ...
alias BankAPI.Accounts.Commands.{
OpenAccount,
CloseAccount,
DepositIntoAccount,
WithdrawFromAccount
}
alias BankAPI.Accounts.Events.{
AccountOpened,
AccountClosed,
DepositedIntoAccount,
WithdrawnFromAccount
}
def execute(
%Account{uuid: account_uuid, closed?: false, current_balance: current_balance},
%DepositIntoAccount{account_uuid: account_uuid, deposit_amount: amount}
) do
%DepositedIntoAccount{
account_uuid: account_uuid,
new_current_balance: current_balance + amount
}
end
def execute(
%Account{uuid: account_uuid, closed?: true},
%DepositIntoAccount{account_uuid: account_uuid}
) do
{:error, :account_closed}
end
def execute(
%Account{},
%DepositIntoAccount{}
) do
{:error, :not_found}
end
def execute(
%Account{uuid: account_uuid, closed?: false, current_balance: current_balance},
%WithdrawFromAccount{account_uuid: account_uuid, withdraw_amount: amount}
) do
if current_balance - amount > 0 do
%WithdrawnFromAccount{
account_uuid: account_uuid,
new_current_balance: current_balance - amount
}
else
{:error, :insufficient_funds}
end
end
def execute(
%Account{uuid: account_uuid, closed?: true},
%WithdrawFromAccount{account_uuid: account_uuid}
) do
{:error, :account_closed}
end
def execute(
%Account{},
%WithdrawFromAccount{}
) do
{:error, :not_found}
end
def apply(
%Account{
uuid: account_uuid,
current_balance: _current_balance
} = account,
%DepositedIntoAccount{
account_uuid: account_uuid,
new_current_balance: new_current_balance
}
) do
%Account{
account
| current_balance: new_current_balance
}
end
def apply(
%Account{
uuid: account_uuid,
current_balance: _current_balance
} = account,
%WithdrawnFromAccount{
account_uuid: account_uuid,
new_current_balance: new_current_balance
}
) do
%Account{
account
| current_balance: new_current_balance
}
end
The logic is very similar for withdrawals and deposits. We match against closed accounts first, and then in the case of withdrawals we also check if the current balance is enough before we emit the event. Notice that the event doesn’t have the amount of the deposit or withdrawal, but just the new balance of the aggregate. This makes our lives easier when projecting these events onto the read-model.
Since the projection logic is so simple for these two events (just set the balance to the one in the received event), we can have one projector for both:
# lib/bank_api/accounts/projectors/deposits_and_withdrawals.ex
defmodule BankAPI.Accounts.Projectors.DepositsAndWithdrawals do
use Commanded.Projections.Ecto,
name: "Accounts.Projectors.DepositsAndWithdrawals",
consistency: :strong
alias BankAPI.Accounts
alias BankAPI.Accounts.Events.{DepositedIntoAccount, WithdrawnFromAccount}
alias BankAPI.Accounts.Projections.Account
alias Ecto.{Changeset, Multi}
project(%DepositedIntoAccount{} = evt, _metadata, fn multi ->
with {:ok, %Account{} = account} <- Accounts.get_account(evt.account_uuid) do
Multi.update(
multi,
:account,
Changeset.change(
account,
current_balance: evt.new_current_balance
)
)
else
# ignore when this happens
_ -> multi
end
end)
project(%WithdrawnFromAccount{} = evt, _metadata, fn multi ->
with {:ok, %Account{} = account} <- Accounts.get_account(evt.account_uuid) do
Multi.update(
multi,
:account,
Changeset.change(
account,
current_balance: evt.new_current_balance
)
)
else
# ignore when this happens
_ -> multi
end
end)
end
Which needs to be supervised:
# lib/bank_api/accounts/supervisor.ex
defmodule BankAPI.Accounts.Supervisor do
use Supervisor
alias BankAPI.Accounts.Projectors
def start_link do
Supervisor.start_link(__MODULE__, nil)
end
def init(_arg) do
children = [
worker(Projectors.AccountOpened, [], id: :account_opened),
worker(Projectors.AccountClosed, [], id: :account_closed),
worker(Projectors.DepositsAndWithdrawals, [], id: :deposits_and_withdrawals)
]
supervise(children, strategy: :one_for_one)
end
end
Our FallbackController
needs to cover more cases now, so let’s add some messages
to it:
# lib/bank_api_web/controllers/fallback_controller.ex
defmodule BankAPIWeb.FallbackController do
use BankAPIWeb, :controller
def call(conn, {:error, :not_found}) do
conn
|> put_status(:not_found)
|> put_view(BankAPIWeb.ErrorView)
|> render(:"404")
end
def call(conn, {:error, :account_already_closed}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> assign(:message, "Account already closed")
|> render(:"422")
end
def call(conn, {:error, :account_closed}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> assign(:message, "Account closed")
|> render(:"422")
end
def call(conn, {:error, :bad_command}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> assign(:message, "Bad command")
|> render(:"422")
end
def call(conn, {:error, :insufficient_funds}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> assign(:message, "Insufficient funds to process order")
|> render(:"422")
end
def call(conn, {:error, :command_validation_failure, _command, _errors}) do
conn
|> put_status(:unprocessable_entity)
|> put_view(BankAPIWeb.ErrorView)
|> assign(:message, "Command validation error")
|> render(:"422")
end
end
In order to show that message, let’s tweak our ErrorView
:
# lib/bank_api_web/views/error_view.ex
defmodule BankAPIWeb.ErrorView do
use BankAPIWeb, :view
def template_not_found(template, _assigns) do
%{errors: %{detail: Phoenix.Controller.status_message_from_template(template)}}
end
def render("422.json", assigns) do
%{
errors: %{
message: assigns[:message] || "Unprocessable entity"
}
}
end
end
And while we’re at it, only return the balance of open accounts on the account view:
# lib/bank_api_web/views/account_view.ex
defmodule BankAPIWeb.AccountView do
use BankAPIWeb, :view
alias BankAPIWeb.AccountView
alias BankAPI.Accounts.Projections.Account
def render("show.json", %{account: account}) do
%{data: render_one(account, AccountView, "account.json")}
end
def render("account.json", %{account: account}) do
if account.status == Account.status().closed do
%{
uuid: account.uuid,
current_balance: account.current_balance
}
else
%{
uuid: account.uuid,
current_balance: account.current_balance,
status: account.status
}
end
end
end
Wrapping Up
I hope the torrent of code was easy to follow. We’ve covered all of this before, but it was important to set the stage for account transfers next time. I’ve skipped over the tests to keep the post from being even lengthier, but I might tackle them next time if any of them has some interesting patterns we can explore. Next time: operations on more than one aggregate with Process Managers.
See you there! And a note of thanks to Ben Smith, Commanded’s author, for helpful feedback on the post’s content.
Cover image credit: PIXNIO