
Event Sourcing and CQRS using Kafka Streams

23 September 2021 | Design & Code

In this article we are going to study Event Sourcing, CQRS and Stream processing with Apache Kafka.

The classical way to store an application's state changes is to use a database; whether it is relational or NoSQL does not matter. With a relational database, each change overwrites the previous state so that only the latest state is stored, and it is common for these types of systems to be CRUD-based (Create, Read, Update and Delete).

With these types of systems, rolling back to a previous state is very complicated to implement.

Unlike the “classic” approach, with the Event Sourcing pattern we persist an application's state changes as a sequence of state-changing events.

To better understand the problem, let's look at a simple example that illustrates the difference between CRUD-based systems and systems based on Event Sourcing.

Imagine a bank that wants to develop its transactional system (to keep it simple, transfers between customers). The state of the system in this case is the current balance of each customer.

With a CRUD-based system we would have a relational database with a table of this form:

Now suppose that customer A sends 1000 euros to customer B. In this case the state of the database becomes:

What happens if customer A complains that his balance is not consistent, if we want to revert to a previous state, or if we want to compute statistics on the number of transactions carried out?

With Event Sourcing, every time a transaction is made, a new TransactionMade event is appended to the account aggregate's list of events and persisted in a single atomic operation. Later, the account balance can be reconstructed by replaying the persisted TransactionMade events.
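As a rough illustration of replaying events, here is a minimal in-memory sketch. The fields of TransactionMade, the AccountEventLog class and the opening balances are assumptions made for this example; the article only names the event, and a real event store would persist the log durably.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical event shape: the article names TransactionMade but does not define its fields.
final class TransactionMade {
    final String fromAccount;
    final String toAccount;
    final long amount; // whole euros, to keep the example simple

    TransactionMade(String fromAccount, String toAccount, long amount) {
        this.fromAccount = fromAccount;
        this.toAccount = toAccount;
        this.amount = amount;
    }
}

// Append-only, in-memory log standing in for a real event store (assumption).
final class AccountEventLog {
    private final List<TransactionMade> events = new ArrayList<>();

    // Events are only ever appended, never updated or deleted.
    void append(TransactionMade event) {
        events.add(event);
    }

    List<TransactionMade> events() {
        return Collections.unmodifiableList(events);
    }

    // Rebuild an account balance by replaying the first `upTo` events.
    long balanceOf(String account, long openingBalance, int upTo) {
        long balance = openingBalance;
        for (TransactionMade e : events.subList(0, upTo)) {
            if (e.fromAccount.equals(account)) balance -= e.amount;
            if (e.toAccount.equals(account)) balance += e.amount;
        }
        return balance;
    }

    // Current balance: replay the whole log.
    long balanceOf(String account, long openingBalance) {
        return balanceOf(account, openingBalance, events.size());
    }
}
```

Because the log is never overwritten, going back to an earlier state only means replaying fewer events (the `upTo` argument), and counting transactions is a simple scan over `events()`.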

In a nutshell, Event Sourcing is just a different way of storing data. It answers this question: how can we reliably and atomically update the database and publish events?

Benefits of event sourcing

  • Solves data consistency issues in a microservice-based architecture
  • Reliable event publishing
  • Eliminates the object-relational mapping problem
  • Reliable audit log
  • Preserved history
  • Future requirements are easier to implement
  • High performance
  • Scalable views

Drawbacks of event sourcing

  • Unfamiliar style of programming
  • Design and implementation are more complex
  • Problems with duplicate events (handlers must be idempotent, or rely on exactly-once semantics)
  • Implementing interactive queries (querying the event store) is challenging

What is Apache Kafka?

Kafka is a high-performance, low-latency, scalable and durable broker that is used by thousands of businesses around the world. Event Sourcing and Apache Kafka are closely related, since Apache Kafka maintains an immutable sequence of events that multiple applications can subscribe to.


With Apache Kafka, multiple producers can publish events to multiple topics, each of which is split into partitions. Consumers, on the other hand, are organized into consumer groups: within a group, each partition is read by only one consumer (this is how Kafka guarantees ordering per partition), whereas two consumers from different consumer groups can read the same event.
In this way, Apache Kafka makes it possible to publish an event once and consume it multiple times.
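The "publish once, consume multiple times" behaviour comes from each consumer group tracking its own read position. The toy model below illustrates that idea for a single partition; ToyPartition and its methods are hypothetical names for this sketch and are not part of the Kafka client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy single-partition log: reading an event never removes it.
final class ToyPartition {
    private final List<String> events = new ArrayList<>();
    // One committed offset per consumer group (group id -> next offset to read).
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    // Append an event to the end of the log.
    void publish(String event) {
        events.add(event);
    }

    // Return the events this group has not seen yet, then advance its offset.
    List<String> poll(String groupId) {
        int from = committedOffsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(events.subList(from, events.size()));
        committedOffsets.put(groupId, events.size());
        return batch;
    }
}
```

Two groups polling the same partition each receive every event, while a second poll from the same group only returns what was published since its previous poll.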

CQRS (Command Query Responsibility Segregation)

CQRS is a pattern used in microservice architectures that is often combined with Event Sourcing. The idea is to divide the application into two parts: the “Command” part, which modifies the state of the system, and the “Query” part, which queries the system without changing its state. The only responsibility of the Query part is to answer queries efficiently and quickly.

The role of the “Event Handler” here is to transform events and write the result to a materialized view that will be queried.

The materialized view can be stored in MongoDB, Redis, Kafka (a KTable, or a Table in KSQL), a SQL database, Elasticsearch, and so on.
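The split between command, event handler and materialized view can be sketched in plain Java as follows. This is only an in-memory illustration under assumptions: CqrsSketch, MoneyTransferred and the method names are invented for the example, and the handler is called synchronously, whereas in a real system it would typically consume events asynchronously from a broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CqrsSketch {
    // Hypothetical event emitted by the command side.
    static final class MoneyTransferred {
        final String from;
        final String to;
        final long amount;

        MoneyTransferred(String from, String to, long amount) {
            this.from = from;
            this.to = to;
            this.amount = amount;
        }
    }

    private final List<MoneyTransferred> eventLog = new ArrayList<>(); // write side
    private final Map<String, Long> balanceView = new HashMap<>();     // read side

    // Command: validates the request and records an event; it returns no data.
    void transfer(String from, String to, long amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        MoneyTransferred event = new MoneyTransferred(from, to, amount);
        eventLog.add(event);
        handle(event); // called inline here; asynchronous in a real deployment
    }

    // Event handler: projects each event into the materialized view.
    private void handle(MoneyTransferred e) {
        balanceView.merge(e.from, -e.amount, Long::sum);
        balanceView.merge(e.to, e.amount, Long::sum);
    }

    // Query: reads the view only and never changes state.
    long netChange(String account) {
        return balanceView.getOrDefault(account, 0L);
    }

    int eventCount() {
        return eventLog.size();
    }
}
```

The query method never touches the event log, so the read side can be stored and scaled independently of the write side.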

How to implement CQRS with Apache Kafka?

A very easy way to implement CQRS with Apache Kafka is to use Kafka Streams, which lets us write complex processing logic. In this case events are written directly to a Kafka topic, and the “Event Handler” is implemented using Kafka Streams.

For state storage, Kafka Streams uses RocksDB by default, an embedded key-value store that keeps its data on local disk. We can also use KSQL if the real-time job can be expressed as SQL-like queries.

Kafka Streams

Kafka Streams is a library for creating streaming applications, in particular applications that transform input Kafka topics into output Kafka topics (or into calls to external services, database updates, and so on) in a distributed and fault-tolerant manner.

Stream processing is a programming paradigm, closely related to reactive programming, that allows some applications to more easily exploit a limited form of parallel processing.

Benefits of using Kafka Streams

Unlike other streaming systems (Apache Spark, Storm, Flink), Kafka Streams is just a library that we add to our application. Among the main advantages of Kafka Streams are:

  • Exactly-once semantics
  • Easy integration with Kafka
  • Easy integration with Schema Registry for data governance
  • Scalability and parallelism

The local state store can be a persistent embedded store such as RocksDB or a purely in-memory store (essentially a hash map).

The way it works is that each instance of an application that uses the Kafka Streams library for stateful stream processing hosts a subset of the application state, modeled as state store partitions. The store must be partitioned in the same way as the application's key space.
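To make the idea of key-partitioned state more concrete, here is a small sketch in which every key is routed to exactly one store partition. PartitionedCounterStore is a hypothetical class, and `String.hashCode()` is used only as a stand-in for Kafka's default partitioner, which hashes the serialized key with murmur2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionedCounterStore {
    // One map per partition; in Kafka Streams each would live on the instance owning that partition.
    private final List<Map<String, Long>> shards = new ArrayList<>();

    PartitionedCounterStore(int partitions) {
        for (int i = 0; i < partitions; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Stand-in for the partitioner: the same key always maps to the same partition.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    // Update only the partition that owns the key.
    void increment(String key) {
        shards.get(partitionFor(key)).merge(key, 1L, Long::sum);
    }

    // Read from the partition that owns the key; unseen keys count as 0.
    long get(String key) {
        return shards.get(partitionFor(key)).getOrDefault(key, 0L);
    }

    // Number of distinct keys held by one partition.
    int keysIn(int partition) {
        return shards.get(partition).size();
    }
}
```

Since records and state for a given key always land in the same partition, an instance can update its part of the state locally without coordinating with the other instances.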

Demo

To illustrate Event Sourcing and CQRS, I created a URL shortener application that uses Kafka Streams to implement CQRS.

The idea of this application is to take a URL and hash it to make it shorter; the app is then responsible for redirecting the short URL to the original one.

Link to the GitHub repository: https://github.com/zakariamaaraki/Url-shortener

Code snippet

This code snippet computes the number of visits to each URL:

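import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@Slf4j
@EnableKafkaStreams
public class StreamTopologyConfig {

  // Serializers/deserializers (serdes) for String and Long types
  final Serde<String> stringSerde = Serdes.String();
  final Serde<Long> longSerde = Serdes.Long();

  @Bean
  public Topology topology(@Value("${spring.kafka.topic.visitor}") String topicName,
                           @Value("${spring.kafka.store.visitor-store-name}") String storeName,
                           @Autowired StreamsBuilder builder) {

     // Each record is keyed by URL and carries a visit counter as its value
     KStream<String, Long> stream = builder.stream(topicName, Consumed.with(stringSerde, longSerde));

     // In-memory key-value store backing the materialized view of visit counts
     KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(storeName);

     // Sum the counters per URL, starting from 0, and materialize the result in the store
     stream.groupByKey(Grouped.with(stringSerde, longSerde))
           .aggregate(() -> 0L,
                      (url, counter, aggregate) -> counter + aggregate,
                      Materialized.<String, Long>as(storeSupplier)
                          .withKeySerde(stringSerde)
                          .withValueSerde(longSerde));

     // Build the topology once, log its description, and return it
     Topology topology = builder.build();
     log.info("{}", topology.describe());

     return topology;
  }

}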
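To see what the aggregate call computes without running Kafka, the same per-key logic can be reproduced over a plain list of records. This is only a sketch of the semantics: VisitAggregation is a hypothetical helper, a LinkedHashMap stands in for the state store, and none of it is Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class VisitAggregation {
    // Mirrors aggregate(() -> 0L, (url, counter, aggregate) -> counter + aggregate)
    // applied to records (urls[i], counters[i]) in arrival order.
    static Map<String, Long> aggregate(String[] urls, long[] counters) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (int i = 0; i < urls.length; i++) {
            long current = store.getOrDefault(urls[i], 0L); // initializer: 0 for an unseen URL
            store.put(urls[i], counters[i] + current);       // aggregator: counter + aggregate
        }
        return store;
    }
}
```

If every visit is published as a record with the URL as key and 1 as value, the value held for each URL is its number of visits, which is what the query side then reads from the store.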

Conclusion

In this article we have seen:

  • What Event Sourcing is
  • Benefits of Event Sourcing
  • Drawbacks of Event Sourcing
  • Apache Kafka and Kafka Streams
  • Benefits of using Kafka Streams
  • The relationship between Apache Kafka and Event Sourcing
  • The CQRS pattern
  • The implementation of CQRS using Kafka Streams

You can also check our other article, Kafka: the Big Data streaming platform, if you want to dig deeper into the subject!