Posts in this series

Way back in Part 3 we committed to the features we’d like to provide. This post was supposed to just add the missing features before transfers, and introduce process managers then, but it kinda ballooned out of control in terms of size, so next time we’ll tackle transfers between accounts in their own post.

Account closing

Closing an account at our bank is desirable, but past events pertaining to that account will remain on our event store. Now that the GDPR1 is a thing, this poses a problem but some ways around it exist (we’ll explore them in part 7). We’ll pretend it’s pre-GDPR days and sidestep this issue for the sake of keeping things simple. Our Account aggregate will have a closed? boolean field added, and closing an account just flips it.

# lib/bank_api/accounts/aggregates/account.ex
defmodule BankAPI.Accounts.Aggregates.Account do

  defstruct uuid: nil,
            current_balance: nil,
            closed?: false

  # ...

Let’s proceed outside-in. A new route on our Web interface:

# lib/bank_api_web/router.ex

resources "/accounts", AccountController, only: [:create, :delete, :show]

We’ll add delete but also show to just be able to see the status of our accounts. Next, our controller:

# lib/bank_api_web/controllers/account_controller.ex

# ...

def delete(conn, %{"id" => account_id}) do
  with :ok <- Accounts.close_account(account_id) do
    conn
    |> send_resp(200, "")
  end
end

def show(conn, %{"id" => account_id}) do
  with {:ok, %Account{} = account} <- Accounts.get_account(account_id) do
    conn
    |> put_status(:ok)
    |> render("show.json", account: account)
  end
end

Deletions will just send a 200 OK response with no body, or an error if they fail.

Let’s add the command and event to close an account, as they’ll be mentioned in the Accounts context operations:

# lib/bank_api/accounts/commands/close_account.ex

defmodule BankAPI.Accounts.Commands.CloseAccount do
  @enforce_keys [:account_uuid]

  defstruct [:account_uuid]

  alias BankAPI.Accounts

  def valid?(command) do
    Skooma.valid?(Map.from_struct(command), schema())
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())]
    }
  end
end

Notice we’ve moved the uuid_regex from our OpenAccount command to the Accounts context since we have it in more than one command now. Small refactor for the OpenAccount command:

# lib/bank_api/accounts/commands/open_account.ex

defmodule BankAPI.Accounts.Commands.OpenAccount do
  @enforce_keys [:account_uuid]

  defstruct [:account_uuid, :initial_balance]

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Commands.Validators

  def valid?(command) do
    Skooma.valid?(Map.from_struct(command), schema())
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      initial_balance: [:int, &Validators.positive_integer(&1)]
    }
  end
end

The new Validators module:

#  lib/bank_api/accounts/commands/validators.ex

defmodule BankAPI.Accounts.Commands.Validators do
  def positive_integer(data, minimum \\ 0) do
    if is_integer(data) do
      if data > minimum do
        :ok
      else
        {:error, "Argument must be bigger than #{minimum}"}
      end
    else
      {:error, "Argument must be an integer"}
    end
  end
end

This change breaks our tests, so change all expectactions of ["Argument must be bigger than zero"] to ["Argument must be bigger than 0"] in test/bank_api/accounts_test.exs.

Finally, the event emitted by the aggregate for this command:

# lib/bank_api/accounts/events/account_closed.ex

defmodule BankAPI.Accounts.Events.AccountClosed do
  @derive [Jason.Encoder]

  defstruct [
    :account_uuid
  ]
end

Our context with the new operations:

# lib/bank_api/accounts.ex

alias BankAPI.Accounts.Commands.{OpenAccount, CloseAccount}

def uuid_regex do
  ~r/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/
end

def get_account(id) do
  case Repo.get(Account, id) do
    %Account{} = account ->
      {:ok, account}

    _reply ->
      {:error, :not_found}
  end
end

def close_account(id) do
  %CloseAccount{
    account_uuid: id
  }
  |> Router.dispatch()
end

