Posts in this series

Onward

In the previous articles, we went over installing the tools we’ll need for our API project and a small introduction to Event Sourcing, DDD and CQRS and how our library of choice, Commanded, interprets them. We aim to develop a simple Banking API that allows for:

  • account opening and closing
  • deposits and withdrawals
  • transfers between accounts

Way back in part 1 we ran the Phoenix project generator Mix task and got a basic skeleton app ready to go. Let’s now add some dependencies: Commanded, EventStore and the EventStore adapter.

# mix.exs

defp deps do
  [
    # ...
    {:commanded, "~> 0.18"},
    {:eventstore, "~> 0.16.1", runtime: Mix.env() != :test},
    {:commanded_eventstore_adapter, "~> 0.5", runtime: Mix.env() != :test}
  ]
end

We don’t want to use EventStore in all environments, since there’s a faster in-memory event store just for tests. Next up, telling Commanded which event store adapter to use:

# config/config.exs

config :commanded,
  event_store_adapter: Commanded.EventStore.Adapters.EventStore

For development, we want to use EventStore, so the database settings must be present:

# config/dev.exs

config :eventstore,
  column_data_type: "jsonb"

config :eventstore, EventStore.Storage,
  serializer: EventStore.JsonbSerializer,
  types: EventStore.PostgresTypes,
  username: "postgres",
  password: "postgres",
  database: "bank_api_eventstore_dev",
  hostname: "localhost",
  pool_size: 10,
  pool_overflow: 5

You should adjust these to suit your local environment. To create the event store’s database, it’s enough to:

$ mix do event_store.create, event_store.init

Finally, for the test environment:

# config/test.exs

config :commanded,
  event_store_adapter: Commanded.EventStore.Adapters.InMemory

config :commanded, Commanded.EventStore.Adapters.InMemory,
  serializer: Commanded.Serialization.JsonSerializer

When you run mix test now, you should see (after all the initial compilation messages):

Generated bank_api app
..

Finished in 0.03 seconds
2 tests, 0 failures

Randomized with seed 825671

The current tests were auto-generated by the Phoenix project generator task.

Opening bank accounts

Alright, time for our first feature! Our accounts will be quite simple and just have an identifier and a current_balance field on our read model. Events that will be emitted as a result of interactions with our aggregates will be subscribed to by projectors and this balance will be updated over time. Here’s the diagram from the previous post as a reminder:

Commanded entities
Main entities in the Commanded library

For the “opening a bank account” initial feature we will need:

  • a controller action to receive the request (and a route matching to it)
  • a command to be dispatched by this controller action
  • a router to route this command to an aggregate
  • an aggregate that will handle this command, mutate its internal state and emit an event as a result
  • the event emitted by this aggregate
  • a projector to subscribe to this event and create the record on the read model

This might seem like a lot, but with this first feature we’ll get a lot of pieces in place, and subsequent features will be much quicker to implement. Let’s start things off with a test for the API endpoint:

# test/bank_api_web/controllers/account_controller_test.exs

defmodule BankAPIWeb.AccountControllerTest do
  use BankAPIWeb.ConnCase

  @create_attrs %{
    initial_balance: 42_00
  }
  @invalid_attrs %{
    initial_balance: nil
  }

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "create account" do
    test "renders account when data is valid", %{conn: conn} do
      conn =
        post(
          conn,
          Routes.account_path(conn, :create),
          account: @create_attrs
        )

      assert %{
               "uuid" => _uuid,
               "current_balance" => 4200
             } = json_response(conn, 201)["data"]
    end

    test "renders errors when data is invalid", %{conn: conn} do
      conn =
        post(
          conn,
          Routes.account_path(conn, :create),
          account: @invalid_attrs
        )

      assert json_response(conn, 422)["errors"] != %{}
    end
  end
end

When we run our tests with mix test we should see:

