How to Get the Current Entity State from Events

Oskar Dudycz avatar
Oskar Dudycz

Introduction

In Event Sourcing, the application state is stored in a series of events. When you add an event, it is placed at the end of a structure called an append-only log. Events are the source of truth. This has many advantages, such as:

  • The history of changes in your system
  • Simplified diagnostics
  • Alignment to business, as the code structures correspond to business facts

However, it does not have to cause a complete disruption in your code. You can still use aggregates and entities. In Event Sourcing, events are logically grouped and linked into streams. Streams are ordered sequences of events. One stream includes all events related to a specific business object, such as InvoiceInitiated, InvoiceIssued, and InvoiceSent.

Many people believe that Snapshots are a must-have in an Event-Sourced system. Instead of retrieving all stream events to rebuild the state, you could retrieve one record and use it for your business logic. It sounds promising and can be helpful as a technical optimisation technique, but should not be consistently used as a starting point.

“But, isn’t loading more than one event a performance issue?”

Frankly, it’s not. Downloading even a dozen or several dozen small events incurs minimal overhead. Events are concise, containing only the information needed. Event Stores are optimised for such operations and the reads scale very well. You can read more about snapshots in this article, Snapshots in Event Sourcing).

Building State from Events

The recommended approach is to build the current state from events with the following steps:

  1. Get all events for a given stream. You choose them based on the stream identifier (derived from the business object/record ID).
  2. Sort the events in the order of occurrence.
  3. Create a default, empty entity (e.g., using the default constructor).
  4. Apply each event sequentially to the entity.

The first three points are relatively obvious, but what does “apply an event” mean? There are two ways:

  • Use the When function. You pass a generic event object as an input parameter. Inside the method, you can use pattern matching to determine what logic applies to the specific event type. It is a framework-independent solution. You have to write a bit more yourself, but there is less magic.
  • Some frameworks provide convention-based solutions that simplify handling and make it a bit more magical. You could build a convention-based mechanism that reads events and applies them internally.

This process is also called stream aggregation. This article will focus on the general approach for now to understand the flow properly. In C#, it might look like this:

C#

public record Person(string Name, string Address);

public record InvoiceInitiated(
    double Amount,
    string Number,
    Person IssuedTo,
    DateTime InitiatedAt
);

public record InvoiceIssued(
    string IssuedBy,
    DateTime IssuedAt
);

public enum InvoiceSendMethod
{
    Email,
    Post
}

public record InvoiceSent(
    InvoiceSendMethod SentVia,
    DateTime SentAt
);

public enum InvoiceStatus
{
    Initiated = 1,
    Issued = 2,
    Sent = 3
}

public class Invoice
{
    public string Id { get; set; }
    public double Amount { get; private set; }
    public string Number { get; private set; }
    public InvoiceStatus Status { get; private set; }
    public Person IssuedTo { get; private set; }
    public DateTime InitiatedAt { get; private set; }
    public string IssuedBy { get; private set; }
    public DateTime IssuedAt { get; private set; }
    public InvoiceSendMethod SentVia { get; private set; }
    public DateTime SentAt { get; private set; }

    public void When(object @event)
    {
        switch (@event)
        {
            case InvoiceInitiated invoiceInitiated:
                Apply(invoiceInitiated);
                break;
            case InvoiceIssued invoiceIssued:
                Apply(invoiceIssued);
                break;
            case InvoiceSent invoiceSent:
                Apply(invoiceSent);
                break;
        }
    }

    private void Apply(InvoiceInitiated @event)
    {
        Id = @event.Number;
        Amount = @event.Amount;
        Number = @event.Number;
        IssuedTo = @event.IssuedTo;
        InitiatedAt = @event.InitiatedAt;
        Status = InvoiceStatus.Initiated;
    }

    private void Apply(InvoiceIssued @event)
    {
        IssuedBy = @event.IssuedBy;
        IssuedAt = @event.IssuedAt;
        Status = InvoiceStatus.Issued;
    }

    private void Apply(InvoiceSent @event)
    {
        SentVia = @event.SentVia;
        SentAt = @event.SentAt;
        Status = InvoiceStatus.Sent;
    }
}

The usage is as follows:

var invoiceInitiated = new InvoiceInitiated(
    34.12,
    "INV/2021/11/01",
    new Person("Oscar the Grouch", "123 Sesame Street"),
    DateTime.UtcNow
);

var invoiceIssued = new InvoiceIssued(
    "Cookie Monster",
    DateTime.UtcNow
);

var invoiceSent = new InvoiceSent(
    InvoiceSendMethod.Email,
    DateTime.UtcNow
);

// 1, 2. Get all events and sort them in the order of appearance
var events = new object[]
{
    invoiceInitiated,
    invoiceIssued,
    invoiceSent
};

// 3. Construct empty Invoice object
var invoice = new Invoice();

// 4. Apply each event on the entity
foreach (var @event in events)
{
    invoice.When(@event);
}