def open_account(%{"initial_balance" => initial_balance}) do
  account_uuid = UUID.uuid4()

  dispatch_result =
    %OpenAccount{
      initial_balance: initial_balance,
      account_uuid: account_uuid
    }
    |> Router.dispatch()

  case dispatch_result do
    :ok ->
      {
        :ok,
        %Account{
          uuid: account_uuid,
          current_balance: initial_balance,
          status: Account.status().open # ◀️◀️◀️
        }
      }

    reply ->
      reply
  end
end

# ...

Our get_account is a query, so no command is dispatched for it - just a normal Ecto lookup. For close_account we return the dispatch result, either :ok or an error handled by our FallbackController. Noticed that for open_account I’ve also added the status field to our initial optimistic return that doesn’t wait for the read-model projector to run (status field coming up on our projection changes).

Let’s add this new command to our router:

# lib/bank_api/router.ex

# ...

alias BankAPI.Accounts.Commands.{OpenAccount, CloseAccount}

dispatch([OpenAccount, CloseAccount],
  to: Account,
  identity: :account_uuid
)

Our Account aggregate now needs to handle this new command being dispatched to it:

# lib/bank_api/accounts/aggregates/account.ex

# ...

alias BankAPI.Accounts.Commands.{OpenAccount, CloseAccount}
alias BankAPI.Accounts.Events.{AccountOpened, AccountClosed}

def execute(
      %Account{uuid: account_uuid, closed?: true},
      %CloseAccount{
        account_uuid: account_uuid
      }
    ) do
  {:error, :account_already_closed}
end

def execute(
      %Account{uuid: account_uuid, closed?: false},
      %CloseAccount{
        account_uuid: account_uuid
      }
    ) do
  %AccountClosed{
    account_uuid: account_uuid
  }
end

def execute(
      %Account{},
      %CloseAccount{}
    ) do
  {:error, :not_found}
end

def apply(
      %Account{uuid: account_uuid} = account,
      %AccountClosed{
        account_uuid: account_uuid
      }
    ) do
  %Account{
    account
    | closed?: true
  }
end

# ...

Pretty simple - we first match on an account that is already closed, and return an error (can’t closed a closed account). The other error is not being able to find an account. Otherwise, we emit the relevant event.

Our FallbackController needs to handle this new error:

# lib/bank_api_web/controllers/fallback_controller.ex

def call(conn, {:error, :account_already_closed}) do
  conn
  |> put_status(:unprocessable_entity)
  |> put_view(BankAPIWeb.ErrorView)
  |> render(:"422")
end

At this point, the command can be handled by our Web layer, routed to the aggregate and the corresponding process will have its state changed. That leaves just the read-model to be updated. Let’s add a status field to our accounts:

# priv/repo/migrations/20190525105154_add_status_to_accounts.exs

defmodule BankAPI.Repo.Migrations.AddStatusToAccounts do
  use Ecto.Migration

  def change do
    alter table(:accounts) do
      add :status, :text
    end
  end
end

With this change on the database, we can write a projector that will flip the field:

# lib/bank_api/accounts/projectors/account_closed.ex

defmodule BankAPI.Accounts.Projectors.AccountClosed do
  use Commanded.Projections.Ecto,
    name: "Accounts.Projectors.AccountClosed"

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Events.AccountClosed
  alias BankAPI.Accounts.Projections.Account
  alias Ecto.{Changeset, Multi}

  project(%AccountClosed{} = evt, _metadata, fn multi ->
    with {:ok, %Account{} = account} <- Accounts.get_account(evt.account_uuid) do
      Multi.update(
        multi,
        :account,
        Changeset.change(account, status: Account.status().closed)
      )
    else
      # ignore when this happens
      _ -> multi
    end
  end)
end

We first lookup the task, then we update it. If the task doesn’t exist, I’m just ignoring it but proper error handling should be in place if this is important to your use case. I’m not a big fan of just using strings to update fields, so the status is retrieved from the Account projection:

# lib/bank_api/accounts/projections/account.ex

defmodule BankAPI.Accounts.Projections.Account do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: false}

  schema "accounts" do
    field :current_balance, :integer
    field :status, :string

    timestamps()
  end

  def status do
    %{
      open: "open",
      closed: "closed"
    }
  end
end

One final touch is to have the AccountOpened projector also populate the status field:

