CQRS and Event Sourcing using Rust
The cqrs-es
crate
provides a lightweight framework for building applications utilizing CQRS and event sourcing.
The project targets serverless architectures but can be used in any application seeking to utilize these patterns to
build better software.
Chapter 1 provides an introduction to CQRS and event sourcing as well as the underlying design strategy that they support, Domain Driven Design. This chapter is essential reading for anyone not familiar with these topics.
Chapter 2 provides a tutorial for getting started with the framework. This involves building an aggregate, commands, events and the associated trait implementations for a basic CQRS system. Our demo application will simulate a simple bank account.
Chapter 3 covers the basics of implementing domain tests. The ability to easily build supple tests is one of the primary benefits of a CQRS system, and here we explain how to build them effectively.
Chapter 4 covers building a simple query processor as well as putting all of these components together in a proper CQRS framework. We will use a naive, in-memory event store to facilitate this mini-application that will allow us to explore CQRS with simple unit tests.
Chapter 5 discusses the additional features needed to work this framework into a real application.
The patterns
Command-Query Responsibility Segregation (CQRS) and event sourcing are patterns that enable many of the concepts behind Domain Driven Design. All of these tools are designed to provide a great deal of flexibility for applications that have complex or rapidly changing business rules.
By separating the business rules from the technical aspects of an application we remove many of the inherent barriers to software changes that exist in standard applications. Any application with complex or rapidly changing rules might be a good candidate for using CQRS and event sourcing.
A note on terminology
Though CQRS and event sourcing can be used for a range of software problems, they are primarily applied to business applications, which so often have complex or rapidly changing rules.
I'll use the term "business rules" whenever I'm specifically discussing these complex or changing rulesets. If your application is not a business application, just replace "business rules" with something more appropriate to your domain.
Domain Driven Design
The intellectual backing for both of these patterns is provided by Domain Driven Design (DDD), which is an approach to building software that focuses on building a better business model. The field of DDD is expansive, but one of its core concepts is the Aggregate.
Some terminology
First, in DDD all objects are broken down into two groups:
- entities - objects with a unique identity, and usually with state (e.g., a person).
- value objects - immutable objects that are constructed when needed and discarded when no longer valuable (e.g., a person's age).
We use a combination of these components to form an 'aggregate'. An aggregate is a collection of related entities and value objects. An aggregate always has a root entity that forms the basis of this grouping.
For example, a CreditCard
aggregate might be composed of:
- the credit card account itself - the root entity
- a list of charges - value objects
- a payment history - value objects
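To make this concrete, the aggregate above can be sketched as plain Rust types. The names here are illustrative only, not part of any framework: the root entity carries the identity, while charges and payments are value objects.

```rust
// Hypothetical sketch of the CreditCard aggregate described above.
// The root entity has a unique identity; charges and payments are value objects.

#[derive(Debug, Clone, PartialEq)]
struct Charge {
    description: String,
    amount_cents: i64,
}

#[derive(Debug, Clone, PartialEq)]
struct Payment {
    amount_cents: i64,
}

#[derive(Debug)]
struct CreditCard {
    account_id: String,     // unique identity -> this is an entity
    charges: Vec<Charge>,   // value objects
    payments: Vec<Payment>, // value objects
}

impl CreditCard {
    // The aggregate is only modified as a whole, through its root.
    fn add_charge(&mut self, description: &str, amount_cents: i64) {
        self.charges.push(Charge {
            description: description.to_string(),
            amount_cents,
        });
    }

    // Derived state: computed from the value objects, never stored separately.
    fn balance_cents(&self) -> i64 {
        let charged: i64 = self.charges.iter().map(|c| c.amount_cents).sum();
        let paid: i64 = self.payments.iter().map(|p| p.amount_cents).sum();
        charged - paid
    }
}
```

Note that callers never reach inside to mutate a `Charge` directly; all changes go through the root entity's methods.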
An example bounded context
The above topics are encapsulated within a bounded context, an important topic, but outside the scope of this book. An example of a bounded context with multiple aggregates is below.
More restrictions
Domain Driven Design requires that Aggregates are only modified as a whole. This means we cannot operate on any individual component, such as modifying the balance of a bank account, without loading the entire aggregate. Think of this as using strong encapsulation in an object-oriented pattern.
This stipulation allows robust, simple testing of the business rules since they are all in one location. Instead of integration tests spanning multiple application layers we can use simple unit tests to guarantee we meet all of our business requirements.
The above example compares a standard application to that of an application using domain driven design. The former uses business logic spread through a number of services that access and modify state data in unknown ways. In a DDD application all changes to the state data are only made through the aggregate resulting in a highly testable set of business rules.
In the diagram a CQRS framework is implied, but any DDD framework should promote the same level of flexibility.
Further reading
Domain Driven Design is a topic far too large to discuss in any detail here. Some additional resources:
- Ubiquitous language
- The Blue Book - the original text on DDD by Eric Evans
- The Red Book - a book refined from years of experience with DDD
CQRS
Command Query Responsibility Segregation (CQRS) departs from standard software development by using separate write and read models for an application object. This allows us to create views, or queries, that more accurately model our business environment.
We define "standard application" here as a generic webapp commonly used to provide backend services in business applications.
In standard applications, the domain modeling tends to reflect how the application objects are stored in an RDBMS. Updates generally involve loading the object(s) from a set of tables via a JOIN operation, making the requisite modifications, and persisting the updated object(s).
Read operations are usually accomplished via the same load operation or from a view based on the same database tables. This requires us to make a compromise between our read and write models.
One of the primary drivers in using CQRS is to remove the compromises between read and write models.
CQRS basics
When using CQRS we divide our application into two parts: command processors and queries. Users interact to modify the system by sending commands but receive no information back. When a user interacts with the query side of our application they receive information but there is a guarantee that no changes will be made.
A number of strategies have been devised for decoupling the command and query side of applications. The most successful among them is communication across this divide via events. This approach leverages the benefits of a reactive architecture as well as providing an opportunity to use event sourcing.
Updates
When creating updates under the pattern of CQRS, the focus is solely on what the changes are, and not on what these changes mean to any views or queries. This involves requesting the changes via a command and reflecting the actual changes in one or more events.
To do this, we use a DDD concept called an aggregate. Roughly, an aggregate combines the state of an object that is being acted upon, along with all the business rules of our application. This approach strives to remove any technical complexity near the business rules where applications are most sensitive to the errors that degrade their agility.
The aggregate’s job is to consider the command in the context of its current state and determine what business facts need to be recorded. In effect, a command is a request that is subject to security, validation, application state and business rules. The aggregate is the arbiter of these rules.
For instance, if we send an update-email
command, we would expect an email-updated
event to be produced by the
customer
aggregate.
Note the difference in naming between a command and an event. A command is a request in the imperative whereas an event is a statement of fact in the past tense.
A single event per command is the most common situation, but this can change based on a number of factors. In the
event an email address is configured as the customer’s primary contact, we could see a primary-contact-updated
event as well.
Using the same example: If the provided email address is identical to the old email address, then we may not have any events at all since there is no change to be reflected. Other situations with no resultant events could be seen for a variety of other reasons, such as if the new email address is invalid or if the user requesting the update is not authorized to do so.
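A sketch of this decision logic might look like the following. The type and method names here are hypothetical, not the cqrs-es API; the point is that one command can yield zero, one or several events.

```rust
// Hypothetical sketch of a customer aggregate translating an update-email
// command into zero, one or two events.

#[derive(Debug, PartialEq)]
enum CustomerEvent {
    EmailUpdated { email: String },
    PrimaryContactUpdated { contact: String },
}

struct Customer {
    email: String,
    email_is_primary_contact: bool,
}

impl Customer {
    fn handle_update_email(&self, new_email: &str) -> Result<Vec<CustomerEvent>, String> {
        // Validation failure: no events, an error instead.
        if !new_email.contains('@') {
            return Err("invalid email address".to_string());
        }
        // No change to record: a command may legitimately produce no events.
        if new_email == self.email {
            return Ok(vec![]);
        }
        let mut events = vec![CustomerEvent::EmailUpdated {
            email: new_email.to_string(),
        }];
        // A single command can produce more than one event.
        if self.email_is_primary_contact {
            events.push(CustomerEvent::PrimaryContactUpdated {
                contact: new_email.to_string(),
            });
        }
        Ok(events)
    }
}
```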
Queries
Once events are published they can be consumed by our queries (a.k.a., views). As queries consume events they modify their own state and produce something similar to the views that we have in standard applications. Their real flexibility is derived from the fact that these queries are not tied in any way to our write model.
In the previous example we produced events that we could imagine to be of interest to several queries. Certainly, we
would have a customer-information
query that would need to be updated, but then we might have additional queries
such as an all-customer-contacts
query that would also respond to the same event.
Additionally, other downstream services may respond to these events similarly to how they would in any other
messaging-based application. Using the same example we might have a service that sends a verification email to the
new address after an email-updated
event is fired.
An example of how a single command might move through a CQRS system.
Event Sourcing
Event sourcing adds to the flexibility of CQRS by relying upon the events as our source of truth. Events are now historical facts that we can use to calculate the current state in ways that we did not originally intend. In this way, event sourcing acts similar to a financial ledger.
Event Name | Event Value | Account Balance |
---|---|---|
account-opened | Account XYZ opened by user John Doe | $0.00 |
money-deposited | John Doe deposited $500 | $500.00 |
check-cleared | Check #1127 cleared for $27.15 | $472.85 |
cash-withdrawn | $100 cash withdrawn from ATM #243 | $372.85 |
In this example we have four events that have been stored. Consider that a new business requirement is added to track money outflow from accounts. There is no way to do this immediately if the current state is all that is persisted (i.e., the current account balance).
However, because our source of truth is these events rather than a simple balance, it is easy to add the business logic after the fact and calculate what we need from the persisted events.
Event Name | Event Value | Account Balance | Outflow |
---|---|---|---|
account-opened | Account XYZ opened by user John Doe | $0.00 | $0.00 |
money-deposited | John Doe deposited $500 | $500.00 | $0.00 |
check-cleared | Check #1127 cleared for $27.15 | $472.85 | $27.15 |
cash-withdrawn | $100 cash withdrawn from ATM #243 | $372.85 | $127.15 |
In this way, we have used the persisted events to provide new business information that otherwise would not have been available. In essence we have turned back time and we can now use new logic to compute states that would otherwise be impossible to know.
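This replay can be expressed as a simple fold over the stored events. Below is a sketch; the event shapes are illustrative, and amounts are in cents to sidestep floating-point issues.

```rust
// Replaying persisted events to derive a value (outflow) that was never
// stored when the events were first recorded.

enum AccountEvent {
    AccountOpened,
    MoneyDeposited { amount_cents: i64 },
    CheckCleared { amount_cents: i64 },
    CashWithdrawn { amount_cents: i64 },
}

// The new business logic: total outflow, computed after the fact by
// folding over the historical events.
fn total_outflow_cents(events: &[AccountEvent]) -> i64 {
    events.iter().fold(0, |outflow, event| match event {
        AccountEvent::CheckCleared { amount_cents }
        | AccountEvent::CashWithdrawn { amount_cents } => outflow + amount_cents,
        _ => outflow,
    })
}
```

Running this over the four events from the table above yields the $127.15 outflow, even though no outflow was recorded at write time.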
Getting started
For this tutorial we will build an application to manage the logic of a bank account. As a simple set of business rules, we want to:
- accept deposits
- provide withdrawals
- allow our customers to write checks
- disallow customers from overdrawing their account
Project setup
Okay, let's get some code going. First up, we need a workspace. You know the drill: find your favorite playspace on your hard drive and start a new Rust bin project.
cargo new --bin mybank
There is a lot that happens behind the scenes in a CQRS/event sourcing application, so we'll be using the
cqrs-es
framework to get us off the ground.
Add these dependencies in your Cargo.toml:
[dependencies]
cqrs-es = "0.4.2"
async-trait = "0.1.52"
serde = { version = "1.0", features = ["derive"]}
tokio = { version = "1", features = ["full"] }
All of the examples included here are simplified from the cqrs-demo project. More detailed examples can be found by exploring that package.
Commands
In order to make changes to our system we will need commands. These are the simplest components of any CQRS system and consist of little more than packaged data.
When designing commands an easy mental model to use is that of an HTTP API. Each virtual endpoint would receive just the data that is needed to operate that function.
#[derive(Debug, Deserialize)]
pub enum BankAccountCommand {
OpenAccount { account_id: String },
DepositMoney { amount: f64 },
WithdrawMoney { amount: f64 },
WriteCheck { check_number: String, amount: f64 },
}
Note that the Deserialize
trait is derived.
This is not yet needed, but it will be useful when building out a full application.
The most common way to receive commands from a user is via an HTTP body that can be directly deserialized.
Domain Events
Next we will need to create some domain events. Note that we qualify events with 'domain' to differentiate them from other events that might exist within our application. These are domain events because they make assertions about changes in the aggregate state.
In the cqrs-es
framework the domain events are expected to be an enum with payloads similar to the commands,
which will give us a single root event for each aggregate.
The enum as well as the payloads should derive several traits.
- Debug - used for error handling and testing.
- Clone - the event may be passed to a number of downstream queries in an asynchronous manner and will need to be cloned.
- Serialize, Deserialize - serialization is essential for both storage and publishing to distributed queries.
- PartialEq - we will be adding a lot of tests to verify that our business logic is correct.
Adding events and payloads
Let's add four self-descriptive events as part of a single enum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum BankAccountEvent {
    AccountOpened {
        account_id: String,
    },
    CustomerDepositedMoney {
        amount: f64,
        balance: f64,
    },
    CustomerWithdrewCash {
        amount: f64,
        balance: f64,
    },
    CustomerWroteCheck {
        check_number: String,
        amount: f64,
        balance: f64,
    },
}
Again, all of our events are named in the past tense; this is important.
Our events now need to implement cqrs_es::DomainEvent
to provide an event_type
and event_version
for each event.
This will be important later in any production system when events need to be changed
(see event upcasters).
impl DomainEvent for BankAccountEvent {
    fn event_type(&self) -> String {
        let event_type: &str = match self {
            BankAccountEvent::AccountOpened { .. } => "AccountOpened",
            BankAccountEvent::CustomerDepositedMoney { .. } => "CustomerDepositedMoney",
            BankAccountEvent::CustomerWithdrewCash { .. } => "CustomerWithdrewCash",
            BankAccountEvent::CustomerWroteCheck { .. } => "CustomerWroteCheck",
        };
        event_type.to_string()
    }

    fn event_version(&self) -> String {
        "1.0".to_string()
    }
}
Custom error and service
Aside from our domain data objects, we'll need two additional components to complete an aggregate: an error type to indicate a violation of the business rules, and a set of services that will be made available during command processing.
User error
The Aggregate
trait can return an error from its handle
method indicating that some rule of the business logic was violated;
this information will usually be returned to the user as an error message.
For example, an attempt to withdraw more money from a bank account than the current balance would return this error
and the user would be informed that the balance was not sufficient for this transaction.
#[derive(Debug)]
pub struct BankAccountError(String);
This error should implement Display
and Error
as well.
Additionally, implementing the From<&str>
trait will simplify the business logic that we'll be writing in the
next sections.
impl Display for BankAccountError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for BankAccountError {}

impl From<&str> for BankAccountError {
    fn from(message: &str) -> Self {
        BankAccountError(message.to_string())
    }
}
External services
Business logic doesn't exist in a vacuum and external services may be needed for a variety of reasons. We don't have much logic built yet, so this will initially just be a placeholder. Let's add a couple of calls that will, for now, always return successfully.
pub struct BankAccountServices;

impl BankAccountServices {
    async fn atm_withdrawal(&self, atm_id: &str, amount: f64) -> Result<(), AtmError> {
        Ok(())
    }

    async fn validate_check(&self, account: &str, check: &str) -> Result<(), CheckingError> {
        Ok(())
    }
}

pub struct AtmError;
pub struct CheckingError;
Aggregate
With the command and event in place we can now start adding our business logic.
In Domain Driven Design all of this logic belongs within the aggregate which
for our example we will name BankAccount
.
And for our simple set of business rules, we will use two fields.
#[derive(Serialize, Default, Deserialize)]
pub struct BankAccount {
opened: bool,
// this is a floating point for our example, don't do this IRL
balance: f64,
}
In order to operate within the cqrs-es
framework, we will need the traits Default
, Serialize
and Deserialize
(all usually derived) and we will implement cqrs_es::Aggregate
, minus any of the business logic.
#[async_trait]
impl Aggregate for BankAccount {
    type Command = BankAccountCommand;
    type Event = BankAccountEvent;
    type Error = BankAccountError;
    type Services = BankAccountServices;

    // This identifier should be unique to the system.
    fn aggregate_type() -> String {
        "Account".to_string()
    }

    // The aggregate logic goes here. Note that this will be the _bulk_ of a CQRS system
    // so expect to use helper functions elsewhere to keep the code clean.
    async fn handle(
        &self,
        command: Self::Command,
        services: &Self::Services,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        todo!()
    }

    fn apply(&mut self, event: Self::Event) {
        match event {
            BankAccountEvent::AccountOpened { .. } => self.opened = true,
            BankAccountEvent::CustomerDepositedMoney { amount: _, balance } => {
                self.balance = balance;
            }
            BankAccountEvent::CustomerWithdrewCash { amount: _, balance } => {
                self.balance = balance;
            }
            BankAccountEvent::CustomerWroteCheck {
                check_number: _,
                amount: _,
                balance,
            } => {
                self.balance = balance;
            }
        }
    }
}
Identifying the aggregate when persisted
The aggregate_type
method is used by the cqrs-es framework to uniquely identify the aggregate and event
when serialized for persistence. Each aggregate should use a unique value within your application.
fn aggregate_type() -> String {
    "Account".to_string()
}
Handling commands
The handle
method of this trait is where all of the business logic will go; for now we will leave it unimplemented.
// note that the aggregate is immutable and an error can be returned
async fn handle(
    &self,
    command: Self::Command,
    services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
    todo!()
}
The handle
method does not allow any mutation of the aggregate; state should be changed only by emitting events.
Applying committed events
Once events have been committed they will need to be applied to the aggregate in order for it to update its state.
// note the aggregate is mutable and there is no return type
fn apply(&mut self, event: Self::Event) {
    match event {
        BankAccountEvent::AccountOpened { .. } => self.opened = true,
        BankAccountEvent::CustomerDepositedMoney { amount: _, balance } => {
            self.balance = balance;
        }
        BankAccountEvent::CustomerWithdrewCash { amount: _, balance } => {
            self.balance = balance;
        }
        BankAccountEvent::CustomerWroteCheck {
            check_number: _,
            amount: _,
            balance,
        } => {
            self.balance = balance;
        }
    }
}
Note that the apply
function has no return value. The act of applying an event is simply bookkeeping; the action has
already taken place.
An event is a historical fact; it can be ignored, but it should never cause an error.
Adding aggregate tests
Now that we have the basic components in place we can begin setting up our aggregate tests. These are the tests that we will use to verify the business logic for our application. Testing is one of the most valuable aspects of CQRS/event sourcing as it allows us to configure tests that have no coupling with our application logic.
We can do this because we rely only on events for past state, so no amount of refactoring of our application logic will affect whether a test passes or fails (as long as the result of the command is the same). These tests follow a pattern that you are likely familiar with:
- Given some past events
- When a command is applied
- Then some result is expected
Let's first add a test module and define a new AccountTestFramework
convenience type for our test framework.
#[cfg(test)]
mod aggregate_tests {
    use super::*;
    use cqrs_es::test::TestFramework;

    type AccountTestFramework = TestFramework<BankAccount>;
}
A first aggregate test
Now within our aggregate_tests
module we will add our first test.
We do not require any previous events so we can initiate our test with the given_no_previous_events
method.
Let's fire a DepositMoney
command and expect a CustomerDepositedMoney
event.
#[test]
fn test_deposit_money() {
    let expected = BankAccountEvent::CustomerDepositedMoney {
        amount: 200.0,
        balance: 200.0,
    };

    AccountTestFramework::with(BankAccountServices)
        .given_no_previous_events()
        .when(BankAccountCommand::DepositMoney { amount: 200.0 })
        .then_expect_events(vec![expected]);
}
Now if we run this test, we should see a test failure with output that looks something like this:
thread 'aggregate_tests::test' panicked at 'assertion failed: `(left == right)`
left: `[]`,
right: `[CustomerDepositedMoney{ amount: 200.0, balance: 200.0 }]`', <::std::macros::panic ...
We have not added any logic yet, so this is what we should see.
We have told the test to expect a CustomerDepositedMoney
event, but none has been produced.
Adding business logic
Let's go back to our handle
method and add the logic for DepositMoney
to fix this.
async fn handle(
    &self,
    command: Self::Command,
    services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
    match command {
        BankAccountCommand::DepositMoney { amount } => {
            let balance = self.balance + amount;
            Ok(vec![BankAccountEvent::CustomerDepositedMoney {
                amount,
                balance,
            }])
        }
        _ => Ok(vec![]),
    }
}
And running our first test again - success!
Dealing with previous events
Now we should verify that our logic is valid if there is a previous balance. For this, we will use the given
method to
initiate the test, along with a vector containing a sole previous event:
#[test]
fn test_deposit_money_with_balance() {
    let previous = BankAccountEvent::CustomerDepositedMoney {
        amount: 200.0,
        balance: 200.0,
    };
    let expected = BankAccountEvent::CustomerDepositedMoney {
        amount: 200.0,
        balance: 400.0,
    };

    AccountTestFramework::with(BankAccountServices)
        .given(vec![previous])
        .when(BankAccountCommand::DepositMoney { amount: 200.0 })
        .then_expect_events(vec![expected]);
}
These exercises feel a little brain-dead, but they provide a good example of how these tests are structured. Next we will start adding some real logic.
Adding more logic
In our simple example a customer can always deposit money, but making a cash withdrawal is another thing. We should ensure that the customer has the requested funds available before releasing them, lest the account overdraw.
When discussing events, we noted that the process of applying events cannot produce an error since it is a past event. Instead, errors should be produced before the event is generated, during the processing of the command.
Account withdrawal - happy path
First, let's add a test for a happy path withdrawal, again with a previous deposit using the given
initial method:
#[test]
fn test_withdraw_money() {
    let previous = BankAccountEvent::CustomerDepositedMoney {
        amount: 200.0,
        balance: 200.0,
    };
    let expected = BankAccountEvent::CustomerWithdrewCash {
        amount: 100.0,
        balance: 100.0,
    };

    AccountTestFramework::with(BankAccountServices)
        .given(vec![previous])
        .when(BankAccountCommand::WithdrawMoney { amount: 100.0 })
        .then_expect_events(vec![expected]);
}
Since we have not added any withdrawal logic yet this should fail. Let's correct this with some naive logic to produce the event:
async fn handle(
    &self,
    command: Self::Command,
    services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
    match command {
        BankAccountCommand::WithdrawMoney { amount } => {
            let balance = self.balance - amount;
            Ok(vec![BankAccountEvent::CustomerWithdrewCash {
                amount,
                balance,
            }])
        }
        // ...
    }
}
Verify funds are available
Now we have success with our happy path test, but there is nothing to stop a customer from withdrawing more than
is deposited.
Let's add a test case using the then_expect_error
expect case:
#[test]
fn test_withdraw_money_funds_not_available() {
    AccountTestFramework::with(BankAccountServices)
        .given_no_previous_events()
        .when(BankAccountCommand::WithdrawMoney { amount: 200.0 })
        .then_expect_error(BankAccountError("funds not available".to_string()));
}
We should see our new test fail since our naive logic cannot handle this yet. Now we update our command logic to return an error when this situation arises:
async fn handle(
    &self,
    command: Self::Command,
    services: &Self::Services,
) -> Result<Vec<Self::Event>, Self::Error> {
    match command {
        BankAccountCommand::WithdrawMoney { amount } => {
            let balance = self.balance - amount;
            if balance < 0_f64 {
                // uses the From<&str> implementation added earlier
                return Err("funds not available".into());
            }
            Ok(vec![BankAccountEvent::CustomerWithdrewCash {
                amount,
                balance,
            }])
        }
        // ...
    }
}
And we should now see our test pass.
Note that handling a command is always an atomic process, either all produced events become a part of the factual history of this aggregate instance, or an error is returned.
Putting it all together
Now that we have built and tested the logic for our application we need to find a way to utilize it. We will start by building a test application with in-memory persistence in order to understand the fundamentals. We will need three things for this:
- an event store to insert and retrieve our events
- a query to read our events once committed
- a framework to wire everything together and process commands
Using an event store
In an event sourced application the domain events are our source of truth, and to provide persistence we need an event store. Any persistence mechanism can be used but there are a few things that we need from a production event store:
- append only
- load all events, in order of commit, for a single aggregate instance
- a guarantee that no events are missing
- optimistic locking on the aggregate instance
- provide additional metadata, outside of the event payload, for auditing or logging use
To provide our needed guarantees we identify any domain event by the combination of the aggregate type, the aggregate instance ID and the sequence number. This allows us to correctly order all events, guarantee that we are not missing any events for an aggregate instance, and to provide optimistic locking on append.
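The sequence-number guarantee described above can be sketched with a toy in-memory store. This is illustrative only, not the cqrs-es implementation: events are keyed by aggregate type and instance ID, and an append is rejected if the caller's expected sequence is stale (optimistic locking).

```rust
use std::collections::HashMap;

// Toy append-only event store demonstrating ordering and optimistic locking.
struct InMemoryEventStore {
    // (aggregate_type, aggregate_id) -> ordered event payloads
    streams: HashMap<(String, String), Vec<String>>,
}

impl InMemoryEventStore {
    fn new() -> Self {
        Self { streams: HashMap::new() }
    }

    fn append(
        &mut self,
        aggregate_type: &str,
        aggregate_id: &str,
        expected_sequence: usize,
        payload: String,
    ) -> Result<(), String> {
        let stream = self
            .streams
            .entry((aggregate_type.to_string(), aggregate_id.to_string()))
            .or_insert_with(Vec::new);
        if stream.len() != expected_sequence {
            // Another writer appended first: reject rather than lose ordering.
            return Err(format!(
                "optimistic lock error: expected sequence {}, found {}",
                expected_sequence,
                stream.len()
            ));
        }
        stream.push(payload);
        Ok(())
    }
}
```

Because each stream is append-only and densely numbered, loading it back gives every event for the instance, in commit order, with no gaps.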
To keep all of the context surrounding an event together with the event payload, we use an EventEnvelope
consisting of:
- aggregate instance ID
- sequence number
- aggregate type
- payload
- metadata
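As a sketch, such an envelope might be modeled like this (the real cqrs_es::EventEnvelope is generic over the aggregate and differs in detail; this simplified version just illustrates the fields listed above):

```rust
use std::collections::HashMap;

// Simplified illustration of an event envelope: the event payload plus
// the context needed to order, identify and audit it.
struct EventEnvelope {
    aggregate_id: String,              // aggregate instance ID
    sequence: usize,                   // position within the instance's stream
    aggregate_type: String,            // e.g., "Account"
    payload: String,                   // the serialized domain event
    metadata: HashMap<String, String>, // e.g., client IP for auditing/logging
}
```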
The EventStore
trait
In our application we need an implementation of EventStore
for appending and loading events.
For our test application we will use MemStore
, the in-memory event store that ships with the cqrs-es
crate.
use cqrs_es::mem_store::MemStore;

let event_store = MemStore::<BankAccount>::default();
This implementation will not give us any real persistence but it will allow us to get started with testing our application. Later we will use another crate to provide a production capable implementation.
A simple query
The command processing portion of a CQRS system handles updates to the system but provides no insight into the current
state. For this we will need one or more queries that read the events as they are committed. In the cqrs-es
crate
these queries should implement the Query
trait.
For our first query, we will just print the aggregate instance ID, sequence number and the event payload.
struct SimpleLoggingQuery {}

#[async_trait]
impl Query<BankAccount> for SimpleLoggingQuery {
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<BankAccount>]) {
        for event in events {
            println!("{}-{}\n{:#?}", aggregate_id, event.sequence, &event.payload);
        }
    }
}
Note that the trait's sole method takes a slice of
EventEnvelope
s,
a struct that contains the event along with supporting context and
metadata.
This allows queries to have the full context surrounding the event, important since a query may be
interested in a very different set of fields than those of interest within the aggregate.
E.g., the user's IP address is likely unimportant for the business rules but could be of interest in a query used for security audits
Putting everything together
The final piece of our test application is a CQRS framework to load up the aggregate, process incoming commands,
persist the events and apply them to our queries. This is provided by a CqrsFramework
component which takes an
EventStore
and a vector of boxed Query
s.
Wiring this all up and firing two commands:
#[tokio::test]
async fn test_event_store() {
    let event_store = MemStore::<BankAccount>::default();
    let query = SimpleLoggingQuery {};
    let cqrs = CqrsFramework::new(event_store, vec![Box::new(query)], BankAccountServices);

    let aggregate_id = "aggregate-instance-A";

    // deposit $1000
    cqrs.execute(aggregate_id, BankAccountCommand::DepositMoney { amount: 1000_f64 })
        .await
        .unwrap();

    // write a check for $236.15
    cqrs.execute(
        aggregate_id,
        BankAccountCommand::WriteCheck {
            check_number: "1337".to_string(),
            amount: 236.15,
        },
    )
    .await
    .unwrap();
}
To run the test we should ensure that the test harness does not capture our output.
cargo test -- --nocapture
Which should give us output something like this:
running 1 test
loading: 0 events for aggregate ID 'aggregate-instance-A'
storing: 1 new events for aggregate ID 'aggregate-instance-A'
aggregate-instance-A-1
{
"CustomerDepositedMoney": {
"amount": 1000.0,
"balance": 1000.0
}
}
loading: 1 events for aggregate ID 'aggregate-instance-A'
storing: 1 new events for aggregate ID 'aggregate-instance-A'
aggregate-instance-A-2
{
"CustomerWroteCheck": {
"check_number": "1337",
"amount": 236.15,
"balance": 763.85
}
}
Here we see the output from our SimpleLoggingQuery
along with some logging from the MemStore
which is just what we hoped
for.
This shows our entire framework working including loading events, rebuilding the aggregate, processing commands and distributing events to a query. Next, we will move on to actually using this in an application.
Building the application
For a bare minimum operating application we are missing a number of components including:
- a Restful API or other interface
- non-volatile persistence
- useful queries
A demo application with examples of all of these features is available in the cqrs-demo project.
The persist module contains the generic entities needed for a backing event store. A database repository handles the implementation specifics, with three options currently available:
- postgres-es - PostgreSQL
- mysql-es - MySQL
- dynamo-es - DynamoDB
These libraries also provide persistence for simple queries.
Note that postgres-es
is used for examples in this user guide but all the crates have similar methods available.
For using postgres-es
for persistence, add to the dependencies section of your Cargo.toml
:
[dependencies]
cqrs-es = "0.4.2"
postgres-es = "0.4.2"
Persisted Event Store
A PersistedEventStore backs the CqrsFramework and handles the storing and loading of aggregates (including their domain events) in a database. The PersistedEventStore relies on a PersistedEventRepository for the actual database access of events and snapshots. For the postgres-es crate this is implemented by a PostgresEventRepository, which in turn relies on a database connection pool.
Creating a PostgresEventRepository
async fn configure_repo() -> PostgresEventRepository {
    let connection_string = "postgresql://test_user:test_pass@localhost:5432/test";
    let pool: Pool<Postgres> = default_postgress_pool(connection_string).await;
    PostgresEventRepository::new(pool)
}
The default repository will expect to find tables named events and snapshots, but the table names are configurable.
To create these tables in a PostgreSql database (see database initialization files for other repository crates):
CREATE TABLE events
(
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
sequence bigint CHECK (sequence >= 0) NOT NULL,
event_type text NOT NULL,
event_version text NOT NULL,
payload json NOT NULL,
metadata json NOT NULL,
timestamp timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
PRIMARY KEY (aggregate_type, aggregate_id, sequence)
);
CREATE TABLE snapshots
(
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
last_sequence bigint CHECK (last_sequence >= 0) NOT NULL,
current_snapshot bigint CHECK (current_snapshot >= 0) NOT NULL,
payload json NOT NULL,
timestamp timestamp with time zone DEFAULT (CURRENT_TIMESTAMP),
PRIMARY KEY (aggregate_type, aggregate_id, last_sequence)
);
Note that the snapshots
table is not needed for pure event sourcing.
Queries with persisted views
A ViewRepository provides a simple database-backed repository for views that do not require multiple indexes. It is designed to work with a GenericQuery to apply events to a view synchronously, immediately after those events are committed. A GenericQuery will load the view, apply any events, and store the updated version back in the database. The logic for the update is placed in a View implementation.
For our bank account example this might look like:
impl View<BankAccount> for BankAccountView {
    fn update(&mut self, event: &EventEnvelope<BankAccount>) {
        match &event.payload {
            BankAccountEvent::CustomerDepositedMoney { amount, balance } => {
                self.ledger.push(LedgerEntry::new("deposit", *amount));
                self.balance = *balance;
            }
            // ... handle the remaining event variants
        }
    }
}
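The load-apply-store cycle that a GenericQuery performs can be sketched with an in-memory stand-in for the view repository. This is an illustration only; the real implementation is asynchronous and database-backed, and the names below are hypothetical:

```rust
use std::collections::HashMap;

// Simplified stand-in for a view: it accumulates deposits into a balance.
#[derive(Default)]
struct AccountView {
    balance: f64,
}

impl AccountView {
    fn update(&mut self, deposited: f64) {
        self.balance += deposited;
    }
}

// Illustration of the cycle a GenericQuery runs for each batch of events:
// load the view (or start a fresh one), apply each event, keep the result.
fn dispatch_events(
    store: &mut HashMap<String, AccountView>, // stand-in for the view repository
    view_id: &str,
    events: &[f64],
) {
    let view = store.entry(view_id.to_string()).or_default(); // load
    for amount in events {
        view.update(*amount); // apply
    }
    // the updated view remains in the map, i.e. is "stored"
}

fn main() {
    let mut store = HashMap::new();
    dispatch_events(&mut store, "account-A", &[1000.0, 250.0]);
    assert_eq!(store["account-A"].balance, 1250.0);
    println!("ok");
}
```

Because the view is updated synchronously with the committed events, a query against it is always consistent with the last processed command.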
The view repositories use the same database connection as the event repositories; for postgres-es this is a database connection pool.
type MyViewRepository = PostgresViewRepository<MyView, MyAggregate>;

fn configure_view_repository(db_pool: Pool<Postgres>) -> MyViewRepository {
    PostgresViewRepository::new("my_view_name", db_pool)
}
The database must have a table prepared before use, and the table name should match the value passed when constructing the PostgresViewRepository.
CREATE TABLE my_view_name
(
view_id text NOT NULL,
version bigint CHECK (version >= 0) NOT NULL,
payload json NOT NULL,
PRIMARY KEY (view_id)
);
To use this view repository with a GenericQuery, it must be configured with the CqrsFramework.
fn configure_cqrs(store: PostgresEventStore, my_view_repo: MyViewRepository) -> CqrsFramework {
    let my_query = GenericQuery::<MyViewRepository, MyView, MyAggregate>::new(my_view_repo);
    let my_query = Box::new(my_query);
    CqrsFramework::new(store, vec![my_query])
}
Including metadata with our commands
Any useful application will require much more information than what is solely needed to satisfy the domain (business) logic. This additional data could be needed for debugging, security, an audit trail or a variety of reasons. Some examples include:
- server name, region or other operational information
- username that authorized the request
- IP address that made the call
- date and time that the command was processed
- a request id for distributed tracing
A Domain Event is intended to carry only information that is pertinent to the domain logic; this additional information should instead be added as metadata when the command is processed. All events that are produced will be persisted along with a copy of this metadata, and any configured queries will also receive the metadata along with the event payload as part of an EventEnvelope.
The CqrsFramework expects the metadata in the form of key-value pairs stored in a standard HashMap<String, String>; this metadata should be passed along with the command at the time of execution.
async fn process_command(
    cqrs: PostgresCqrs<BankAccount>,
    command: BankAccountCommand,
) -> Result<(), AggregateError<BankAccountError>> {
    let mut metadata = HashMap::new();
    metadata.insert("time".to_string(), chrono::Utc::now().to_rfc3339());
    cqrs.execute_with_metadata("agg-id-F39A0C", command, metadata).await
}
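Every configured query later receives this metadata together with the event payload. As a simplified sketch of consuming it (the struct below is a hypothetical stand-in, not the crate's typed EventEnvelope), a query might build an audit line like this:

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-in for the envelope a query receives;
// the real cqrs-es EventEnvelope carries a typed payload and more fields.
struct Envelope {
    aggregate_id: String,
    payload: String,
    metadata: HashMap<String, String>,
}

// Combine the metadata (here the "time" entry added during command
// execution) with the payload, e.g. for an audit trail.
fn audit_line(envelope: &Envelope) -> String {
    let time = envelope
        .metadata
        .get("time")
        .map(String::as_str)
        .unwrap_or("unknown");
    format!("[{}] {}: {}", time, envelope.aggregate_id, envelope.payload)
}

fn main() {
    let envelope = Envelope {
        aggregate_id: "agg-id-F39A0C".to_string(),
        payload: "CustomerDepositedMoney".to_string(),
        metadata: HashMap::from([("time".to_string(), "2024-01-01T00:00:00Z".to_string())]),
    };
    assert_eq!(
        audit_line(&envelope),
        "[2024-01-01T00:00:00Z] agg-id-F39A0C: CustomerDepositedMoney"
    );
    println!("ok");
}
```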
Event upcasters
Over time the domain model will need to be modified to adapt to new business rules, and with event sourcing the domain model directly relates to events. Event changes can be minimized by keeping events small and focused, but they will be needed. This can be a challenge because domain events are append-only and immutable.
As an example, if our bank services only local customers there is no need to identify the state as part of their address; this is understood. The payload for an UpdateAddress event might look something like:
{
"UpdateAddress": {
"address": "912 Spring St",
"city": "Seattle"
}
}
If however the bank begins servicing customers in other states we'll need additional information in our payload, e.g.,
{
"UpdateAddress": {
"address": "912 Spring St",
"city": "Seattle",
"state": "WA"
}
}
We are event sourced, so we will need to load past events in order to build our aggregate to process new commands. However, the persisted form of the event no longer matches the new structure.
The naive solution of versioning events is not preferred due to the duplication of both business logic and tests. This duplication requires additional maintenance, a risk of logic diverging for the same tasks, and leaves a burden on the developer of any new code to understand the legacy changes.
The preferred solution is to use upcasters to convert a legacy event payload to the structure that is expected by the current aggregate logic.
Event Upcaster
The EventUpcaster trait provides the functionality to make this conversion. A persistence repository will use any configured upcasters to 'upcast' events as they are loaded. For each event, the stored event_type and event_version are compared against each upcaster to determine whether the event should be upcast, and then to upcast it if needed.
pub trait EventUpcaster: Send + Sync {
    fn can_upcast(&self, event_type: &str, event_version: &str) -> bool;
    fn upcast(&self, event: SerializedEvent) -> SerializedEvent;
}
The EventUpcaster trait provides the flexibility to modify a serialized event in any way needed, including changing its name and modifying its metadata. In most cases this flexibility is not needed and a SemanticVersionEventUpcaster can be used instead; this implementation will use the provided function to modify the event payload of any event with a matching event_name and an event_version previous to the configured value.
For the above example we only need to add a field:
let upcaster = SemanticVersionEventUpcaster::new("UpdateAddress", "0.3.0", Box::new(my_upcast_fn));
fn my_upcast_fn(mut payload: Value) -> Value {
match payload.get_mut("UpdateAddress").unwrap() {
Value::Object(object) => {
object.insert("state".to_string(), Value::String("WA".to_string()));
payload
}
_ => panic!("invalid payload encountered"),
}
}