Event Sourcing With Elixir - Part 6

Intro
Transfers between accounts are a necessity for any bank, and ours is no exception. They’re also the perfect vehicle to introduce Process Managers to our toolset.
Every operation we’ve done so far focuses on a single aggregate, but what do we do when we need to change more than one aggregate in a single domain interaction? Since a command can only be routed to a single aggregate, this seems hard to do. Process Managers (or Sagas) are components that behave like event handlers, but instead of changing the read model like Projectors do, they emit commands. They also hold state, which allows them to keep track of the progress of their orchestration between aggregates.
Furthermore, this “outside control” means aggregates need no knowledge of each other to perform transfers. You can think of the aggregates’ functionality as the building blocks for more complex functionality facilitated by Process Managers.
Implementation
Let’s start outside-in once more. First stop, the Web router:
# lib/bank_api_web/router.ex
# ...
post "/accounts/:id/transfer", AccountController, :transfer
And the new action on our controller:
# lib/bank_api_web/controllers/account_controller.ex
# ...
def transfer(
      conn,
      %{
        "id" => account_id,
        "transfer_amount" => amount,
        "destination_account" => destination_account_id
      }
    ) do
  with :ok <-
         Accounts.transfer(account_id, amount, destination_account_id) do
    conn
    |> send_resp(201, "")
  end
end
Pretty simple so far - our context has a new function to handle this:
# lib/bank_api/accounts.ex
def transfer(source_id, amount, destination_id) do
  %TransferBetweenAccounts{
    account_uuid: source_id,
    transfer_uuid: UUID.uuid4(),
    transfer_amount: amount,
    destination_account_uuid: destination_id
  }
  |> Router.dispatch()
end
As before, our context serves as a gateway for dispatching commands on our router. Here’s the command that kickstarts transfers:
# lib/bank_api/accounts/commands/transfer_between_accounts.ex
defmodule BankAPI.Accounts.Commands.TransferBetweenAccounts do
  @enforce_keys [:account_uuid, :transfer_uuid]
  defstruct [:account_uuid, :transfer_uuid, :transfer_amount, :destination_account_uuid]

  alias BankAPI.Repo
  alias BankAPI.Accounts
  alias BankAPI.Accounts.Commands.Validators
  alias BankAPI.Accounts.Projections.Account

  def valid?(command) do
    cmd = Map.from_struct(command)

    with %Account{} <- account_exists?(cmd.destination_account_uuid),
         true <- account_open?(cmd.destination_account_uuid) do
      Skooma.valid?(cmd, schema())
    else
      nil ->
        {:error, ["Destination account does not exist"]}

      false ->
        {:error, ["Destination account closed"]}

      reply ->
        reply
    end
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      transfer_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      transfer_amount: [:int, &Validators.positive_integer(&1, 1)],
      destination_account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())]
    }
  end

  defp account_exists?(uuid) do
    Repo.get(Account, uuid)
  end

  defp account_open?(uuid) do
    account = Repo.get!(Account, uuid)
    account.status == Account.status().open
  end
end
This might be a bit controversial, but we do check the read model to validate this command. We don’t want to allow transfers to closed or non-existing accounts.
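As a quick illustration of that read-model check, a command aimed at a destination UUID with no matching account projection fails validation before the schema is even evaluated. A sketch (source_id is assumed to be the UUID of an existing, open account):
alias BankAPI.Accounts.Commands.TransferBetweenAccounts

command = %TransferBetweenAccounts{
  account_uuid: source_id,                # assumed to exist and be open
  transfer_uuid: UUID.uuid4(),
  transfer_amount: 25,
  destination_account_uuid: UUID.uuid4()  # freshly generated, so no projection exists
}

TransferBetweenAccounts.valid?(command)
#=> {:error, ["Destination account does not exist"]}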
Adding this command to our router is pretty simple:
# lib/bank_api/router.ex
defmodule BankAPI.Router do
  use Commanded.Commands.Router

  alias BankAPI.Accounts.Aggregates.Account

  alias BankAPI.Accounts.Commands.{
    OpenAccount,
    CloseAccount,
    DepositIntoAccount,
    WithdrawFromAccount,
    TransferBetweenAccounts # ◀️◀️◀️
  }

  middleware(BankAPI.Middleware.ValidateCommand)

  dispatch(
    [
      OpenAccount,
      CloseAccount,
      DepositIntoAccount,
      WithdrawFromAccount,
      TransferBetweenAccounts # ◀️◀️◀️
    ],
    to: Account,
    identity: :account_uuid
  )
end
Our origin account for the transfer will handle this command:
# lib/bank_api/accounts/aggregates/account.ex
# ...
def execute(
      %Account{
        uuid: account_uuid,
        closed?: true
      },
      %TransferBetweenAccounts{
        account_uuid: account_uuid
      }
    ) do
  {:error, :account_closed}
end

def execute(
      %Account{uuid: account_uuid, closed?: false},
      %TransferBetweenAccounts{
        account_uuid: account_uuid,
        destination_account_uuid: destination_account_uuid
      }
    )
    when account_uuid == destination_account_uuid do
  {:error, :transfer_to_same_account}
end

def execute(
      %Account{
        uuid: account_uuid,
        closed?: false,
        current_balance: current_balance
      },
      %TransferBetweenAccounts{
        account_uuid: account_uuid,
        transfer_amount: transfer_amount
      }
    )
    when current_balance < transfer_amount do
  {:error, :insufficient_funds}
end

def execute(
      %Account{uuid: account_uuid, closed?: false},
      %TransferBetweenAccounts{
        account_uuid: account_uuid,
        transfer_uuid: transfer_uuid,
        transfer_amount: transfer_amount,
        destination_account_uuid: destination_account_uuid
      }
    ) do
  %MoneyTransferRequested{
    transfer_uuid: transfer_uuid,
    source_account_uuid: account_uuid,
    amount: transfer_amount,
    destination_account_uuid: destination_account_uuid
  }
end

def execute(
      %Account{},
      %TransferBetweenAccounts{}
    ) do
  {:error, :not_found}
end
We handle the command as with all previous ones: we match on closed accounts, return an error if the destination account is the same as the source, and check that there are sufficient funds for the transfer. The event we emit in the happy path is the one the process manager will pick up on to start the transfer. Since the actual withdrawal only happens later, through a command the process manager dispatches, the aggregate’s apply/2 function doesn’t change the aggregate’s state for this operation:
# lib/bank_api/accounts/aggregates/account.ex
# ...
def apply(
      %Account{} = account,
      %MoneyTransferRequested{}
    ) do
  account
end
The event itself is pretty simple:
# lib/bank_api/accounts/events/money_transfer_requested.ex
defmodule BankAPI.Accounts.Events.MoneyTransferRequested do
  @derive [Jason.Encoder]
  defstruct [
    :transfer_uuid,
    :source_account_uuid,
    :destination_account_uuid,
    :amount
  ]
end
Which brings us to the process manager at last. You can see that, just like an aggregate, we have a state definition in the form of a struct. We also need to name the manager (name must be unique throughout the codebase) and specify which router to use when we dispatch commands.
# lib/bank_api/accounts/process_managers/transfer_money.ex
defmodule BankAPI.Accounts.ProcessManagers.TransferMoney do
  use Commanded.ProcessManagers.ProcessManager,
    name: "Accounts.ProcessManagers.TransferMoney",
    router: BankAPI.Router

  @derive Jason.Encoder
  defstruct [
    :transfer_uuid,
    :source_account_uuid,
    :destination_account_uuid,
    :amount,
    :status
  ]

  # ...
Then, we specify which events we want to handle with interested?/1 functions. First off, we want to handle the MoneyTransferRequested event, and we return a tuple with start! and a UUID that will serve as an identifier for the Erlang process started for this particular process manager instance. All further interaction with the process manager will need the same UUID to reach the same instance.
# lib/bank_api/accounts/process_managers/transfer_money.ex
# ...
def interested?(%MoneyTransferRequested{transfer_uuid: transfer_uuid}),
  do: {:start!, transfer_uuid}

def interested?(%WithdrawnFromAccount{transfer_uuid: transfer_uuid})
    when is_nil(transfer_uuid),
    do: false

def interested?(%WithdrawnFromAccount{transfer_uuid: transfer_uuid}),
  do: {:continue!, transfer_uuid}

def interested?(%DepositedIntoAccount{transfer_uuid: transfer_uuid})
    when is_nil(transfer_uuid),
    do: false

def interested?(%DepositedIntoAccount{transfer_uuid: transfer_uuid}),
  do: {:stop, transfer_uuid}

def interested?(_event), do: false
We could also have used start without the bang, but the bang adds extra validation that a process manager instance with this UUID doesn’t already exist; the difference is covered in the Commanded documentation. The list of events we’re interested in also includes WithdrawnFromAccount and DepositedIntoAccount, since these are the events produced by the commands we’ll be dispatching to execute the transfer. This diagram helps clear it up:
[Diagram: TransferBetweenAccounts dispatched to the source account → MoneyTransferRequested → process manager dispatches WithdrawFromAccount → WithdrawnFromAccount → process manager dispatches DepositIntoAccount → DepositedIntoAccount → process manager stops]
Note that we could have saved one “hop” in the flow by having the command that kickstarts the transfer withdraw money from the source account directly, but it’s important to capture intent in your domain. More on this in the next post.
Once we’ve signalled interest in an event, we need handle/2 functions that emit the commands, and apply/2 functions to change the manager’s state:
# lib/bank_api/accounts/process_managers/transfer_money.ex
# ...
def handle(
      %TransferMoney{},
      %MoneyTransferRequested{
        source_account_uuid: source_account_uuid,
        amount: transfer_amount,
        transfer_uuid: transfer_uuid
      }
    ) do
  %WithdrawFromAccount{
    account_uuid: source_account_uuid,
    withdraw_amount: transfer_amount,
    transfer_uuid: transfer_uuid
  }
end

def handle(
      %TransferMoney{transfer_uuid: transfer_uuid} = pm,
      %WithdrawnFromAccount{transfer_uuid: transfer_uuid}
    ) do
  %DepositIntoAccount{
    account_uuid: pm.destination_account_uuid,
    deposit_amount: pm.amount,
    transfer_uuid: pm.transfer_uuid
  }
end

# State mutators

def apply(%TransferMoney{} = pm, %MoneyTransferRequested{} = evt) do
  %TransferMoney{
    pm
    | transfer_uuid: evt.transfer_uuid,
      source_account_uuid: evt.source_account_uuid,
      destination_account_uuid: evt.destination_account_uuid,
      amount: evt.amount,
      status: :withdraw_money_from_source_account
  }
end

def apply(%TransferMoney{} = pm, %WithdrawnFromAccount{}) do
  %TransferMoney{
    pm
    | status: :deposit_money_in_destination_account
  }
end
You will notice that the WithdrawFromAccount and DepositIntoAccount commands have been enhanced with a transfer_uuid field. This is how we distinguish deposits and withdrawals made as part of a transfer from standalone operations, and it’s why the interested?/1 hooks for these events return false when the field is missing. We need to tweak our commands and events accordingly; a sketch of the change follows below.
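A minimal sketch of that tweak for WithdrawFromAccount (the existing keys come from the earlier posts; only :transfer_uuid is new, and it stays nil for standalone withdrawals). The WithdrawnFromAccount and DepositedIntoAccount events gain the same field, and the aggregate copies it from the command into the event so the process manager can correlate them:
# lib/bank_api/accounts/commands/withdraw_from_account.ex
defmodule BankAPI.Accounts.Commands.WithdrawFromAccount do
  # existing keys, plus the new optional field
  defstruct [:account_uuid, :withdraw_amount, :transfer_uuid] # ◀️ :transfer_uuid is new
  # valid?/1 stays as in the earlier post; transfer_uuid may be nil
end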
The last change we need is to add our new process manager to our supervisor:
# lib/bank_api/accounts/supervisor.ex
defmodule BankAPI.Accounts.Supervisor do
  use Supervisor

  alias BankAPI.Accounts.Projectors
  alias BankAPI.Accounts.ProcessManagers # ◀️◀️◀️

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_arg) do
    children = [
      # Projectors
      worker(Projectors.AccountOpened, [], id: :account_opened),
      worker(Projectors.AccountClosed, [], id: :account_closed),
      worker(Projectors.DepositsAndWithdrawals, [], id: :deposits_and_withdrawals),
      # Process managers
      worker(ProcessManagers.TransferMoney, [], id: :transfer_money) # ◀️◀️◀️
    ]

    supervise(children, strategy: :one_for_one)
  end
end
Oh, and add that new error to our fallback controller:
# lib/bank_api_web/controllers/fallback_controller.ex
# ...
def call(conn, {:error, :transfer_to_same_account}) do
  conn
  |> put_status(:unprocessable_entity)
  |> put_view(BankAPIWeb.ErrorView)
  |> assign(:message, "Source and destination accounts are the same")
  |> render(:"422")
end
Wrapping Up
So now we know how to orchestrate complex flows between our carefully designed aggregates. Process managers are a really powerful tool in the Commanded arsenal, and they help us split our domain’s operations into low-level, single-aggregate ones and higher-level, composite ones that span multiple aggregates.
The code is available online.
And that’s it, we’ve covered most of the basics of Commanded! I hope this series has given you more insight into event sourcing, DDD concepts, and CQRS as an architectural pattern. Next time I’ll wrap up with a final post mixing lessons learned from running Commanded projects in production with a few extra bits of information!
Many thanks once more to Commanded’s author, Ben Smith, for helpful feedback!
Cover image credit: PIXNIO