# lib/bank_api/accounts/projectors/account_opened.ex

# ...
 project(%AccountOpened{} = evt, _metadata, fn multi ->
   Ecto.Multi.insert(multi, :account_opened, %Account{
     uuid: evt.account_uuid,
      current_balance: evt.initial_balance,
      status: Account.status().open # ◀️◀️◀️
   })
 end)

Finally for this feature, let’s make sure the supervisor worker is running with our app:

# lib/bank_api/accounts/supervisor.ex

defmodule BankAPI.Accounts.Supervisor do
  use Supervisor

  alias BankAPI.Accounts

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_arg) do
    children = [
      worker(Accounts.Projectors.AccountOpened, [], id: :account_opened),
      worker(Accounts.Projectors.AccountClosed, [], id: :account_closed) # ◀️◀️◀️
    ]

    supervise(children, strategy: :one_for_one)
  end
end

With these pieces in place, we’re done with account closure. Let’s move on to deposits and withdrawals.

Deposits and withdrawals

As with before, let’s start with the Web routes:

#  lib/bank_api_web/router.ex
post "/accounts/:id/deposit", AccountController, :deposit
post "/accounts/:id/withdraw", AccountController, :withdraw

And our controller functions:

# lib/bank_api_web/controllers/account_controller.ex

def deposit(conn, %{"id" => account_id, "deposit_amount" => amount}) do
  with {:ok, %Account{} = account} <- Accounts.deposit(account_id, amount) do
    conn
    |> put_status(:ok)
    |> render("show.json", account: account)
  end
end

def withdraw(conn, %{"id" => account_id, "withdrawal_amount" => amount}) do
  with {:ok, %Account{} = account} <- Accounts.withdraw(account_id, amount) do
    conn
    |> put_status(:ok)
    |> render("show.json", account: account)
  end
end

Moving on to our context:

alias BankAPI.Accounts.Commands.{
  OpenAccount,
  CloseAccount,
  DepositIntoAccount,
  WithdrawFromAccount
}

def deposit(id, amount) do
  dispatch_result =
    %DepositIntoAccount{
      account_uuid: id,
      deposit_amount: amount
    }
    |> Router.dispatch(consistency: :strong)

  case dispatch_result do
    :ok ->
      {
        :ok,
        Repo.get!(Account, id)
      }

    reply ->
      reply
  end
end

def withdraw(id, amount) do
  dispatch_result =
    %WithdrawFromAccount{
      account_uuid: id,
      withdraw_amount: amount
    }
    |> Router.dispatch(consistency: :strong)

  case dispatch_result do
    :ok ->
      {
        :ok,
        Repo.get!(Account, id)
      }

    reply ->
      reply
  end
end

We probably want strong consistency here, since these operations are important to an account. Thus, we dispatch these new commands with a strong guarantee which makes Commanded wait until all effects from the dispatch are resolved. Using consistency: [BankAPI.Accounts.Projectors.DepositsAndWithdrawals] is also an option here, to specify on which event handler you’re waiting for. This cuts down on latency by involving fewer handlers at the cost of some increased coupling.

Here are the commands:

# lib/bank_api/accounts/commands/deposit_into_account.ex

defmodule BankAPI.Accounts.Commands.DepositIntoAccount do
  @enforce_keys [:account_uuid]

  defstruct [:account_uuid, :deposit_amount]

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Commands.Validators

  def valid?(command) do
    Skooma.valid?(Map.from_struct(command), schema())
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      deposit_amount: [:int, &Validators.positive_integer(&1, 1)]
    }
  end
end

# lib/bank_api/accounts/commands/withdraw_from_account.ex

defmodule BankAPI.Accounts.Commands.WithdrawFromAccount do
  @enforce_keys [:account_uuid]

  defstruct [:account_uuid, :withdraw_amount]

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Commands.Validators

  def valid?(command) do
    Skooma.valid?(Map.from_struct(command), schema())
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      withdraw_amount: [:int, &Validators.positive_integer(&1, 1)]
    }
  end
end

And the events emitted after their successful handling by the aggregate:

# lib/bank_api/accounts/events/deposited_into_account.ex