If you prefer, you can add the base class with an abstract method to enure that classes follow the convention and write the more generalised logic.

public abstract class Aggregate<TId>
{
    public TId Id { get; protected set; }

    public abstract void When(object @event);
}

Having that, you could write such a method for KurrentDB to retrieve the aggregate state from events:

public async Task<TAggregate?> Find<TAggregate, TId>(
    Guid id,
    CancellationToken cancellationToken
) where TAggregate : Aggregate<TId>, new()
{
    var readResult = eventStore.ReadStreamAsync(
        Direction.Forwards,
        $"{typeof(T).Name}-{id}",
        StreamPosition.Start,
        cancellationToken: cancellationToken
    );

    var aggregate = new TAggregate();

    await foreach (var @event in readResult)
    {
        var eventData = Deserialize(@event);
        aggregate.When(eventData!);
    }

    return aggregate;
}

Of course, this is a highly imperative approach. If you prefer a functional approach, you could use a pattern described in this article, Why Partial is an extremely useful TypeScript feature.

In the functional approach, you don’t need base classes. You don’t need aggregates. Instead, you’re splitting the behavior (functions) from the state (entity).

TypeScript

In TypeScript, having event and entity defined as:

type Event<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>,
  EventMetadata extends Record<string, unknown> = Record<string, unknown>
> = Readonly<{
  type: Readonly<EventType>;
  data: Readonly<EventData>;
  metadata?: Readonly<EventMetadata>;
}>;

type Person = Readonly<{
  name: string;
  address: string;
}>;

type InvoiceInitiated = Event<
  'invoice-initiated',
  {
    number: string;
    amount: number;
    issuedTo: Person;
    initiatedAt: Date;
  }
>;

type InvoiceIssued = Event<
  'invoice-issued',
  {
    number: string;
    issuedBy: string;
    issuedAt: Date;
  }
>;

type InvoiceSent = Event<
  'invoice-sent',
  {
    number: string;
    sentVia: InvoiceSendMethod;
    sentAt: Date;
  }
>;

type InvoiceEvent =
  | InvoiceInitiated
  | InvoiceIssued
  | InvoiceSent;

type Invoice = Readonly<{
  number: string;
  amount: number;
  status: InvoiceStatus;
  issuedTo: Person;
  initiatedAt: Date;
  issued?: Readonly<{
    by?: string;
    at?: Date;
  }>;
  sent?: Readonly<{
    via?: InvoiceSendMethod;
    at?: Date;
  }>;
}>;

You can define the When method as:

function when(
  currentState: Partial<Invoice>,
  event: CashRegisterEvent
): Partial<Invoice> {
  switch (event.type) {
    case 'invoice-initiated':
      return {
        number: event.data.number,
        amount: event.data.amount,
        status: InvoiceStatus.INITIATED,
        issuedTo: event.data.issuedTo,
        initiatedAt: event.data.initiatedAt,
      };

    case 'invoice-issued':
      return {
        ...currentState,
        status: InvoiceStatus.ISSUED,
        issued: {
          by: event.data.issuedBy,
          at: event.data.issuedAt,
        },
      };

    case 'invoice-sent':
      return {
        ...currentState,
        status: InvoiceStatus.SENT,
        sent: {
          via: event.data.sentVia,
          at: event.data.sentAt,
        },
      };

    default:
      return {
        ...currentState,
      };
  }
}

Using the reduce method and partial type described in the previously mentioned article, you can define the generic stream aggregation method as:

function aggregateStream<Aggregate, StreamEvents extends Event>(
  events: StreamEvents[],
  when: (
    currentState: Partial<Aggregate>,
    event: StreamEvents
  ) => Partial<Aggregate>,
  check?: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
  const state = events.reduce<Partial<Aggregate>>(when, {});

  if (!check) {
    console.warn('No type check method was provided in the aggregate method');
    return <Aggregate>state;
  }

  if (!check(state)) {
    throw 'Aggregate state is not valid';
  }

  return state;
}

Then you could use it as follows to rebuild the current state:

const events: InvoiceEvent[] = [];

for await (const resolvedEvent of eventStore.readStream(`invoice-${invoiceId}`)) {
  events.push(<InvoiceEvent>{
    type: resolvedEvent.event!.type,
    data: resolvedEvent.event!.data,
    metadata: resolvedEvent.event?.metadata,
  });
}

const invoice = aggregateStream<Invoice, InvoiceEvent>(
  events,
  when,
  isInvoice
);

An object-oriented approach brings more ceremony. However, it has an advantage over the functional approach, as it keeps object state and behavior grouped together.

Stream aggregation is a simple but powerful pattern. It enables easy debugging, facilitates writing unit tests, and provides better control over what is happening. It’s also the essence of Event Sourcing, where events are treated as the source of truth.

Check out the detailed samples in these repositories:

https://github.com/oskardudycz/EventSourcing.NetCore https://github.com/oskardudycz/EventSourcing.NodeJS

This post was written and published by Oskar Dudycz on his blog, and it has been edited and republished here with his permission.