CQRS and Event Sourcing using Rust

The cqrs-es crate provides a lightweight framework for building applications utilizing CQRS and event sourcing. The project targets serverless architectures but can be used in any application seeking to utilize these patterns to build better software.

Chapter 1 provides an introduction to CQRS and event sourcing as well as the underlying design strategy that they support, Domain Driven Design. This chapter is essential reading for anyone not familiar with these topics.

Chapter 2 provides a tutorial for getting started with the framework. This involves building an aggregate, commands, events, and the associated trait implementations for a basic CQRS system. Our demo application will simulate a simple bank account.

Chapter 3 covers the basics of implementing domain tests. The ability to easily build supple tests is one of the primary benefits of a CQRS system, and here we explain how to build them effectively.

Chapter 4 covers building a simple query processor as well as putting all of these components together in a proper CQRS framework. We will use a naive, in-memory event store to facilitate this mini-application that will allow us to explore CQRS with simple unit tests.

Chapter 5 discusses the additional features needed to work this framework into a real application.

Chapter 6 branches out to some of the more advanced topics permitted by these patterns, including event replay and upcasters.

The patterns

Command-Query Responsibility Segregation (CQRS) and event sourcing are patterns that enable many of the concepts behind Domain Driven Design. All of these tools are designed to provide a great deal of flexibility for applications that have complex or rapidly changing business rules.

By separating the business rules from the technical aspects of an application we remove many of the inherent barriers to software changes that exist in standard applications. Any application with complex or rapidly changing rules might be a good candidate for using CQRS and event sourcing.

A note on terminology

Though CQRS and event sourcing can be used for a range of software problems, they are primarily applied to business applications, since those so often require this kind of flexibility.

I'll use the term "business rules" whenever I'm specifically discussing these complex or changing rulesets. If your application is not a business application, just replace "business rules" with something more appropriate to your domain.

Domain Driven Design

The intellectual backing for both of these patterns is provided by Domain Driven Design (DDD), which is an approach to building software that focuses on building a better business model. The field of DDD is expansive, but one of its core concepts is the Aggregate.

Some terminology

But first, in DDD all objects are broken down into two groups:

  • entities - objects with a unique identity, and usually with state (e.g., a person).
  • value objects - immutable objects that are constructed when needed and discarded when no longer valuable (e.g., a person's age).

We use a combination of these components to form an 'aggregate'. An aggregate is a collection of related entities and value objects. An aggregate always has a root entity that forms the basis of this grouping.

For example, a CreditCard aggregate might be composed of:

  • the credit card account entity itself - entity
  • a list of charges - value objects
  • a payment history - value objects
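To make the entity/value-object split concrete, here is a minimal Rust sketch of such a CreditCard aggregate. All names (CreditCard, Charge, Payment, the cents-based amounts) are hypothetical illustrations, not part of cqrs-es. The root entity carries an identity and owns the value objects; the value objects have no identity of their own and compare by value.

```rust
// Value objects: no identity, immutable once built, compared by value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Charge {
    pub amount_cents: u64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Payment {
    pub amount_cents: u64,
}

// Root entity: identified by `account_id`; it owns the value objects.
// Fields are private, so every change goes through the root.
#[derive(Debug)]
pub struct CreditCard {
    account_id: String,
    charges: Vec<Charge>,
    payments: Vec<Payment>,
}

impl CreditCard {
    pub fn new(account_id: &str) -> Self {
        CreditCard {
            account_id: account_id.to_string(),
            charges: Vec::new(),
            payments: Vec::new(),
        }
    }

    pub fn account_id(&self) -> &str {
        &self.account_id
    }

    pub fn charge(&mut self, amount_cents: u64) {
        self.charges.push(Charge { amount_cents });
    }

    pub fn pay(&mut self, amount_cents: u64) {
        self.payments.push(Payment { amount_cents });
    }

    // Outstanding balance derived from the value objects
    // (negative if the card has been overpaid).
    pub fn balance_cents(&self) -> i64 {
        let charged: u64 = self.charges.iter().map(|c| c.amount_cents).sum();
        let paid: u64 = self.payments.iter().map(|p| p.amount_cents).sum();
        charged as i64 - paid as i64
    }
}
```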

An example bounded context

The above topics are encapsulated within a bounded context, an important topic, but outside the scope of this book. An example of a bounded context with multiple aggregates is below.

[Figure: Bounded context example]

More restrictions

Domain Driven Design requires that Aggregates are only modified as a whole. This means we cannot operate on any individual component, such as modifying the balance of a bank account, without loading the entire aggregate. Think of this as using strong encapsulation in an object-oriented pattern.

This stipulation allows robust, simple testing of the business rules since they are all in one location. Instead of integration tests spanning multiple application layers we can use simple unit tests to guarantee we meet all of our business requirements.

[Figure: Comparing DDD with standard applications]

The above example compares a standard application to that of an application using domain driven design. The former uses business logic spread through a number of services that access and modify state data in unknown ways. In a DDD application all changes to the state data are only made through the aggregate resulting in a highly testable set of business rules.

In the diagram a CQRS framework is implied, but any DDD framework should promote the same level of flexibility.

Further reading

Domain Driven Design is a topic far too large to discuss in any detail here. Some additional resources:

CQRS

Command Query Responsibility Segregation (CQRS) departs from standard software development by using separate write and read models for an application object. This allows us to create views, or queries, that more accurately model our business environment.

We define "standard application" here as a generic webapp commonly used to provide backend services in business applications.

In standard applications, the domain modeling tends to reflect how the application objects are stored in an RDBMS. Updates generally involve loading the object(s) from a set of tables via a JOIN operation, making the requisite modifications, and persisting the updated object(s).

Read operations are usually accomplished via the same load operation or from a view based on the same database tables. This requires us to make a compromise between our read and write models.

One of the primary drivers in using CQRS is to remove the compromises between read and write models.

CQRS basics

When using CQRS we divide our application into two parts: command processors and queries. Users modify the system by sending commands, but receive no information back. When a user interacts with the query side of our application they receive information, but there is a guarantee that no changes will be made.

[Figure: CQRS]

A number of strategies have been devised for decoupling the command and query side of applications. The most successful among them is communication across this divide via events. This approach leverages the benefits of a reactive architecture as well as providing an opportunity to use event sourcing.

Updates

When creating updates under the pattern of CQRS, the focus is solely on what the changes are, and not on what these changes mean to any views or queries. This involves requesting the changes via a command and reflecting the actual changes in one or more events.

To do this, we use a DDD concept called an aggregate. Roughly, an aggregate combines the state of an object that is being acted upon, along with all the business rules of our application. This approach strives to remove any technical complexity near the business rules where applications are most sensitive to the errors that degrade their agility.

The aggregate’s job is to consider the command in the context of its current state and determine what business facts need to be recorded. In effect, a command is a request that is subject to security, validation, application state and business rules. The aggregate is the arbiter of these rules.

For instance, if we send an update-email command, we would expect an email-updated event to be produced by the customer aggregate.

Note the difference in naming between a command and an event. A command is a request in the imperative whereas an event is a statement of fact in the past tense.

A single event per command is the most common situation, but this can change based on a number of factors. If the email address is also configured as the customer’s primary contact, we could see a primary-contact-updated event as well.

Using the same example: If the provided email address is identical to the old email address, then we may not have any events at all since there is no change to be reflected. Other situations with no resultant events could be seen for a variety of other reasons, such as if the new email address is invalid or if the user requesting the update is not authorized to do so.
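The outcomes described above (two events, one event, or none) can be sketched as a command handler on a hypothetical Customer aggregate. The types, the authorization flag and the deliberately naive `@` check are assumptions for illustration only; here a rejected command returns an error and therefore records no events.

```rust
// Hypothetical customer aggregate: only the state the rules need.
#[derive(Debug, Default)]
pub struct Customer {
    pub email: String,
    pub email_is_primary_contact: bool,
}

// Command: a request in the imperative.
pub struct UpdateEmail {
    pub new_email: String,
    pub requested_by_owner: bool,
}

// Events: statements of fact in the past tense.
#[derive(Debug, Clone, PartialEq)]
pub enum CustomerEvent {
    EmailUpdated { email: String },
    PrimaryContactUpdated { email: String },
}

impl Customer {
    pub fn handle(&self, cmd: UpdateEmail) -> Result<Vec<CustomerEvent>, String> {
        if !cmd.requested_by_owner {
            return Err("not authorized".to_string());
        }
        // Naive validity check, for illustration only.
        if !cmd.new_email.contains('@') {
            return Err("invalid email address".to_string());
        }
        // Same address: nothing changed, so there are no facts to record.
        if cmd.new_email == self.email {
            return Ok(vec![]);
        }
        let mut events = vec![CustomerEvent::EmailUpdated {
            email: cmd.new_email.clone(),
        }];
        if self.email_is_primary_contact {
            events.push(CustomerEvent::PrimaryContactUpdated { email: cmd.new_email });
        }
        Ok(events)
    }
}
```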

Queries

Once events are published they can be consumed by our queries (a.k.a., views). As queries consume events they modify their own state and produce something similar to the views that we have in standard applications. Their real flexibility is derived from the fact that these queries are not tied in any way to our write model.

In the previous example we produced events that we could imagine to be of interest to several queries. Certainly, we would have a customer-information query that would need to be updated, but then we might have additional queries such as an all-customer-contacts query that would also respond to the same event.

Additionally, other downstream services may respond to these events similarly to how they would in any other messaging-based application. Using the same example we might have a service that sends a verification email to the new address after an email-updated event is fired.
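A minimal sketch of two independent consumers reacting to the same email-updated event: a customer-information view and a service that queues verification emails. The EventConsumer trait, the type names and the publish function are hypothetical, and publishing here is a plain synchronous loop rather than a real message bus. Note that neither consumer knows anything about the write model.

```rust
use std::collections::HashMap;

// Hypothetical event as published by the command side.
#[derive(Debug, Clone)]
pub enum CustomerEvent {
    EmailUpdated { customer_id: String, email: String },
}

pub trait EventConsumer {
    fn on_event(&mut self, event: &CustomerEvent);
}

// A query: the current email for each customer.
#[derive(Default)]
pub struct CustomerInformation {
    pub emails: HashMap<String, String>,
}

impl EventConsumer for CustomerInformation {
    fn on_event(&mut self, event: &CustomerEvent) {
        match event {
            CustomerEvent::EmailUpdated { customer_id, email } => {
                self.emails.insert(customer_id.clone(), email.clone());
            }
        }
    }
}

// A downstream service: queue a verification email for each new address.
#[derive(Default)]
pub struct VerificationMailer {
    pub outbox: Vec<String>,
}

impl EventConsumer for VerificationMailer {
    fn on_event(&mut self, event: &CustomerEvent) {
        match event {
            CustomerEvent::EmailUpdated { email, .. } => self.outbox.push(email.clone()),
        }
    }
}

// Deliver one event to every consumer.
pub fn publish(event: &CustomerEvent, consumers: &mut [&mut dyn EventConsumer]) {
    for consumer in consumers.iter_mut() {
        consumer.on_event(event);
    }
}
```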

[Figure: CQRS. An example of how a single command might move through a CQRS system.]

Event Sourcing

Event sourcing adds to the flexibility of CQRS by relying upon the events as our source of truth. Events are now historical facts that we can use to calculate the current state in ways that we did not originally intend. In this way, event sourcing works much like a financial ledger.

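| Event Name      | Event Value                          | Account Balance |
|-----------------|--------------------------------------|-----------------|
| account-opened  | Account XYZ opened by user John Doe  | $0.00           |
| money-deposited | John Doe deposited $500              | $500.00         |
| check-cleared   | Check #1127 cleared for $27.15       | $472.85         |
| cash-withdrawn  | $100 cash withdrawn from ATM #243    | $372.85         |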

In this example we have four events that have been stored. Consider that a new business requirement is added to track money outflow from accounts. There is no way to do this immediately if the current state is all that is persisted (i.e., the current account balance).

However, because our source of truth is these events rather than a simple balance, it is easy to add the business logic after the fact and calculate what we need from the persisted events.

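| Event Name      | Event Value                          | Account Balance | Outflow |
|-----------------|--------------------------------------|-----------------|---------|
| account-opened  | Account XYZ opened by user John Doe  | $0.00           | $0.00   |
| money-deposited | John Doe deposited $500              | $500.00         | $0.00   |
| check-cleared   | Check #1127 cleared for $27.15       | $472.85         | $27.15  |
| cash-withdrawn  | $100 cash withdrawn from ATM #243    | $372.85         | $127.15 |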

In this way, we have used the persisted events to provide new business information that otherwise would not have been available. In essence we have turned back time and we can now use new logic to compute states that would otherwise be impossible to know.
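The replay described above amounts to a fold over the stored events. A minimal sketch, assuming a hypothetical LedgerEvent enum that mirrors the tables and storing amounts as integer cents to sidestep floating-point rounding; the outflow field is the "new logic" computed from the very same events that originally only fed the balance.

```rust
// Hypothetical events from the tables above; amounts in cents.
#[derive(Debug, Clone)]
pub enum LedgerEvent {
    AccountOpened,
    MoneyDeposited { cents: i64 },
    CheckCleared { cents: i64 },
    CashWithdrawn { cents: i64 },
}

#[derive(Debug, Default, PartialEq)]
pub struct Projection {
    pub balance_cents: i64,
    // Added after the fact: computed purely from stored events.
    pub outflow_cents: i64,
}

// Replay every event, in order, starting from an empty projection.
pub fn replay(events: &[LedgerEvent]) -> Projection {
    events.iter().fold(Projection::default(), |mut p, e| {
        match e {
            LedgerEvent::AccountOpened => {}
            LedgerEvent::MoneyDeposited { cents } => p.balance_cents += *cents,
            LedgerEvent::CheckCleared { cents } | LedgerEvent::CashWithdrawn { cents } => {
                p.balance_cents -= *cents;
                p.outflow_cents += *cents;
            }
        }
        p
    })
}
```

Replaying the four events from the table gives a balance of 37285 cents ($372.85) and an outflow of 12715 cents ($127.15).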

Getting started

For this tutorial we will build an application to manage the logic of a bank account. As a simple set of business rules, we want to:

  • accept deposits
  • provide withdrawals
  • allow our customers to write checks
  • disallow customers from overdrawing their account

Project setup

Okay, let's get some code going. First up, we need a workspace. You know the drill: find your favorite playspace on your hard drive and start a new Rust bin project.

cargo new --bin mybank

There is a lot that happens behind the scenes in a CQRS/event sourcing application, so we'll be using the cqrs-es framework to get us off the ground.
Add these dependencies to your Cargo.toml:

[dependencies]
cqrs-es = "0.0.18"
serde = { version = "1.0.104", features = ["derive"]}
serde_json = "1.0"

All of the examples included here are simplified from the cqrs-demo project. More detailed examples can be found by exploring that package.

Aggregate

Let's start with an aggregate where we can store the state information needed to make decisions based on our business rules. For our bank account example we will call our aggregate BankAccount. And for our simple set of business rules, we will only need to give the aggregate a single field, balance.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct BankAccount {
    balance: f64,
}

In order to operate within the cqrs-es framework, we will need to derive the serde traits, Serialize and Deserialize, and to implement two traits, Default and cqrs_es::Aggregate.

use cqrs_es::Aggregate;

impl Aggregate for BankAccount {
    // A unique name distinguishing this aggregate type from any other.
    fn aggregate_type() -> &'static str {
        "account"
    }
}

impl Default for BankAccount {
    fn default() -> Self {
        BankAccount {
            balance: 0_f64
        }
    }
}

The cqrs_es::Aggregate trait provides a name for the aggregate type to distinguish it from any other aggregate.

Domain Events

Next we will need to create some domain events. Note that we qualify events with 'domain' to differentiate them from other events that might exist within our application. These events directly speak to changes in aggregate state.

In the cqrs-es framework the domain events are expected to be an enum with payloads; this gives us a single root event type for each aggregate. By convention, each payload struct has the same name as its associated enum variant, and variants that do not require additional information use an empty payload.

The enum as well as the payloads should derive several traits.

  • Debug - used for error handling and testing.
  • Clone - the event may be passed to a number of downstream queries in an asynchronous manner, so we will want to ensure that each one gets its own clone.
  • Serialize, Deserialize - serialization is essential for both storage and transmission to external queries.
  • PartialEq - we will be adding a lot of tests to verify that our business logic is correct.

Adding events and payloads

Let's add three self-descriptive events as part of a single enum.

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum BankAccountEvent {
    CustomerDepositedMoney(CustomerDepositedMoney),
    CustomerWithdrewCash(CustomerWithdrewCash),
    CustomerWroteCheck(CustomerWroteCheck)
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct CustomerDepositedMoney {
    amount: f64,
    balance: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct CustomerWithdrewCash {
    amount: f64,
    balance: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct CustomerWroteCheck {
    check_number: String,
    amount: f64,
    balance: f64,
}

Again, all of our events are in the past tense. This is important.

Our events now need to implement cqrs_es::DomainEvent<BankAccount>. The purpose of this trait's single function is to update the state of an aggregate instance based on events that have happened.

The usual approach is for the enum's implementation to match on the variant and pass the event down to its payload. Each payload then updates the aggregate with the appropriate logic.

use cqrs_es::DomainEvent;

impl DomainEvent<BankAccount> for BankAccountEvent {
    fn apply(self, account: &mut BankAccount) {
        match self {
            BankAccountEvent::CustomerDepositedMoney(e) => {e.apply(account)},
            BankAccountEvent::CustomerWithdrewCash(e) => {e.apply(account)},
            BankAccountEvent::CustomerWroteCheck(e) => {e.apply(account)},
        }
    }
}

impl DomainEvent<BankAccount> for CustomerDepositedMoney {
    fn apply(self, account: &mut BankAccount) {
        account.balance = self.balance;
    }
}
impl DomainEvent<BankAccount> for CustomerWithdrewCash {
    fn apply(self, account: &mut BankAccount) {
        account.balance = self.balance;
    }
}
impl DomainEvent<BankAccount> for CustomerWroteCheck {
    fn apply(self, account: &mut BankAccount) {
        account.balance = self.balance;
    }
}

Note that the apply function has no return value. The act of applying an event is simply bookkeeping; the action has already taken place.

An event is a historical fact: it can be ignored, but it should never cause an error.
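Applying events is also how an aggregate's current state is rebuilt whenever it is loaded: start from the default and apply every stored event in commit order. A std-only sketch of the idea, assuming a local ApplyTo trait shaped like DomainEvent's apply (so it compiles without cqrs-es) and collapsing the payload structs into struct variants for brevity. This illustrates the concept; it is not the framework's code.

```rust
// Local stand-in with the same shape as DomainEvent::apply.
pub trait ApplyTo<A> {
    fn apply(self, aggregate: &mut A);
}

#[derive(Debug, Default, PartialEq)]
pub struct BankAccount {
    pub balance: f64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum BankAccountEvent {
    CustomerDepositedMoney { amount: f64, balance: f64 },
    CustomerWithdrewCash { amount: f64, balance: f64 },
}

impl ApplyTo<BankAccount> for BankAccountEvent {
    fn apply(self, account: &mut BankAccount) {
        // Pure bookkeeping: copy the recorded balance, never fail.
        match self {
            BankAccountEvent::CustomerDepositedMoney { balance, .. }
            | BankAccountEvent::CustomerWithdrewCash { balance, .. } => account.balance = balance,
        }
    }
}

// Rebuild current state from history, oldest event first.
pub fn rebuild<A: Default, E: ApplyTo<A>>(events: Vec<E>) -> A {
    let mut aggregate = A::default();
    for event in events {
        event.apply(&mut aggregate);
    }
    aggregate
}
```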

Commands

In order to make changes to our system we will need commands. Let's start with a few commands that match up closely to the events that we have already written.

struct DepositMoney {
    amount: f64
}

struct WithdrawMoney {
    amount: f64
}

struct WriteCheck {
    // A String, matching the check_number field of CustomerWroteCheck.
    check_number: String,
    amount: f64
}

These will all need to implement the cqrs_es::Command<BankAccount, BankAccountEvent> trait. The handle method of this trait is where our business logic will go; for now we will leave that out and just return an empty vector.

use cqrs_es::{AggregateError, Command};

// Stubs for now: `_account` goes unused until we add business logic.
impl Command<BankAccount, BankAccountEvent> for DepositMoney {
    fn handle(self, _account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        Ok(vec![])
    }
}

impl Command<BankAccount, BankAccountEvent> for WithdrawMoney {
    fn handle(self, _account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        Ok(vec![])
    }
}

impl Command<BankAccount, BankAccountEvent> for WriteCheck {
    fn handle(self, _account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        Ok(vec![])
    }
}

We now have all the components in place to begin adding our tests.

Adding aggregate tests

Now that we have the basic components in place we can begin setting up our aggregate tests. These are the tests that we will use to verify the business logic for our application. Testing is one of the most valuable aspects of CQRS/event sourcing, as it allows us to set up tests that have no coupling with our application logic.

This occurs because we rely only on events for past state, so no amount of refactoring of our application logic will affect the outcome. These tests should follow a pattern that you are likely familiar with:

  • Given some past events
  • When a command is applied
  • Then some result is expected
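To see why such tests need nothing but events and commands, here is a hypothetical, std-only sketch of a given/when harness. It is not how cqrs_es::test::TestFramework is implemented; the Aggregate and Handle traits and the given function are stand-ins invented for illustration, and the "then" step is simply an assertion on the returned Result.

```rust
use std::fmt::Debug;

// Stand-in traits, loosely shaped like the ones in this chapter.
pub trait Aggregate: Default {
    type Event: Clone + PartialEq + Debug;
    fn apply(&mut self, event: Self::Event);
}

pub trait Handle<A: Aggregate> {
    fn handle(self, aggregate: &A) -> Result<Vec<A::Event>, String>;
}

pub struct Given<A: Aggregate> {
    aggregate: A,
}

// "Given some past events": rebuild state from history only.
pub fn given<A: Aggregate>(events: Vec<A::Event>) -> Given<A> {
    let mut aggregate = A::default();
    for event in events {
        aggregate.apply(event);
    }
    Given { aggregate }
}

impl<A: Aggregate> Given<A> {
    // "When a command is applied": return the result for the "then" check.
    pub fn when<C: Handle<A>>(self, command: C) -> Result<Vec<A::Event>, String> {
        command.handle(&self.aggregate)
    }
}

// A tiny example domain (integer amounts for exact comparisons).
#[derive(Default)]
pub struct Account {
    balance: i64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AccountEvent {
    Deposited { amount: i64, balance: i64 },
}

impl Aggregate for Account {
    type Event = AccountEvent;
    fn apply(&mut self, event: AccountEvent) {
        match event {
            AccountEvent::Deposited { balance, .. } => self.balance = balance,
        }
    }
}

pub struct Deposit {
    pub amount: i64,
}

impl Handle<Account> for Deposit {
    fn handle(self, account: &Account) -> Result<Vec<AccountEvent>, String> {
        let balance = account.balance + self.amount;
        Ok(vec![AccountEvent::Deposited { amount: self.amount, balance }])
    }
}
```

Nothing here touches storage or application wiring, which is why refactoring those layers cannot break such a test.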

Let's first add a test module and define a new AccountTestFramework type for our test framework.

#[cfg(test)]
mod aggregate_tests {
    use super::*;
    use cqrs_es::test::TestFramework;

    type AccountTestFramework = TestFramework<BankAccount,BankAccountEvent>;
}

A first aggregate test

Now within our aggregate_tests module we will add our first test. Let's pass a DepositMoney command and we will expect to see a CustomerDepositedMoney event. Since we do not require any previous events, we can initiate our test with the given_no_previous_events method.

#[test]
fn test_deposit_money() {
    let expected = BankAccountEvent::CustomerDepositedMoney(CustomerDepositedMoney { amount: 200.0, balance: 200.0 });

    AccountTestFramework::default()
        .given_no_previous_events()
        .when(DepositMoney{ amount: 200.0 })
        .then_expect_events(vec![expected]);
}

Now if we run this test, we should see a test failure with the output looking something like this:

thread 'aggregate_tests::test' panicked at 'assertion failed: `(left == right)`
  left: `[]`,
 right: `[CustomerDepositedMoney(CustomerDepositedMoney { amount: 200.0, balance: 200.0 })]`', <::std::macros::panic ...

We have not added any logic yet, so this is what we should see. We have told the test to expect a CustomerDepositedMoney event, but none has been produced.

Adding business logic

Let's go back to our Command implementation for DepositMoney and fix this.

impl Command<BankAccount, BankAccountEvent> for DepositMoney {
    fn handle(self, account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        let balance = account.balance + self.amount;
        let event_payload = CustomerDepositedMoney {
            amount: self.amount,
            balance
        };
        Ok(vec![BankAccountEvent::CustomerDepositedMoney(event_payload)])
    }
}

And running our first test again - success!

Dealing with previous events

Now we should verify that our logic is valid if there is a previous balance. For this, we will use the given method to initiate the test, along with a vector containing a sole previous event:

#[test]
fn test_deposit_money_with_balance() {
    let previous = BankAccountEvent::CustomerDepositedMoney(CustomerDepositedMoney { amount: 200.0, balance: 200.0 });
    let expected = BankAccountEvent::CustomerDepositedMoney(CustomerDepositedMoney { amount: 200.0, balance: 400.0 });

    AccountTestFramework::default()
        .given(vec![previous])
        .when(DepositMoney{ amount: 200.0 })
        .then_expect_events(vec![expected]);
}

These exercises feel a little brain-dead, but they provide a good example of how these tests are structured. Next we will start adding some real logic.

Adding more logic

In our simple example a customer can always deposit money, but making a cash withdrawal is another thing. We should ensure that the customer has the requested funds available before releasing them, lest the account overdraw.

When discussing events, we noted that the process of applying events cannot produce an error since the event has already happened. Instead, errors should be produced before the event is generated, during the processing of the command.

Account withdrawal - happy path

First, let's add a test for a happy path withdrawal, again starting from a previous deposit supplied via the given method:

#[test]
fn test_withdraw_money() {
    let previous = BankAccountEvent::CustomerDepositedMoney(CustomerDepositedMoney { amount: 200.0, balance: 200.0 });
    let expected = BankAccountEvent::CustomerWithdrewCash(CustomerWithdrewCash { amount: 100.0, balance: 100.0 });

    AccountTestFramework::default()
        .given(vec![previous])
        .when(WithdrawMoney{ amount: 100.0 })
        .then_expect_events(vec![expected]);
}

Since we have not added any withdrawal logic yet this should fail. Let's correct this with some naive logic to produce the event:

impl Command<BankAccount, BankAccountEvent> for WithdrawMoney {
    fn handle(self, account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        let balance = account.balance - self.amount;
        let event_payload = CustomerWithdrewCash{
            amount: self.amount,
            balance
        };
        Ok(vec![BankAccountEvent::CustomerWithdrewCash(event_payload)])
    }
}

Verify funds are available

Now our happy path test succeeds, but nothing stops a customer from withdrawing more than has been deposited. Let's add a test case using the then_expect_error method:

#[test]
fn test_withdraw_money_funds_not_available() {
    AccountTestFramework::default()
        .given_no_previous_events()
        .when(WithdrawMoney{ amount: 200.0 })
        .then_expect_error("funds not available");
}

We should see our new test fail since our naive logic cannot handle this yet. Now we will need to update our command logic to return an error when this situation arises:

impl Command<BankAccount, BankAccountEvent> for WithdrawMoney {
    fn handle(self, account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        let balance = account.balance - self.amount;
        if balance < 0_f64 {
            return Err(AggregateError::new("funds not available"))
        }
        let event_payload = CustomerWithdrewCash{
            amount: self.amount,
            balance
        };
        Ok(vec![BankAccountEvent::CustomerWithdrewCash(event_payload)])
    }
}

And we should now see our test pass.

Note that handling a command is always an atomic process: either all produced events become a part of the factual history of this aggregate instance, or an error is returned.
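The WriteCheck handler is still an empty stub, yet the test_event_store example further on expects a CustomerWroteCheck event. A minimal sketch of that handler, assuming a written check obeys the same no-overdraft rule as a cash withdrawal and that check_number is a String (matching the CustomerWroteCheck payload). So that it compiles on its own, it declares local stand-ins for Command and AggregateError with the same shape used in this chapter; in the tutorial project you would keep only the impl block and use the cqrs_es items and existing types instead.

```rust
// Local stand-ins (same shape as the cqrs_es items used above).
#[derive(Debug, PartialEq)]
pub struct AggregateError(String);

impl AggregateError {
    pub fn new(msg: &str) -> Self {
        AggregateError(msg.to_string())
    }
}

pub trait Command<A, E> {
    fn handle(self, aggregate: &A) -> Result<Vec<E>, AggregateError>;
}

#[derive(Debug, Default)]
pub struct BankAccount {
    pub balance: f64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct CustomerWroteCheck {
    pub check_number: String,
    pub amount: f64,
    pub balance: f64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum BankAccountEvent {
    CustomerWroteCheck(CustomerWroteCheck),
}

pub struct WriteCheck {
    pub check_number: String,
    pub amount: f64,
}

impl Command<BankAccount, BankAccountEvent> for WriteCheck {
    fn handle(self, account: &BankAccount) -> Result<Vec<BankAccountEvent>, AggregateError> {
        let balance = account.balance - self.amount;
        // Reject before any event exists: no overdrawing by check either.
        if balance < 0_f64 {
            return Err(AggregateError::new("funds not available"));
        }
        let event_payload = CustomerWroteCheck {
            check_number: self.check_number,
            amount: self.amount,
            balance,
        };
        Ok(vec![BankAccountEvent::CustomerWroteCheck(event_payload)])
    }
}
```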

Putting it all together

Now that we have built and tested the logic for our application we need to find a way to utilize it. We will start by building a test application with in-memory persistence in order to understand the fundamentals. We will need three things for this:

  • an event store to insert and retrieve our events
  • a query to read our events once committed
  • a framework to wire everything together and process commands

Using an event store

In an event sourced application the domain events are our source of truth, and to provide persistence we need an event store. Any persistence mechanism can be used but there are a few things that we need from a production event store:

  • append only
  • load all events, in order of commit, for a single aggregate instance
  • a guarantee that no events are missing
  • optimistic locking on the aggregate instance
  • provide additional metadata, outside of the event payload, for auditing or logging use

To provide our needed guarantees we identify any domain event by the combination of the aggregate type, the aggregate instance ID and the sequence number. This allows us to correctly order all events, guarantee that we are not missing any events for an aggregate instance, and to provide optimistic locking on append.

To keep all of the context surrounding an event together with the event payload, we use an EventEnvelope consisting of:

  • aggregate instance ID
  • sequence number
  • aggregate type
  • payload
  • metadata
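A toy sketch of those guarantees: an append-only in-memory store keyed by aggregate type and instance ID, sequence numbers that start at 1 with no gaps, and optimistic locking by comparing the caller's expected sequence with the stream's current length. ToyEventStore and this EventEnvelope are hypothetical stand-ins written for illustration, not the cqrs-es types.

```rust
use std::collections::HashMap;

// Hypothetical envelope carrying the fields listed above.
#[derive(Debug, Clone, PartialEq)]
pub struct EventEnvelope<E> {
    pub aggregate_id: String,
    pub sequence: usize,
    pub aggregate_type: String,
    pub payload: E,
    pub metadata: HashMap<String, String>,
}

// Append-only streams keyed by (aggregate type, aggregate ID).
pub struct ToyEventStore<E> {
    streams: HashMap<(String, String), Vec<EventEnvelope<E>>>,
}

impl<E: Clone> ToyEventStore<E> {
    pub fn new() -> Self {
        ToyEventStore { streams: HashMap::new() }
    }

    // All events for one aggregate instance, in commit order.
    pub fn load(&self, aggregate_type: &str, aggregate_id: &str) -> Vec<EventEnvelope<E>> {
        let key = (aggregate_type.to_string(), aggregate_id.to_string());
        self.streams.get(&key).cloned().unwrap_or_default()
    }

    // Optimistic locking: the caller passes the last sequence it saw. If
    // another writer appended in the meantime, the whole batch is rejected.
    pub fn append(
        &mut self,
        aggregate_type: &str,
        aggregate_id: &str,
        expected_sequence: usize,
        payloads: Vec<E>,
        metadata: HashMap<String, String>,
    ) -> Result<(), String> {
        let key = (aggregate_type.to_string(), aggregate_id.to_string());
        let stream = self.streams.entry(key).or_default();
        if stream.len() != expected_sequence {
            return Err(format!(
                "concurrency conflict: expected sequence {}, found {}",
                expected_sequence,
                stream.len()
            ));
        }
        for payload in payloads {
            let sequence = stream.len() + 1; // starts at 1, never skips
            stream.push(EventEnvelope {
                aggregate_id: aggregate_id.to_string(),
                sequence,
                aggregate_type: aggregate_type.to_string(),
                payload,
                metadata: metadata.clone(),
            });
        }
        Ok(())
    }
}
```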

The EventStore trait

In our application we need an implementation of EventStore for appending and loading events. For our test application we will use MemStore, the in-memory event store that ships with the cqrs-es crate.

use cqrs_es::mem_store::MemStore;

let event_store = MemStore::<BankAccount,BankAccountEvent>::default();

This implementation will not give us any real persistence but it will allow us to get started with testing our application. Later we will use another crate to provide a production capable implementation.

A simple query

The command processing portion of a CQRS application handles updates to the system but provides no insight into the current state. For this we will need one or more queries that read the events as they are committed. In the cqrs-es crate these queries should implement the QueryProcessor trait.

For our first query, we will just print the aggregate instance ID, the sequence number and the event payload (pretty-serialized for readability).

use cqrs_es::{EventEnvelope, QueryProcessor};

struct SimpleLoggingQuery {}

impl QueryProcessor<BankAccount, BankAccountEvent> for SimpleLoggingQuery {
    fn dispatch(&self, aggregate_id: &str, events: Vec<EventEnvelope<BankAccount, BankAccountEvent>>) {
        for event in events {
            let payload = serde_json::to_string_pretty(&event.payload).unwrap();
            println!("{}-{}\n{}", aggregate_id, event.sequence, payload);
        }
    }
}

Note that the trait's sole method takes a vector of EventEnvelopes, which gives queries the full context surrounding each event. This is different from aggregates, which receive only the event payload, so any information needed for applying business rules and handling commands must be included in the event payload.
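Because dispatch takes &self, a query that maintains its own view needs interior mutability. A hypothetical sketch of a per-account balance view using a Mutex<HashMap>, with local stand-ins for EventEnvelope and QueryProcessor shaped like the ones above (not the cqrs-es definitions). It also uses the envelope's sequence number, context the aggregate never sees, to skip events it has already processed.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Local stand-ins, shaped like the items used above.
pub struct EventEnvelope<E> {
    pub aggregate_id: String,
    pub sequence: usize,
    pub payload: E,
}

pub trait QueryProcessor<E> {
    fn dispatch(&self, aggregate_id: &str, events: Vec<EventEnvelope<E>>);
}

#[derive(Debug, Clone)]
pub enum BankAccountEvent {
    CustomerDepositedMoney { amount: f64, balance: f64 },
    CustomerWithdrewCash { amount: f64, balance: f64 },
}

#[derive(Debug, Clone, Default, PartialEq)]
pub struct AccountView {
    pub balance: f64,
    pub last_sequence: usize,
    pub transactions: usize,
}

// `dispatch` only gets &self, so the views live behind a Mutex.
#[derive(Default)]
pub struct AccountBalanceQuery {
    views: Mutex<HashMap<String, AccountView>>,
}

impl AccountBalanceQuery {
    pub fn view(&self, aggregate_id: &str) -> Option<AccountView> {
        self.views.lock().unwrap().get(aggregate_id).cloned()
    }
}

impl QueryProcessor<BankAccountEvent> for AccountBalanceQuery {
    fn dispatch(&self, aggregate_id: &str, events: Vec<EventEnvelope<BankAccountEvent>>) {
        let mut views = self.views.lock().unwrap();
        let view = views.entry(aggregate_id.to_string()).or_default();
        for envelope in events {
            // Skip anything this view has already seen.
            if envelope.sequence <= view.last_sequence {
                continue;
            }
            match envelope.payload {
                BankAccountEvent::CustomerDepositedMoney { balance, .. }
                | BankAccountEvent::CustomerWithdrewCash { balance, .. } => view.balance = balance,
            }
            view.last_sequence = envelope.sequence;
            view.transactions += 1;
        }
    }
}
```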

Putting everything together

The final piece of our test application is a CQRS framework to load up the aggregate, process incoming commands, persist the events and apply them to our queries. This is provided by a CqrsFramework component which takes an EventStore and a vector of boxed QueryProcessors.

Wiring this all up and firing two commands:

#[test]
fn test_event_store() {
    let event_store = MemStore::<BankAccount, BankAccountEvent>::default();
    let query = SimpleLoggingQuery {};
    let cqrs = CqrsFramework::new(event_store, vec![Box::new(query)]);

    let aggregate_id = "aggregate-instance-A";

    // deposit $1000
    cqrs.execute(aggregate_id, DepositMoney{
        amount: 1000_f64
    }).unwrap();

    // write a check for $236.15
    cqrs.execute(aggregate_id, WriteCheck{
        check_number: "1137".to_string(),
        amount: 236.15
    }).unwrap();
}

To run the test we should ensure that Rust's test harness does not capture our output.

cargo test -- --nocapture

Which should give us output something like this:

running 1 test
loading: 0 events for aggregate ID 'aggregate-instance-A'
storing: 1 new events for aggregate ID 'aggregate-instance-A'
aggregate-instance-A-1
{
  "CustomerDepositedMoney": {
    "amount": 1000.0,
    "balance": 1000.0
  }
}
loading: 1 events for aggregate ID 'aggregate-instance-A'
storing: 1 new events for aggregate ID 'aggregate-instance-A'
aggregate-instance-A-2
{
  "CustomerWroteCheck": {
    "check_number": "1137",
    "amount": 236.15,
    "balance": 763.85
  }
}

Here we see the output from our SimpleLoggingQuery along with some logging from the MemStore which is just what we hoped for.

This shows our entire framework working including loading events, rebuilding the aggregate, processing commands and distributing events to a query. Next, we will move on to actually using this in an application.
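The steps just listed can be sketched without the framework. In this hypothetical MiniCqrs (a std-only illustration, not the cqrs-es CqrsFramework API), execute loads the history, rebuilds the aggregate, handles the command, appends the new events and dispatches them to a logging stand-in for a query. Integer amounts keep the example exact.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Deposited { amount: i64, balance: i64 },
}

#[derive(Default)]
pub struct Account {
    balance: i64,
}

impl Account {
    fn apply(&mut self, event: &Event) {
        match event {
            Event::Deposited { balance, .. } => self.balance = *balance,
        }
    }
}

pub struct Deposit {
    pub amount: i64,
}

impl Deposit {
    fn handle(self, account: &Account) -> Result<Vec<Event>, String> {
        if self.amount <= 0 {
            return Err("amount must be positive".to_string());
        }
        let balance = account.balance + self.amount;
        Ok(vec![Event::Deposited { amount: self.amount, balance }])
    }
}

#[derive(Default)]
pub struct MiniCqrs {
    store: HashMap<String, Vec<Event>>,
    pub query_log: Vec<String>, // stands in for a query like SimpleLoggingQuery
}

impl MiniCqrs {
    pub fn execute(&mut self, aggregate_id: &str, command: Deposit) -> Result<(), String> {
        // 1. load committed events for this aggregate instance
        let history = self.store.get(aggregate_id).cloned().unwrap_or_default();
        // 2. rebuild current state by applying them in order
        let mut account = Account::default();
        for event in &history {
            account.apply(event);
        }
        // 3. handle the command; on error nothing is stored or dispatched
        let new_events = command.handle(&account)?;
        // 4. persist the new events
        let stream = self.store.entry(aggregate_id.to_string()).or_default();
        let first_sequence = stream.len() + 1;
        stream.extend(new_events.iter().cloned());
        // 5. dispatch them to the query
        for (i, event) in new_events.iter().enumerate() {
            self.query_log
                .push(format!("{}-{}: {:?}", aggregate_id, first_sequence + i, event));
        }
        Ok(())
    }
}
```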

Building the application

For a bare minimum operating application we are missing a number of components including:

  • non-volatile persistence
  • a Restful API or other interface
  • useful queries

Sorry, we're just getting started here; this isn't quite done.

Have some patience, this section should be completed soon.

In the meantime, a demo of wiring this all together is available in the cqrs-demo project.

Including metadata with our commands

Sorry, we're just getting started here; this isn't quite done.

Advanced topics

Sorry, we're just getting started here; this isn't quite done.

Have some patience, this section should be completed soon.

Service injection

Debugging production errors locally

Event replay

Event upcasters