defmodule BankAPI.Accounts.Events.DepositedIntoAccount do
  @derive [Jason.Encoder]

  defstruct [
    :account_uuid,
    :new_current_balance
  ]
end

# lib/bank_api/accounts/events/withdrawn_from_account.ex

defmodule BankAPI.Accounts.Events.WithdrawnFromAccount do
  @derive [Jason.Encoder]

  defstruct [
    :account_uuid,
    :new_current_balance
  ]
end

The new commands must be added to be router:

# lib/bank_api/router.ex

# ...

alias BankAPI.Accounts.Commands.{
  OpenAccount,
  CloseAccount,
  DepositIntoAccount,
  WithdrawFromAccount
}

dispatch([OpenAccount, CloseAccount, DepositIntoAccount, WithdrawFromAccount],
  to: Account,
  identity: :account_uuid
)

# ...

And the aggregate needs to handle them:

# lib/bank_api/accounts/aggregates/account.ex

# ...

alias BankAPI.Accounts.Commands.{
  OpenAccount,
  CloseAccount,
  DepositIntoAccount,
  WithdrawFromAccount
}

alias BankAPI.Accounts.Events.{
  AccountOpened,
  AccountClosed,
  DepositedIntoAccount,
  WithdrawnFromAccount
}

def execute(
      %Account{uuid: account_uuid, closed?: false, current_balance: current_balance},
      %DepositIntoAccount{account_uuid: account_uuid, deposit_amount: amount}
    ) do
  %DepositedIntoAccount{
    account_uuid: account_uuid,
    new_current_balance: current_balance + amount
  }
end

def execute(
      %Account{uuid: account_uuid, closed?: true},
      %DepositIntoAccount{account_uuid: account_uuid}
    ) do
  {:error, :account_closed}
end

def execute(
      %Account{},
      %DepositIntoAccount{}
    ) do
  {:error, :not_found}
end

def execute(
      %Account{uuid: account_uuid, closed?: false, current_balance: current_balance},
      %WithdrawFromAccount{account_uuid: account_uuid, withdraw_amount: amount}
    ) do
  if current_balance - amount > 0 do
    %WithdrawnFromAccount{
      account_uuid: account_uuid,
      new_current_balance: current_balance - amount
    }
  else
    {:error, :insufficient_funds}
  end
end

def execute(
      %Account{uuid: account_uuid, closed?: true},
      %WithdrawFromAccount{account_uuid: account_uuid}
    ) do
  {:error, :account_closed}
end

def execute(
      %Account{},
      %WithdrawFromAccount{}
    ) do
  {:error, :not_found}
end

def apply(
      %Account{
        uuid: account_uuid,
        current_balance: _current_balance
      } = account,
      %DepositedIntoAccount{
        account_uuid: account_uuid,
        new_current_balance: new_current_balance
      }
    ) do
  %Account{
    account
    | current_balance: new_current_balance
  }
end

def apply(
      %Account{
        uuid: account_uuid,
        current_balance: _current_balance
      } = account,
      %WithdrawnFromAccount{
        account_uuid: account_uuid,
        new_current_balance: new_current_balance
      }
    ) do
  %Account{
    account
    | current_balance: new_current_balance
  }
end

The logic is very similar for withdrawals and deposits. We match against closed accounts first, and then in the case of withdrawals we also check if the current balance is enough before we emit the event. Notice that the event doesn’t have the amount of the deposit or withdrawal, but just the new balance of the aggregate. This makes our lives easier when projecting these events onto the read-model.

Since the projection logic is so simple for these two events (just set the balance to the one in the received event), we can have one projector for both:

# lib/bank_api/accounts/projectors/deposits_and_withdrawals.ex