..

  1) test create account renders account when data is valid (BankAPIWeb.AccountControllerTest)
     test/bank_api_web/controllers/account_controller_test.exs:16
     ** (UndefinedFunctionError) function BankAPIWeb.Router.Helpers.account_path/2 is undefined or private
     code: Routes.account_path(conn, :create),
     stacktrace:
       (bank_api) BankAPIWeb.Router.Helpers.account_path(%Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{}, before_send: [], body_params: %Plug.Conn.Unfetched{aspect: :body_params}, cookies: %Plug.Conn.Unfetched{aspect: :cookies}, halted: false, host: "www.example.com", method: "GET", owner: #PID<0.409.0>, params: %Plug.Conn.Unfetched{aspect: :params}, path_info: [], path_params: %{}, port: 80, private: %{phoenix_recycled: true, plug_skip_csrf_protection: true}, query_params: %Plug.Conn.Unfetched{aspect: :query_params}, query_string: "", remote_ip: {127, 0, 0, 1}, req_cookies: %Plug.Conn.Unfetched{aspect: :cookies}, req_headers: [{"accept", "application/json"}], request_path: "/", resp_body: nil, resp_cookies: %{}, resp_headers: [{"cache-control", "max-age=0, private, must-revalidate"}], scheme: :http, script_name: [], secret_key_base: nil, state: :unset, status: nil}, :create)
       test/bank_api_web/controllers/account_controller_test.exs:20: (test)



  2) test create account renders errors when data is invalid (BankAPIWeb.AccountControllerTest)
     test/bank_api_web/controllers/account_controller_test.exs:30
     ** (UndefinedFunctionError) function BankAPIWeb.Router.Helpers.account_path/2 is undefined or private
     code: Routes.account_path(conn, :create),
     stacktrace:
       (bank_api) BankAPIWeb.Router.Helpers.account_path(%Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{}, before_send: [], body_params: %Plug.Conn.Unfetched{aspect: :body_params}, cookies: %Plug.Conn.Unfetched{aspect: :cookies}, halted: false, host: "www.example.com", method: "GET", owner: #PID<0.411.0>, params: %Plug.Conn.Unfetched{aspect: :params}, path_info: [], path_params: %{}, port: 80, private: %{phoenix_recycled: true, plug_skip_csrf_protection: true}, query_params: %Plug.Conn.Unfetched{aspect: :query_params}, query_string: "", remote_ip: {127, 0, 0, 1}, req_cookies: %Plug.Conn.Unfetched{aspect: :cookies}, req_headers: [{"accept", "application/json"}], request_path: "/", resp_body: nil, resp_cookies: %{}, resp_headers: [{"cache-control", "max-age=0, private, must-revalidate"}], scheme: :http, script_name: [], secret_key_base: nil, state: :unset, status: nil}, :create)
       test/bank_api_web/controllers/account_controller_test.exs:34: (test)



Finished in 0.1 seconds
4 tests, 2 failures

Randomized with seed 469529

Now that we have a failing test, let’s fill in the blanks. First off, we’ll need a matching route so that the request gets picked up by Phoenix:

# lib/bank_api_web/router.ex

defmodule BankAPIWeb.Router do
  use BankAPIWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
  end

  scope "/api", BankAPIWeb do
    pipe_through :api

    resources "/accounts", AccountController, only: [:create] # ◀️◀️◀️
  end
end

And a controller for that route:

# lib/bank_api_web/controllers/account_controller.ex

defmodule BankAPIWeb.AccountController do
  use BankAPIWeb, :controller

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Projections.Account

  action_fallback BankAPIWeb.FallbackController

  def create(conn, %{"account" => account_params}) do
    with {:ok, %Account{} = account} <- Accounts.open_account(account_params) do
      conn
      |> put_status(:created)
      |> render("show.json", account: account)
    end
  end
end
# lib/bank_api_web/controllers/fallback_controller.ex

defmodule BankAPIWeb.FallbackController do
  use BankAPIWeb, :controller

  def call(conn, {:error, :not_found}) do
    conn
    |> put_status(:not_found)
    |> put_view(BankAPIWeb.ErrorView)
    |> render(:"404")
  end

  def call(conn, {:validation_error, _changeset}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> render(:"422")
  end
end

Pretty standard fare - notice we designate a fallback controller to handle unhappy paths. All of our interaction with accounts is done through the Accounts context:

# lib/bank_api/accounts.ex

defmodule BankAPI.Accounts do
  @moduledoc """
  The Accounts context.
  """

  import Ecto.Query, warn: false

  alias Ecto.Changeset
  alias BankAPI.Repo
  alias BankAPI.Router
  alias BankAPI.Accounts.Commands.OpenAccount
  alias BankAPI.Accounts.Projections.Account

  def get_account(uuid), do: Repo.get!(Account, uuid)

  def open_account(account_params) do
    changeset = account_opening_changeset(account_params)

    if changeset.valid? do
      account_uuid = UUID.uuid4()

      dispatch_result =
        %OpenAccount{
          initial_balance: changeset.changes.initial_balance,
          account_uuid: account_uuid
        }
        |> Router.dispatch()

      case dispatch_result do
        :ok ->
          {
            :ok,
            %Account{
              uuid: account_uuid,
              current_balance: changeset.changes.initial_balance
            }
          }

        reply ->
          reply
      end
    else
      {:validation_error, changeset}
    end
  end

  defp account_opening_changeset(params) do
    {
      params,
      %{initial_balance: :integer}
    }
    |> Changeset.cast(params, [:initial_balance])
    |> Changeset.validate_required([:initial_balance])
    |> Changeset.validate_number(:initial_balance, greater_than: 0)
  end
end

Initially, we’ll just define get_account and open_account as our public interface. Getting an existing account from the read model is trivial given its identifier.

As for opening a new one, we first validate the changeset (done here for now, we will introduce a command validation middleware later on). If the changeset isn’t valid, we return an error tuple that will be handled by our fallback controller. If we do have a valid changeset, an OpenAccount command is built and dispatched. The account identifier present in the command is created here, using a v4 UUID. In case of a successful dispatch, we return a success tuple to the controller with an Account object to render. Notice we do not insert the account on the read model directly, instead relying on the command dispatch triggering a chain of events that will ultimately result in that insertion - routing of the command to an aggregate, which upon successful handling emits an event, that is then subscribed to by a projector, which inserts the new account.

The command is quite simple:

# lib/bank_api/accounts/commands/open_account.ex

defmodule BankAPI.Accounts.Commands.OpenAccount do
  @enforce_keys [:account_uuid]

  defstruct [:account_uuid, :initial_balance]
end

As is our initial router:

# lib/bank_api/router.ex

defmodule BankAPI.Router do
  use Commanded.Commands.Router

  alias BankAPI.Accounts.Aggregates.Account
  alias BankAPI.Accounts.Commands.OpenAccount

  dispatch([OpenAccount], to: Account, identity: :account_uuid)
end

Once built by open_account/1 in our context and dispatched through this router, the command is handled by the Account aggregate.

# lib/bank_api/accounts/aggregates/account.ex

defmodule BankAPI.Accounts.Aggregates.Account do
  defstruct uuid: nil,
            current_balance: nil

  alias __MODULE__
  alias BankAPI.Accounts.Commands.OpenAccount
  alias BankAPI.Accounts.Events.AccountOpened

  def execute(
        %Account{uuid: nil},
        %OpenAccount{
          account_uuid: account_uuid,
          initial_balance: initial_balance
        }
      )
      when initial_balance > 0 do
    %AccountOpened{
      account_uuid: account_uuid,
      initial_balance: initial_balance
    }
  end

  def execute(
        %Account{uuid: nil},
        %OpenAccount{
          initial_balance: initial_balance
        }
      )
      when initial_balance <= 0 do
    {:error, :initial_balance_must_be_above_zero}
  end

  def execute(%Account{}, %OpenAccount{}) do
    {:error, :account_already_opened}
  end

  # state mutators

  def apply(
        %Account{} = account,
        %AccountOpened{
          account_uuid: account_uuid,
          initial_balance: initial_balance
        }
      ) do
    %Account{
      account
      | uuid: account_uuid,
        current_balance: initial_balance
    }
  end
end

One of our execute/2 functions matches an aggregate with no UUID (an account not yet open) and a given command with an initial_balance argument over 0 and emits an AccountOpened event as a result. The apply/2 method applies this event and changes the aggregate’s internal state to match its payload.

The event is similar to the command in its simplicity:

# lib/bank_api/accounts/events/account_opened.ex

defmodule BankAPI.Accounts.Events.AccountOpened do
  @derive [Jason.Encoder]

  defstruct [
    :account_uuid,
    :initial_balance
  ]
end

Remember the convention: commands in the imperative and events in the past tense. We see here why the past tense makes so much sense as the event signifies a change that has already taken place on an aggregate.

So now we have a flow: route -> controller -> context -> router -> aggregate. The aggregates are being created, their internal state consistent with the payload of the initial POST request, and the AccountOpened event is being emitted, but we have yet to reflect that on our read model by creating new records on the database.

For that, we’ll use commanded_ecto_projections, a library by the same author as Commanded that makes it easy to handle events and update read models. You’re free to handle these events without this library, and the docs will show you how. We start by adding the new dependency:

# mix.exs

defp deps do
    [
      # ...
      {:commanded_ecto_projections, "~> 0.8"}
    ]
  end

Then configuring the default repo to use for the projectors:

# config/config.exs

config :commanded_ecto_projections,
  repo: BankAPI.Repo

Let’s create our bank account projection module. This will encode the accounts’ schema on the read model’s database:

# lib/bank_api/accounts/projections/account.ex

defmodule BankAPI.Accounts.Projections.Account do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: false}

  schema "accounts" do
    field :current_balance, :integer

    timestamps()
  end
end

Notice we don’t want an auto-generated key since we inject a UUID in our command’s payload for that purpose. The migration for the accounts table comes next:

$ mix ecto.gen.migration create_accounts
defmodule BankAPI.Repo.Migrations.CreateAccounts do
  use Ecto.Migration

  def change do
    create table(:accounts, primary_key: false) do
      add :uuid, :uuid, primary_key: true
      add :current_balance, :integer

      timestamps()
    end
  end
end

As per the lib’s README, we also need to create a migration for projectors to keep track of the last event they’ve processed and not reprocess previously handled events. Generate it with:

$ mix ecto.gen.migration create_projection_versions

And change the generated file to:

defmodule BankAPI.Repo.Migrations.CreateProjectionVersions do
  use Ecto.Migration

  def change do
    create table(:projection_versions, primary_key: false) do
      add :projection_name, :text, primary_key: true
      add :last_seen_event_number, :bigint

      timestamps()
    end
  end
end

Time to apply the migrations:

$ mix ecto.migrate

OK, on to the projector to create the accounts as a result of the aggregate’s emission of the AccountOpened event. The projections library makes this easy:

# lib/bank_api/accounts/projectors/account_opened.ex

defmodule BankAPI.Accounts.Projectors.AccountOpened do
  use Commanded.Projections.Ecto,
    name: "Accounts.Projectors.AccountOpened"

  alias BankAPI.Accounts.Events.AccountOpened
  alias BankAPI.Accounts.Projections.Account

  project(%AccountOpened{} = evt, _metadata, fn multi ->
    Ecto.Multi.insert(multi, :account_opened, %Account{
      uuid: evt.account_uuid,
      current_balance: evt.initial_balance
    })
  end)
end

The project macro facilitated by the library receives the event to handle, some metadata (that we won’t be using), and a multi1 object into which we can add operations. In this case, we insert a new Account using the event’s payload as attributes.

We’re almost done. For the projector to be always listening to events, we need to supervise it. Start by changing the application.ex file to:

# lib/bank_api/application.ex

defmodule BankAPI.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      supervisor(BankAPI.Repo, []),
      supervisor(BankAPIWeb.Endpoint, []),
      supervisor(BankAPI.Accounts.Supervisor, [])
    ]

    opts = [strategy: :one_for_one, name: BankAPI.Supervisor]
    Supervisor.start_link(children, opts)
  end

  def config_change(changed, _new, removed) do
    BankAPIWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end

