Event Sourcing With Elixir - Part 4

It’s been a while! But now, after holidays, vacation time and a trip to ElixirConf EU 2019 in lovely Prague, I’m back to carry on with our event-driven Bank API project! This time, we’ll go over Commanded middleware and improve the test coverage of our existing features.
A better way to validate commands
The previous post had somewhat rushed command validation using Ecto.Changeset,
and trying out a new way to do validation gives us the perfect introduction to a
new building block in Commanded - middleware.
If you’re familiar with Ruby’s Rack or Raxx in the Elixir world, Commanded’s
middleware will be familiar to you. If not, think of Commanded’s command dispatch
workflow as similar to Plug’s conn passing through the stack.
In Commanded, we have a Pipeline that is a struct passed around the
middleware modules specified in your Router, in the order defined. Certain functions
are executed in middleware modules before and after successful or failed command
dispatches.
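To make the idea concrete, here is a minimal, dependency-free sketch of a struct being threaded through a list of middleware modules. It is not Commanded's implementation: every `Sketch.*` module and the `dispatch/2` function are hypothetical names made up for illustration, and only the `before_dispatch` step is modelled.

```elixir
# A toy pipeline, NOT Commanded's actual code. All Sketch.* names are hypothetical.
defmodule Sketch.Pipeline do
  defstruct command: nil, halted: false, response: nil

  # Record the reply the dispatcher should hand back to the caller.
  def respond(%__MODULE__{} = pipeline, response), do: %{pipeline | response: response}

  # Mark the pipeline so that no further middleware runs.
  def halt(%__MODULE__{} = pipeline), do: %{pipeline | halted: true}
end

defmodule Sketch.RejectEmpty do
  alias Sketch.Pipeline

  # Halts on an empty map, otherwise passes the pipeline along untouched.
  def before_dispatch(%Pipeline{command: command} = pipeline) when map_size(command) == 0 do
    pipeline
    |> Pipeline.respond({:error, :empty_command})
    |> Pipeline.halt()
  end

  def before_dispatch(%Pipeline{} = pipeline), do: pipeline
end

defmodule Sketch.Stamp do
  alias Sketch.Pipeline

  # Adds a marker to the command to show that this middleware ran.
  def before_dispatch(%Pipeline{command: command} = pipeline) do
    %{pipeline | command: Map.put(command, :seen, true)}
  end
end

defmodule Sketch.Dispatcher do
  alias Sketch.Pipeline

  # Runs each middleware in the order given and stops at the first halt.
  def dispatch(command, middleware) do
    pipeline =
      Enum.reduce_while(middleware, %Pipeline{command: command}, fn module, acc ->
        case module.before_dispatch(acc) do
          %Pipeline{halted: true} = halted -> {:halt, halted}
          next -> {:cont, next}
        end
      end)

    case pipeline do
      %Pipeline{halted: true, response: response} -> response
      %Pipeline{command: final_command} -> {:ok, final_command}
    end
  end
end
```

Calling `Sketch.Dispatcher.dispatch(%{}, [Sketch.RejectEmpty, Sketch.Stamp])` stops at the first module and never reaches the second, which is the same gatekeeping role our validation middleware will play.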
Command validation is a bit of a shared responsibility in our project. In the case
of handling bank account openings, we don’t want to even dispatch an OpenAccount
command if the parameters given do not include an initial_balance field, but
validating what that parameter should be is offloaded to the command itself. This
way, we avoid dispatching commands we know aren’t valid structurally, but any further
business logic is not a first-line concern. So, let’s tweak our open_account/1
function in the Accounts context:
# lib/bank_api/accounts.ex
# ...
def open_account(%{"initial_balance" => initial_balance}) do
  account_uuid = UUID.uuid4()

  dispatch_result =
    %OpenAccount{
      initial_balance: initial_balance,
      account_uuid: account_uuid
    }
    |> Router.dispatch()

  case dispatch_result do
    :ok ->
      {
        :ok,
        %Account{
          uuid: account_uuid,
          current_balance: initial_balance
        }
      }

    reply ->
      reply
  end
end

def open_account(_params), do: {:error, :bad_command}
# ...
Using pattern-matching, we immediately discard calls whose params do not include the initial balance - we’ll handle this shortly in our fallback controller. If we do receive the proper argument, we construct the command and dispatch it. This is where the validation middleware takes over and performs a deeper analysis.
The middleware module is quite simple, since we’re opting to defer the validation logic to the command:
# lib/bank_api/support/middleware/validate_command.ex
defmodule BankAPI.Middleware.ValidateCommand do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case command.__struct__.valid?(command) do
      :ok ->
        pipeline

      {:error, messages} ->
        pipeline
        |> Pipeline.respond({:error, :command_validation_failure, command, messages})
        |> Pipeline.halt()
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline
end
Implementing the Commanded.Middleware behaviour, our before_dispatch function
will be the gatekeeper for continued processing or not of the command. It is here
that we call valid?/1 on the command’s module and pass on the pipeline, if valid,
or halt it if there’s a validation error. Notice how the behaviour also includes
functions to be performed after successful or failed dispatch.
I’m using the Skooma lib for specifying the validation rules. I’ve used Vex in the past, but have found its usage to be a bit inflexible at times. Here’s how the command module looks with the validation rules:
# lib/bank_api/accounts/commands/open_account.ex
defmodule BankAPI.Accounts.Commands.OpenAccount do
  @enforce_keys [:account_uuid]
  @uuid_regex ~r/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/

  defstruct [:account_uuid, :initial_balance]

  def valid?(command) do
    Skooma.valid?(Map.from_struct(command), schema())
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(@uuid_regex)],
      initial_balance: [:int, &positive_integer(&1)]
    }
  end

  defp positive_integer(data) do
    cond do
      is_integer(data) ->
        if data > 0 do
          :ok
        else
          {:error, "Argument must be bigger than zero"}
        end

      true ->
        {:error, "Argument must be an integer"}
    end
  end
end
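One detail worth checking in the command above: the UUID pattern has no anchors. Assuming the validator applies it with `Regex.match?/2`-style semantics (an assumption about Skooma's internals, not something verified here), any string that merely contains a UUID would pass. A small sketch with plain `Regex` calls shows the difference; the `Sketch.UUIDCheck` module and its function names are hypothetical.

```elixir
# Hypothetical helper comparing an unanchored and an anchored UUID pattern.
defmodule Sketch.UUIDCheck do
  # Same pattern as in the command module: matches a UUID anywhere in the string.
  defp unanchored do
    ~r/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/
  end

  # \A and \z pin the match to the very start and end of the string.
  defp anchored do
    ~r/\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/
  end

  def loose?(value) when is_binary(value), do: Regex.match?(unanchored(), value)
  def strict?(value) when is_binary(value), do: Regex.match?(anchored(), value)
end
```

With this sketch, `Sketch.UUIDCheck.loose?/1` accepts a UUID wrapped in extra characters while `Sketch.UUIDCheck.strict?/1` rejects it. Since we generate the UUID ourselves in the context, this is not a pressing issue for the project, but anchoring would be the safer choice if the value ever came from outside.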
Pretty straightforward. Adding the middleware to our Router is trivial:
# lib/bank_api/router.ex
defmodule BankAPI.Router do
  use Commanded.Commands.Router

  alias BankAPI.Accounts.Aggregates.Account
  alias BankAPI.Accounts.Commands.OpenAccount

  middleware BankAPI.Middleware.ValidateCommand # <---

  dispatch([OpenAccount], to: Account, identity: :account_uuid)
end
And finally, we handle the new errors in our fallback controller:
# lib/bank_api_web/controllers/fallback_controller.ex
  # ...
  def call(conn, {:error, :bad_command}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> render(:"422")
  end

  def call(conn, {:error, :command_validation_failure, _command, _errors}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> render(:"422")
  end
  # ...
end
We can do much more with the error messages here, but we’re keeping things simple on this end.
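As one possible direction, here is a sketch of a helper that turns the error tuples into a plain map a JSON view could render. It is not part of the project: `Sketch.ErrorPayload`, `Sketch.OpenAccount` and the shape of the map are all hypothetical choices made for illustration.

```elixir
# Hypothetical stand-in for the real command struct, so the sketch runs on its own.
defmodule Sketch.OpenAccount do
  defstruct [:account_uuid, :initial_balance]
end

# Hypothetical helper: converts error tuples into a map suitable for a 422 body.
defmodule Sketch.ErrorPayload do
  def build({:error, :command_validation_failure, command, messages}) do
    %{
      errors: %{
        detail: "Unprocessable Entity",
        # Last segment of the struct's module name, e.g. "OpenAccount".
        command: command.__struct__ |> Module.split() |> List.last(),
        messages: List.wrap(messages)
      }
    }
  end

  def build({:error, :bad_command}) do
    %{errors: %{detail: "Unprocessable Entity", messages: ["Malformed command payload"]}}
  end
end
```

A fallback controller clause could pass the result of `build/1` to its view instead of discarding `_command` and `_errors`, which would let API clients see why a command was rejected.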
Further testing
We’ve had some tests so far, but we’re missing tests for our context, aggregate and the account creation projector. Before we get to them, we need to lay some groundwork in terms of support files and cases.
We’ll be truncating the database between tests, so we won’t be needing Ecto’s Sandbox. Remove:
pool: Ecto.Adapters.SQL.Sandbox
from the config/test.exs file. All other case templates should also be purged of:
:ok = Ecto.Adapters.SQL.Sandbox.checkout(BankAPI.Repo)

unless tags[:async] do
  Ecto.Adapters.SQL.Sandbox.mode(BankAPI.Repo, {:shared, self()})
end
And finally, remove:
Ecto.Adapters.SQL.Sandbox.mode(BankAPI.Repo, :manual)
from the test/test_helper.exs file.
We want to use the in-memory event store for our tests, so that we don’t hit the actual event store’s database. For that, let’s add a custom ExUnit case, straight out of Commanded’s wiki:
# test/support/in_memory_event_store_case.ex
defmodule BankAPI.Test.InMemoryEventStoreCase do
  use ExUnit.CaseTemplate

  using do
    quote do
      import Commanded.Assertions.EventAssertions
      import BankAPI.Test.AggregateUtils
    end
  end

  setup do
    on_exit(fn ->
      :ok = Application.stop(:bank_api)
      :ok = Application.stop(:commanded)

      {:ok, _apps} = Application.ensure_all_started(:bank_api)
    end)
  end
end
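The case template only restarts the applications between tests; the event store adapter itself is picked in configuration. A sketch of what that could look like, assuming a pre-1.0 Commanded release where the adapter is configured globally (check the documentation for the version you have installed, as this changed in later releases):

```elixir
# config/test.exs
# Assumption: Commanded < 1.0, where the event store adapter is a global setting.
config :commanded,
  event_store_adapter: Commanded.EventStore.Adapters.InMemory
```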
We’ll also have a special case for projector tests:
# test/support/projector_case.ex
defmodule BankAPI.ProjectorCase do
  use ExUnit.CaseTemplate

  using do
    quote do
      alias BankAPI.Repo

      import Ecto
      import Ecto.Changeset
      import Ecto.Query
      import BankAPI.DataCase
      import BankAPI.Test.ProjectorUtils
    end
  end

  setup _tags do
    :ok = BankAPI.Test.ProjectorUtils.truncate_database()

    :ok
  end
end
And a couple of utility modules:
# test/support/aggregate_utils.ex
defmodule BankAPI.Test.AggregateUtils do
  # Folds one event or a list of events onto the aggregate, oldest first.
  def evolve(aggregate, events) do
    Enum.reduce(
      List.wrap(events),
      aggregate,
      &aggregate.__struct__.apply(&2, &1)
    )
  end
end

# test/support/projector_utils.ex
defmodule BankAPI.Test.ProjectorUtils do
  alias BankAPI.Repo

  import Ecto.Query, only: [from: 2]

  def truncate_database do
    truncate_readstore_tables_sql = """
    TRUNCATE TABLE
      accounts,
      projection_versions
    RESTART IDENTITY
    CASCADE;
    """

    {:ok, _result} = Repo.query(truncate_readstore_tables_sql)

    :ok
  end

  # Returns 0 when the projection has not recorded any event yet.
  def get_last_seen_event_number(name) do
    from(
      p in "projection_versions",
      where: p.projection_name == ^name,
      select: p.last_seen_event_number
    )
    |> Repo.one() || 0
  end

  def only_instance_of(module) do
    module |> Repo.one()
  end
end
Now, we can add the test for the Accounts context:
# test/bank_api/accounts_test.exs
defmodule BankAPI.Accounts.AccountsTest do
  use BankAPI.Test.InMemoryEventStoreCase

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Projections.Account

  test "opens account with valid command" do
    params = %{
      "initial_balance" => 1_000
    }

    assert {:ok, %Account{current_balance: 1_000}} = Accounts.open_account(params)
  end

  test "does not dispatch command with invalid payload" do
    params = %{
      "initial_whatevs" => 1_000
    }

    assert {:error, :bad_command} = Accounts.open_account(params)
  end

  test "returns validation errors from dispatch" do
    params1 = %{
      "initial_balance" => "1_000"
    }

    params2 = %{
      "initial_balance" => -10
    }

    params3 = %{
      "initial_balance" => 0
    }

    assert {
             :error,
             :command_validation_failure,
             _cmd,
             ["Expected INTEGER, got STRING \"1_000\", at initial_balance"]
           } = Accounts.open_account(params1)

    assert {
             :error,
             :command_validation_failure,
             _cmd,
             ["Argument must be bigger than zero"]
           } = Accounts.open_account(params2)

    assert {
             :error,
             :command_validation_failure,
             _cmd,
             ["Argument must be bigger than zero"]
           } = Accounts.open_account(params3)
  end
end
Nothing too fancy here, and we cover our public interface except for get_account/1,
which is a straightforward call to Ecto (coverage for that left as an exercise).
Next up, our aggregate’s test:
# test/bank_api/aggregates/account_test.exs
defmodule BankAPI.Aggregates.AccountTest do
  use BankAPI.Test.InMemoryEventStoreCase

  alias BankAPI.Accounts.Aggregates.Account, as: Aggregate
  alias BankAPI.Accounts.Events.AccountOpened
  alias BankAPI.Accounts.Commands.OpenAccount

  test "ensure aggregate gets correct state on creation" do
    uuid = UUID.uuid4()

    account =
      %Aggregate{}
      |> evolve(%AccountOpened{
        initial_balance: 1_000,
        account_uuid: uuid
      })

    assert account.uuid == uuid
    assert account.current_balance == 1_000
  end

  test "errors out on invalid opening balance" do
    invalid_command = %OpenAccount{
      initial_balance: -1_000,
      account_uuid: UUID.uuid4()
    }

    assert {:error, :initial_balance_must_be_above_zero} =
             Aggregate.execute(%Aggregate{}, invalid_command)
  end

  test "errors out on already opened account" do
    command = %OpenAccount{
      initial_balance: 1_000,
      account_uuid: UUID.uuid4()
    }

    assert {:error, :account_already_opened} =
             Aggregate.execute(%Aggregate{uuid: UUID.uuid4()}, command)
  end
end
Again, pretty much by the numbers. We use our evolve function to layer a list
of events onto the aggregate and then assert its final state. Not super useful
yet, as we have only one event - we’re sure to get more use out of it once we have
more complex aggregate state being built by more event combinations.
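To give a feel for where this pays off, here is a self-contained sketch of the same fold over two events. Everything in it is hypothetical: the `Sketch.*` modules stand in for the real aggregate and events, and the deposit event in particular does not exist in the project yet.

```elixir
# Hypothetical stand-ins for the real event and aggregate modules.
defmodule Sketch.Opened do
  defstruct [:account_uuid, :initial_balance]
end

defmodule Sketch.Deposited do
  # Made-up event; the series has not introduced deposits at this point.
  defstruct [:account_uuid, :amount]
end

defmodule Sketch.Account do
  defstruct uuid: nil, current_balance: nil

  alias Sketch.{Deposited, Opened}

  # Each clause returns the aggregate state after applying one event.
  def apply(%__MODULE__{} = account, %Opened{account_uuid: uuid, initial_balance: balance}) do
    %{account | uuid: uuid, current_balance: balance}
  end

  def apply(%__MODULE__{current_balance: balance} = account, %Deposited{amount: amount}) do
    %{account | current_balance: balance + amount}
  end
end

defmodule Sketch.AggregateUtils do
  # Same idea as evolve/2 above, written with an explicit fn for readability.
  def evolve(aggregate, events) do
    Enum.reduce(List.wrap(events), aggregate, fn event, acc ->
      acc.__struct__.apply(acc, event)
    end)
  end
end
```

Evolving an empty `Sketch.Account` with an opening of 1_000 followed by a deposit of 250 yields a `current_balance` of 1_250, so a single assertion on the final state covers the whole event sequence.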
Lastly, our projector’s test:
# test/bank_api/projectors/account_opened_test.exs
defmodule BankAPI.Accounts.Projectors.AccountOpenedTest do
  use BankAPI.ProjectorCase

  alias BankAPI.Accounts.Projections.Account
  alias BankAPI.Accounts.Events.AccountOpened
  alias BankAPI.Accounts.Projectors.AccountOpened, as: Projector

  test "should succeed with valid data" do
    uuid = UUID.uuid4()

    account_opened_evt = %AccountOpened{
      account_uuid: uuid,
      initial_balance: 1_000
    }

    last_seen_event_number = get_last_seen_event_number("Accounts.Projectors.AccountOpened")

    assert :ok =
             Projector.handle(
               account_opened_evt,
               %{event_number: last_seen_event_number + 1}
             )

    assert only_instance_of(Account).current_balance == 1_000
    assert only_instance_of(Account).uuid == uuid
  end
end
We only work out the happy path here, as we’re fairly confident events with faulty payloads won’t be emitted by our domain model.
Wrapping Up
So that’s it for this time, a short one to get me back on schedule for next week. If you have any feedback, questions or want to see a particular aspect covered more in-depth, feel free to contact me.
Finally, I’d like to thank Ben Smith, Commanded’s author, for taking the time to give me helpful feedback. Be sure to check out his recent video on Commanded usage and event sourcing in general.
Until next time!
Cover image credit: PIXNIO