defmodule BankAPI.Accounts.Projectors.DepositsAndWithdrawals do
  use Commanded.Projections.Ecto,
    name: "Accounts.Projectors.DepositsAndWithdrawals",
    consistency: :strong

  alias BankAPI.Accounts
  alias BankAPI.Accounts.Events.{DepositedIntoAccount, WithdrawnFromAccount}
  alias BankAPI.Accounts.Projections.Account
  alias Ecto.{Changeset, Multi}

  project(%DepositedIntoAccount{} = evt, _metadata, fn multi ->
    with {:ok, %Account{} = account} <- Accounts.get_account(evt.account_uuid) do
      Multi.update(
        multi,
        :account,
        Changeset.change(
          account,
          current_balance: evt.new_current_balance
        )
      )
    else
      # ignore when this happens
      _ -> multi
    end
  end)

  project(%WithdrawnFromAccount{} = evt, _metadata, fn multi ->
    with {:ok, %Account{} = account} <- Accounts.get_account(evt.account_uuid) do
      Multi.update(
        multi,
        :account,
        Changeset.change(
          account,
          current_balance: evt.new_current_balance
        )
      )
    else
      # ignore when this happens
      _ -> multi
    end
  end)
end

Which needs to be supervised:

# lib/bank_api/accounts/supervisor.ex

defmodule BankAPI.Accounts.Supervisor do
  use Supervisor

  alias BankAPI.Accounts.Projectors

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_arg) do
    children = [
      worker(Projectors.AccountOpened, [], id: :account_opened),
      worker(Projectors.AccountClosed, [], id: :account_closed),
      worker(Projectors.DepositsAndWithdrawals, [], id: :deposits_and_withdrawals)
    ]

    supervise(children, strategy: :one_for_one)
  end
end

Our FallbackController needs to cover more cases now, so let’s add some messages to it:

# lib/bank_api_web/controllers/fallback_controller.ex

defmodule BankAPIWeb.FallbackController do
  use BankAPIWeb, :controller

  def call(conn, {:error, :not_found}) do
    conn
    |> put_status(:not_found)
    |> put_view(BankAPIWeb.ErrorView)
    |> render(:"404")
  end

  def call(conn, {:error, :account_already_closed}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> assign(:message, "Account already closed")
    |> render(:"422")
  end

  def call(conn, {:error, :account_closed}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> assign(:message, "Account closed")
    |> render(:"422")
  end

  def call(conn, {:error, :bad_command}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> assign(:message, "Bad command")
    |> render(:"422")
  end

  def call(conn, {:error, :insufficient_funds}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> assign(:message, "Insufficient funds to process order")
    |> render(:"422")
  end

  def call(conn, {:error, :command_validation_failure, _command, _errors}) do
    conn
    |> put_status(:unprocessable_entity)
    |> put_view(BankAPIWeb.ErrorView)
    |> assign(:message, "Command validation error")
    |> render(:"422")
  end
end

In order to show that message, let’s tweak our ErrorView:

# lib/bank_api_web/views/error_view.ex

defmodule BankAPIWeb.ErrorView do
  use BankAPIWeb, :view

  def template_not_found(template, _assigns) do
    %{errors: %{detail: Phoenix.Controller.status_message_from_template(template)}}
  end

  def render("422.json", assigns) do
    %{
      errors: %{
        message: assigns[:message] || "Unprocessable entity"
      }
    }
  end
end

And while we’re at it, only return the balance of open accounts on the account view:

# lib/bank_api_web/views/account_view.ex

defmodule BankAPIWeb.AccountView do
  use BankAPIWeb, :view
  alias BankAPIWeb.AccountView

  alias BankAPI.Accounts.Projections.Account

  def render("show.json", %{account: account}) do
    %{data: render_one(account, AccountView, "account.json")}
  end

  def render("account.json", %{account: account}) do
    if account.status == Account.status().closed do
      %{
        uuid: account.uuid,
        current_balance: account.current_balance
      }
    else
      %{
        uuid: account.uuid,
        current_balance: account.current_balance,
        status: account.status
      }
    end
  end
end

Wrapping Up

I hope the torrent of code was easy to follow. We’ve covered all of this before, but it was important to set the stage for account transfers next time. I’ve skipped over the tests to keep the post from being even lengthier, but I might tackle them next time if any of them has some interesting patterns we can explore. Next time: operations on more than one aggregate with Process Managers.

See you there! And a note of thanks to Ben Smith, Commanded’s author, for helpful feedback on the post’s content.

Cover image credit: PIXNIO


  1. More info on the GDPR here [return]