Intro

Transfers between accounts are a necessity for any bank, and ours is no exception. They’re also the perfect vehicle to introduce Process Managers to our toolset.

Every operation we’ve done so far has focused on a single aggregate, but what do we do when one domain interaction needs to change more than one? A command can only be routed to a single aggregate, so this looks hard at first. Process Managers (or Sagas) are components that behave like event handlers, but instead of updating the read model like Projectors do, they emit commands. They also hold state, which lets them track the progress of their orchestration across aggregates.

Furthermore, this “outside control” means aggregates need no knowledge of each other to perform transfers. You can think of the aggregates’ functionality as the building blocks for more complex functionality facilitated by Process Managers.

Implementation

Let’s start outside-in once more. First stop, the Web router:

# lib/bank_api_web/router.ex

# ...
post "/accounts/:id/transfer", AccountController, :transfer

And the new action on our controller:

# lib/bank_api_web/controllers/account_controller.ex

# ...
def transfer(
      conn,
      %{
        "id" => account_id,
        "transfer_amount" => amount,
        "destination_account" => destination_account_id
      }
    ) do
  with :ok <-
         Accounts.transfer(account_id, amount, destination_account_id) do
    conn
    |> send_resp(201, "")
  end
end

Pretty simple so far - our context has a new function to handle this:

# lib/bank_api/accounts.ex

def transfer(source_id, amount, destination_id) do
  %TransferBetweenAccounts{
    account_uuid: source_id,
    transfer_uuid: UUID.uuid4(),
    transfer_amount: amount,
    destination_account_uuid: destination_id
  }
  |> Router.dispatch()
end

As before, our context serves as a gateway to dispatches on our router. The command to kickstart transfers:

# lib/bank_api/accounts/commands/transfer_between_accounts.ex

defmodule BankAPI.Accounts.Commands.TransferBetweenAccounts do
  @enforce_keys [:account_uuid, :transfer_uuid]

  defstruct [:account_uuid, :transfer_uuid, :transfer_amount, :destination_account_uuid]

  alias BankAPI.Repo
  alias BankAPI.Accounts
  alias BankAPI.Accounts.Commands.Validators
  alias BankAPI.Accounts.Projections.Account

  def valid?(command) do
    cmd = Map.from_struct(command)

    with %Account{} <- account_exists?(cmd.destination_account_uuid),
         true <- account_open?(cmd.destination_account_uuid) do
      Skooma.valid?(cmd, schema())
    else
      nil ->
        {:error, ["Destination account does not exist"]}

      false ->
        {:error, ["Destination account closed"]}

      reply ->
        reply
    end
  end

  defp schema do
    %{
      account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      transfer_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())],
      transfer_amount: [:int, &Validators.positive_integer(&1, 1)],
      destination_account_uuid: [:string, Skooma.Validators.regex(Accounts.uuid_regex())]
    }
  end

  defp account_exists?(uuid) do
    Repo.get(Account, uuid)
  end

  defp account_open?(uuid) do
    account = Repo.get!(Account, uuid)
    account.status == Account.status().open
  end
end

This might be a bit controversial, but we do check the read model to validate this command. We don’t want to allow transfers to closed or non-existing accounts.
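
To make the shape of that check easier to see in isolation, here is a minimal sketch that swaps the Ecto repo for an in-memory map. The module name, the map and the account identifiers are all made up for illustration; only the `with`/`else` structure and the error messages mirror valid?/1. Part of why this is controversial is that the read model is eventually consistent, so an account opened a moment ago may not be visible to this lookup yet.

```elixir
defmodule ReadModelCheckSketch do
  # Hypothetical in-memory stand-in for the accounts projection table.
  @accounts %{
    "acc-open" => %{status: "open"},
    "acc-closed" => %{status: "closed"}
  }

  # Same shape as valid?/1: look the destination up, then check its status.
  # In the real command, the schema validation would run where :ok is returned.
  def check_destination(uuid) do
    with %{} = account <- Map.get(@accounts, uuid),
         true <- account.status == "open" do
      :ok
    else
      nil -> {:error, ["Destination account does not exist"]}
      false -> {:error, ["Destination account closed"]}
    end
  end
end

IO.inspect(ReadModelCheckSketch.check_destination("acc-open"))
# => :ok
IO.inspect(ReadModelCheckSketch.check_destination("acc-closed"))
# => {:error, ["Destination account closed"]}
IO.inspect(ReadModelCheckSketch.check_destination("missing"))
# => {:error, ["Destination account does not exist"]}
```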

Adding this command to our router is pretty simple:

# lib/bank_api/router.ex

defmodule BankAPI.Router do
  use Commanded.Commands.Router

  alias BankAPI.Accounts.Aggregates.Account

  alias BankAPI.Accounts.Commands.{
    OpenAccount,
    CloseAccount,
    DepositIntoAccount,
    WithdrawFromAccount,
    TransferBetweenAccounts # ◀️◀️◀️
  }

  middleware(BankAPI.Middleware.ValidateCommand)

  dispatch(
    [
      OpenAccount,
      CloseAccount,
      DepositIntoAccount,
      WithdrawFromAccount,
      TransferBetweenAccounts # ◀️◀️◀️
    ],
    to: Account,
    identity: :account_uuid
  )
end

Our origin account for the transfer will handle this command:

# lib/bank_api/accounts/aggregates/account.ex

# ...

def execute(
      %Account{
        uuid: account_uuid,
        closed?: true
      },
      %TransferBetweenAccounts{
        account_uuid: account_uuid
      }
    ) do
  {:error, :account_closed}
end

def execute(
      %Account{uuid: account_uuid, closed?: false},
      %TransferBetweenAccounts{
        account_uuid: account_uuid,
        destination_account_uuid: destination_account_uuid
      }
    )
    when account_uuid == destination_account_uuid do
  {:error, :transfer_to_same_account}
end

def execute(
      %Account{
        uuid: account_uuid,
        closed?: false,
        current_balance: current_balance
      },
      %TransferBetweenAccounts{
        account_uuid: account_uuid,
        transfer_amount: transfer_amount
      }
    )
    when current_balance < transfer_amount do
  {:error, :insufficient_funds}
end

def execute(
      %Account{uuid: account_uuid, closed?: false},
      %TransferBetweenAccounts{
        account_uuid: account_uuid,
        transfer_uuid: transfer_uuid,
        transfer_amount: transfer_amount,
        destination_account_uuid: destination_account_uuid
      }
    ) do
  %MoneyTransferRequested{
    transfer_uuid: transfer_uuid,
    source_account_uuid: account_uuid,
    amount: transfer_amount,
    destination_account_uuid: destination_account_uuid
  }
end

def execute(
      %Account{},
      %TransferBetweenAccounts{}
    ) do
  {:error, :not_found}
end

We handle the command as with all previous ones. We match on closed accounts, and if the destination account is the same as the source one, we return an error tuple. We also check for sufficient funds for the transfer. The event we emit here is the one the process manager will pick up to start the transfer. For that reason, the aggregate’s apply/2 function doesn’t change the aggregate’s state for this operation:

# lib/bank_api/accounts/aggregates/account.ex

# ...

def apply(
      %Account{} = account,
      %MoneyTransferRequested{}
    ) do
  account
end
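
Since the order of the execute/2 clauses decides which error wins, here is a small self-contained sketch to play with. The module and struct names are hypothetical and the structs are trimmed down to the fields the guards need; the clause order is the same as in the aggregate above, and `:transfer_requested` stands in for the emitted event.

```elixir
defmodule ExecuteOrderSketch do
  # Hypothetical, trimmed-down aggregate state and command.
  defmodule Account do
    defstruct uuid: nil, current_balance: 0, closed?: false
  end

  defmodule Transfer do
    defstruct [:account_uuid, :destination_account_uuid, :transfer_amount]
  end

  def execute(%Account{uuid: id, closed?: true}, %Transfer{account_uuid: id}),
    do: {:error, :account_closed}

  def execute(%Account{uuid: id}, %Transfer{account_uuid: id, destination_account_uuid: id}),
    do: {:error, :transfer_to_same_account}

  def execute(
        %Account{uuid: id, current_balance: balance},
        %Transfer{account_uuid: id, transfer_amount: amount}
      )
      when balance < amount,
      do: {:error, :insufficient_funds}

  def execute(%Account{uuid: id}, %Transfer{account_uuid: id}), do: :transfer_requested

  # A never-opened aggregate has uuid: nil, so nothing above matches it.
  def execute(%Account{}, %Transfer{}), do: {:error, :not_found}
end

alias ExecuteOrderSketch.{Account, Transfer}

open = %Account{uuid: "a", current_balance: 50}
closed = %Account{uuid: "a", current_balance: 50, closed?: true}

transfer = fn destination, amount ->
  %Transfer{account_uuid: "a", destination_account_uuid: destination, transfer_amount: amount}
end

IO.inspect(ExecuteOrderSketch.execute(closed, transfer.("a", 10)))
# => {:error, :account_closed}
IO.inspect(ExecuteOrderSketch.execute(open, transfer.("a", 10)))
# => {:error, :transfer_to_same_account}
IO.inspect(ExecuteOrderSketch.execute(open, transfer.("b", 80)))
# => {:error, :insufficient_funds}
IO.inspect(ExecuteOrderSketch.execute(open, transfer.("b", 50)))
# => :transfer_requested
IO.inspect(ExecuteOrderSketch.execute(%Account{}, transfer.("b", 10)))
# => {:error, :not_found}
```

Note that a closed account transferring to itself reports `:account_closed`, because that clause comes first.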

The event itself is pretty simple:

# lib/bank_api/accounts/events/money_transfer_requested.ex

defmodule BankAPI.Accounts.Events.MoneyTransferRequested do
  @derive [Jason.Encoder]

  defstruct [
    :transfer_uuid,
    :source_account_uuid,
    :destination_account_uuid,
    :amount
  ]
end

Which brings us to the process manager at last. You can see that, just like an aggregate, we have a state definition in the form of a struct. We also need to name the manager (name must be unique throughout the codebase) and specify which router to use when we dispatch commands.

# lib/bank_api/accounts/process_managers/transfer_money.ex

defmodule BankAPI.Accounts.ProcessManagers.TransferMoney do
  use Commanded.ProcessManagers.ProcessManager,
    name: "Accounts.ProcessManagers.TransferMoney",
    router: BankAPI.Router

  @derive Jason.Encoder
  defstruct [
    :transfer_uuid,
    :source_account_uuid,
    :destination_account_uuid,
    :amount,
    :status
  ]

  # ...

Then, we specify which events we want to handle with interested?/1 functions. First off, we want to handle the MoneyTransferRequested event, so we return a tuple with :start! and a UUID that serves as the identifier for the Erlang process started for this particular process manager instance. Every later event must be routed with the same UUID to reach the same instance.

# lib/bank_api/accounts/process_managers/transfer_money.ex

# ...

def interested?(%MoneyTransferRequested{transfer_uuid: transfer_uuid}),
  do: {:start!, transfer_uuid}

def interested?(%WithdrawnFromAccount{transfer_uuid: transfer_uuid})
    when is_nil(transfer_uuid),
    do: false

def interested?(%WithdrawnFromAccount{transfer_uuid: transfer_uuid}),
  do: {:continue!, transfer_uuid}

def interested?(%DepositedIntoAccount{transfer_uuid: transfer_uuid})
    when is_nil(transfer_uuid),
    do: false

def interested?(%DepositedIntoAccount{transfer_uuid: transfer_uuid}),
  do: {:stop, transfer_uuid}

def interested?(_event), do: false

We could have also used start without the !, but I’ve added the bang for extra validation that the process doesn’t already exist. More info here.
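
As a mental model only, the difference between the strict and non-strict variants can be sketched with a set of already-running identifiers. This is not how Commanded implements routing; the module name and the error atoms are invented for the sketch, and the behaviour of :continue! is assumed to mirror :start! (it refuses when no instance exists).

```elixir
defmodule StrictRoutingSketch do
  # `running` is a MapSet of identifiers that already have an instance.
  # The error atoms below are made up for this sketch.

  # Non-strict start: reuse the instance if it exists, create it otherwise.
  def route({:start, id}, running), do: {:ok, MapSet.put(running, id)}

  # Strict start: refuse if an instance with this identifier already exists.
  def route({:start!, id}, running) do
    if MapSet.member?(running, id) do
      {:error, :already_started}
    else
      {:ok, MapSet.put(running, id)}
    end
  end

  # Strict continue (assumed): refuse if no instance was started for this id.
  def route({:continue!, id}, running) do
    if MapSet.member?(running, id) do
      {:ok, running}
    else
      {:error, :not_started}
    end
  end

  # Stop: the instance goes away.
  def route({:stop, id}, running), do: {:ok, MapSet.delete(running, id)}
end

{:ok, running} = StrictRoutingSketch.route({:start!, "t1"}, MapSet.new())

IO.inspect(StrictRoutingSketch.route({:start!, "t1"}, running))
# => {:error, :already_started}
IO.inspect(StrictRoutingSketch.route({:continue!, "t2"}, running))
# => {:error, :not_started}
```

With the plain :start, the second call for "t1" would have quietly reused the existing instance, which is the situation the bang is meant to surface.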

The list of events we’re interested in also includes WithdrawnFromAccount and DepositedIntoAccount since these are the consequence events from the commands we’ll be dispatching to execute a transfer. This diagram helps clear it up:

Transfer process diagram

Note that we could have used one fewer “hop” in the flow and had the event that kickstarts the transfer withdraw the money from the source account directly, but it’s important to capture intent in your domain. More on this in the next post.

Once we signal as being interested in an event, we need to have handle functions that emit the commands, and apply functions to change the manager’s state:

# lib/bank_api/accounts/process_managers/transfer_money.ex

# ...

def handle(
      %TransferMoney{},
      %MoneyTransferRequested{
        source_account_uuid: source_account_uuid,
        amount: transfer_amount,
        transfer_uuid: transfer_uuid
      }
    ) do
  %WithdrawFromAccount{
    account_uuid: source_account_uuid,
    withdraw_amount: transfer_amount,
    transfer_uuid: transfer_uuid
  }
end

def handle(
      %TransferMoney{transfer_uuid: transfer_uuid} = pm,
      %WithdrawnFromAccount{transfer_uuid: transfer_uuid}
    ) do
  %DepositIntoAccount{
    account_uuid: pm.destination_account_uuid,
    deposit_amount: pm.amount,
    transfer_uuid: pm.transfer_uuid
  }
end

###

def apply(%TransferMoney{} = pm, %MoneyTransferRequested{} = evt) do
  %TransferMoney{
    pm
    | transfer_uuid: evt.transfer_uuid,
      source_account_uuid: evt.source_account_uuid,
      destination_account_uuid: evt.destination_account_uuid,
      amount: evt.amount,
      status: :withdraw_money_from_source_account
  }
end

def apply(%TransferMoney{} = pm, %WithdrawnFromAccount{}) do
  %TransferMoney{
    pm
    | status: :deposit_money_in_destination_account
  }
end

You will notice that the WithdrawFromAccount and DepositIntoAccount commands have been enhanced with a transfer_uuid field. This distinguishes deposits and withdrawals done as part of a transfer from standalone operations. That is why the interested?/1 clauses for these events return false when the field is nil, so standalone operations never reach the process manager. We need to tweak our commands and events accordingly.
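
To see the whole round trip without an event store, here is a plain-Elixir sketch that feeds a list of events through simplified interested?/1, handle/2 and apply/2 functions and collects the commands that would be dispatched. Everything here is an assumption made for illustration: the module name, the trimmed-down structs, the `run/1` driver, and the choice to call handle/2 before apply/2. It also tracks a single instance only, so the identifier returned by interested?/1 is ignored.

```elixir
defmodule TransferFlowSketch do
  # Hypothetical, trimmed-down events and commands.
  defmodule MoneyTransferRequested do
    defstruct [:transfer_uuid, :source_account_uuid, :destination_account_uuid, :amount]
  end

  defmodule WithdrawnFromAccount, do: defstruct([:account_uuid, :transfer_uuid])
  defmodule DepositedIntoAccount, do: defstruct([:account_uuid, :transfer_uuid])
  defmodule WithdrawFromAccount, do: defstruct([:account_uuid, :withdraw_amount, :transfer_uuid])
  defmodule DepositIntoAccount, do: defstruct([:account_uuid, :deposit_amount, :transfer_uuid])

  defstruct [:transfer_uuid, :source_account_uuid, :destination_account_uuid, :amount, :status]

  def interested?(%MoneyTransferRequested{transfer_uuid: id}), do: {:start!, id}
  # Standalone withdrawals and deposits carry no transfer_uuid: not ours.
  def interested?(%WithdrawnFromAccount{transfer_uuid: nil}), do: false
  def interested?(%WithdrawnFromAccount{transfer_uuid: id}), do: {:continue!, id}
  def interested?(%DepositedIntoAccount{transfer_uuid: nil}), do: false
  def interested?(%DepositedIntoAccount{transfer_uuid: id}), do: {:stop, id}
  def interested?(_event), do: false

  def handle(%__MODULE__{}, %MoneyTransferRequested{} = evt) do
    %WithdrawFromAccount{
      account_uuid: evt.source_account_uuid,
      withdraw_amount: evt.amount,
      transfer_uuid: evt.transfer_uuid
    }
  end

  def handle(%__MODULE__{} = pm, %WithdrawnFromAccount{}) do
    %DepositIntoAccount{
      account_uuid: pm.destination_account_uuid,
      deposit_amount: pm.amount,
      transfer_uuid: pm.transfer_uuid
    }
  end

  def handle(%__MODULE__{}, _event), do: nil

  def apply(%__MODULE__{} = pm, %MoneyTransferRequested{} = evt) do
    %__MODULE__{
      pm
      | transfer_uuid: evt.transfer_uuid,
        source_account_uuid: evt.source_account_uuid,
        destination_account_uuid: evt.destination_account_uuid,
        amount: evt.amount,
        status: :withdraw_money_from_source_account
    }
  end

  def apply(%__MODULE__{} = pm, %WithdrawnFromAccount{}),
    do: %__MODULE__{pm | status: :deposit_money_in_destination_account}

  def apply(%__MODULE__{} = pm, _event), do: pm

  # Toy driver: skip events we are not interested in, otherwise collect the
  # command from handle/2 and then fold the event into the state.
  def run(events) do
    {pm, commands} =
      Enum.reduce(events, {%__MODULE__{}, []}, fn event, {pm, commands} ->
        case interested?(event) do
          false ->
            {pm, commands}

          {_action, _id} ->
            command = handle(pm, event)
            {__MODULE__.apply(pm, event), commands ++ List.wrap(command)}
        end
      end)

    {pm.status, commands}
  end
end

alias TransferFlowSketch.{MoneyTransferRequested, WithdrawnFromAccount, DepositedIntoAccount}

events = [
  %MoneyTransferRequested{transfer_uuid: "t1", source_account_uuid: "a", destination_account_uuid: "b", amount: 25},
  # A standalone withdrawal on another account, ignored by the manager.
  %WithdrawnFromAccount{account_uuid: "c", transfer_uuid: nil},
  %WithdrawnFromAccount{account_uuid: "a", transfer_uuid: "t1"},
  %DepositedIntoAccount{account_uuid: "b", transfer_uuid: "t1"}
]

IO.inspect(TransferFlowSketch.run(events))
```

The run yields two commands, a WithdrawFromAccount for account "a" followed by a DepositIntoAccount for account "b", both carrying transfer_uuid "t1". The standalone withdrawal on account "c" produces nothing.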

The last change we need is to add our new process manager to our supervisor:

# lib/bank_api/accounts/supervisor.ex

defmodule BankAPI.Accounts.Supervisor do
  use Supervisor

  alias BankAPI.Accounts.Projectors
  alias BankAPI.Accounts.ProcessManagers # ◀️◀️◀️

  def start_link do
    Supervisor.start_link(__MODULE__, nil)
  end

  def init(_arg) do
    children = [
      # Projectors
      worker(Projectors.AccountOpened, [], id: :account_opened),
      worker(Projectors.AccountClosed, [], id: :account_closed),
      worker(Projectors.DepositsAndWithdrawals, [], id: :deposits_and_withdrawals),

      # Process managers
      worker(ProcessManagers.TransferMoney, [], id: :transfer_money) # ◀️◀️◀️
    ]

    supervise(children, strategy: :one_for_one)
  end
end

Oh, and add that new error to our fallback controller:

# lib/bank_api_web/controllers/fallback_controller.ex

# ...
def call(conn, {:error, :transfer_to_same_account}) do
  conn
  |> put_status(:unprocessable_entity)
  |> put_view(BankAPIWeb.ErrorView)
  |> assign(:message, "Source and destination accounts are the same")
  |> render(:"422")
end

Wrapping Up

So now we know how to orchestrate complex flows between our carefully designed aggregates. Process managers are a really powerful tool in the Commanded arsenal, and they help us break down our domain’s operations into low-level, single-aggregate ones and higher-level, composite, inter-aggregate ones.

The code is available online.

And that’s it, we’ve covered most of the basics of Commanded! I hope this series has given you more insight into event sourcing, DDD concepts, and CQRS as an architectural pattern. Next time, I’ll wrap up with a final post mixing lessons learned from running Commanded projects in production with little bits of extra information!

Many thanks once more to Commanded’s author, Ben Smith, for helpful feedback!

Cover image credit: PIXNIO