Finally let’s start up the projector on our context’s supervisor:

# lib/bank_api/accounts/supervisor.ex

defmodule BankAPI.Accounts.Supervisor do
  use Supervisor

  alias BankAPI.Accounts

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_arg) do
    children = [
      worker(Accounts.Projectors.AccountOpened, [], id: :account_opened)
    ]

    supervise(children, strategy: :one_for_one)
  end
end

The final, final piece is the view used to render Account objects:

# lib/bank_api_web/views/account_view.ex

defmodule BankAPIWeb.AccountView do
  use BankAPIWeb, :view
  alias BankAPIWeb.AccountView

  def render("show.json", %{account: account}) do
    %{data: render_one(account, AccountView, "account.json")}
  end

  def render("account.json", %{account: account}) do
    %{
      uuid: account.uuid,
      current_balance: account.current_balance
    }
  end
end

If you restart Phoenix’s server now, you should see something similar to this in your console:

[debug] Attempting to start Postgrex
[debug] Successfully started Postgrex (#PID<0.297.0>)
[debug] Attempting to start Postgrex.Notifications
[debug] Successfully started Postgrex.Notifications (#PID<0.308.0>)
[debug] Attempting to start Postgrex
[debug] Successfully started Postgrex (#PID<0.311.0>)
[info] Running BankAPIWeb.Endpoint with cowboy 2.6.1 at 0.0.0.0:4000 (http)
[info] Access BankAPIWeb.Endpoint at http://localhost:4000
[debug] Subscription "Accounts.Projectors.AccountOpened"@"$all" attempting to connect subscriber #PID<0.551.0>
[debug] Subscription "Accounts.Projectors.AccountOpened"@"$all" catching-up
[debug] BankAPI.Accounts.Projectors.AccountOpened has successfully subscribed to event store

And our test for the controller should now be passing:

....

Finished in 0.1 seconds
4 tests, 0 failures

Randomized with seed 460453

POSTing to http://localhost:4000/api/accounts with a JSON payload like:

{
  "account": {
    "initial_balance": 1000
  }
}

Should now yield a 201 with a response body similar to:

{
  "data": {
    "current_balance": 1000,
    "uuid": "186075de-87e2-450a-b00e-2e11f4186303"
  }
}

We made it! ☺️

A note on consistency

You might have noticed that in the Accounts context, we build an object to reply with as soon as we got a successful dispatch.

  # ...

  def open_account(account_params) do
    changeset = account_opening_changeset(account_params)

    if changeset.valid? do
      account_uuid = UUID.uuid4()

      dispatch_result =
        %OpenAccount{
          initial_balance: changeset.changes.initial_balance,
          account_uuid: account_uuid
        }
        |> Router.dispatch()

      case dispatch_result do
        :ok ->
          {
            :ok,
            %Account{  # ◀️ ◀️ ◀️ HERE
              uuid: account_uuid,
              current_balance: changeset.changes.initial_balance
            }
          }

        reply ->
          reply
      end
    else
      {:validation_error, changeset}
    end
  end

  # ...

We do this because we’re confident a successful dispatch will create the account. If you instead try to get the account by its identifier after the dispatch:

  case dispatch_result do
    :ok ->
      {
        :ok,
        get_account(account_uuid)
      }

    reply ->
      reply
  end

you’ll see something like this:

Commanded 404 error exception
404 trying to get the account

Why is this the case? Basically, the call to get_account is being made before the projector picks up the event and materializes it on the database. The projector is quick, but not instantaneous. We say in this case that the system behaves in an eventually consistent manner, because the account will be present at some time in the (hopefully immediate) future.

There are two ways around this. We can either return an “optimistic” view of the system’s state like we originally did, or force the dispatch to be “strongly consistent”, as opposed to Commanded’s default stance of “eventual consistent”. When we operate with strong consistency, the dispatch call will block until all event handlers have run before returning.

We can set the default consistency to strong throughout the whole codebase:

# config/config.exs

config :commanded, default_consistency: :strong

Or explicitly set the consistency on a per-dispatch case. In ours, it’d be:

  # ...

  dispatch_result =
    %OpenAccount{
      initial_balance: changeset.changes.initial_balance,
      account_uuid: account_uuid
    }
    |> Router.dispatch(consistency: :strong) # ◀️◀️◀️

  # ...

But this will only work if the projector is also set to strongly consistent, which is easy enough to do:

# lib/bank_api/accounts/projectors/account_opened.ex

defmodule BankAPI.Accounts.Projectors.AccountOpened do
  use Commanded.Projections.Ecto,
    name: "Accounts.Projectors.AccountOpened",
    consistency: :strong # ◀️◀️◀️

  # ...

With these changes in place, we can ditch the optimistic return and fetch the account from the database, mindful that the endpoint will take longer to reply since the router dispatch will block until the projector concludes its handling. In my opinion, setting the consistency for the whole codebase is a bit limiting as different operations on your system will probably require different guarantees. Commanded’s default with an occasional strong override is the best approach. The documentation has a more detailed explanation about this subject.

Wrapping Up

I hope the avalanche of code in the article was bearable. I’m sure you’ll agree it’s all very easy to follow and logical. Some might find the structure overbearing, but I find it liberating in a sense, since I immediately know where every step of the flow of interaction is located.

I skipped some tests, namely for our aggregate (using the in-memory event store), the context’s validation logic and our projector but this post is already quite long. Next time we’ll add those tests, come up with a better approach to validation using middleware, and work on our second feature.

Finally, I’d like to thank Ben Smith, Commanded’s author, for taking the time to give me helpful feedback.

Until next time!

Cover image credit: PIXNIO


  1. For more info on Ecto.Multi, check this article out. [return]