Using Kafka as a (CQRS) Eventstore. Good idea?

Although I've come across Kafka before, I only recently realized that Kafka could perhaps be used as (the basis of) a CQRS event store.

Some of the main points Kafka supports:

  • Event capturing / storing, all HA of course.
  • Pub / sub architecture
  • Ability to replay the event log, which allows new subscribers to register with the system after the fact.

Admittedly I'm not 100% versed in CQRS / Event sourcing, but this seems pretty close to what an event store should be. Funny thing is: I really can't find that much about Kafka being used as an event store, so perhaps I am missing something.

So, is anything missing from Kafka for it to be a good event store? Would it work? Is anyone using it in production? Interested in insight, links, etc.

Basically the state of the system is saved based on the transactions/events the system has ever received, instead of just saving the current state/snapshot of the system, which is what is usually done. (Think of it as a general ledger in accounting: all transactions ultimately add up to the final state.) This allows all kinds of cool things, but just read up on the links provided.



Solution 1:[1]

Kafka is meant to be a messaging system, which has many similarities to an event store; however, to quote their intro:

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the retention is set for two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.

So while messages can potentially be retained indefinitely, the expectation is that they will be deleted. This doesn't mean you can't use this as an event store, but it may be better to use something else. Take a look at EventStoreDB for an alternative.
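For reference, retention is configurable per topic, so a topic meant to keep events forever can simply have deletion switched off. A minimal sketch using Kafka's Java AdminClient, assuming a local broker and a hypothetical `orders-events` topic:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateEventTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // retention.ms = -1 disables time-based deletion; retention.bytes = -1
            // disables size-based deletion, so published events are kept indefinitely.
            NewTopic topic = new NewTopic("orders-events", 3, (short) 1)
                    .configs(Map.of(
                            TopicConfig.RETENTION_MS_CONFIG, "-1",
                            TopicConfig.RETENTION_BYTES_CONFIG, "-1"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```

The same settings can also be applied to an existing topic with the kafka-configs tool.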

UPDATE

Kafka documentation:

Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.

UPDATE 2

One concern with using Kafka for event sourcing is the number of required topics. Typically in event sourcing, there is a stream (topic) of events per entity (such as user, product, etc). This way, the current state of an entity can be reconstituted by re-applying all events in the stream. Each Kafka topic consists of one or more partitions and each partition is stored as a directory on the file system. There will also be pressure from ZooKeeper as the number of znodes increases.

Solution 2:[2]

I am one of the original authors of Kafka. Kafka will work very well as a log for event sourcing. It is fault-tolerant, scales to enormous data sizes, and has a built in partitioning model.

We use it for several use cases of this form at LinkedIn. For example our open source stream processing system, Apache Samza, comes with built-in support for event sourcing.

I think you don't hear much about using Kafka for event sourcing primarily because the event sourcing terminology doesn't seem to be very prevalent in the consumer web space where Kafka is most popular.

I have written a bit about this style of Kafka usage here.

Solution 3:[3]

I keep coming back to this QA. And I did not find the existing answers nuanced enough, so I am adding this one.

TL;DR. Yes or No, depending on your event sourcing usage.

There are two primary kinds of event sourced systems of which I am aware.

Downstream event processors = Yes

In this kind of system, events happen in the real world and are recorded as facts. Such as a warehouse system to keep track of pallets of products. There are basically no conflicting events. Everything has already happened, even if it was wrong. (E.g. pallet 123456 was put on truck A, but was scheduled for truck B.) Then later the facts are checked for exceptions via reporting mechanisms. Kafka seems well-suited for this kind of downstream event-processing application.

In this context, it is understandable why Kafka folks are advocating it as an Event Sourcing solution. Because it is quite similar to how it is already used in, for example, click streams. However, people using the term Event Sourcing (as opposed to Stream Processing) are likely referring to the second usage...

Application-controlled source of truth = No

This kind of application declares its own events as a result of user requests passing through business logic. Kafka does not work well in this case for two primary reasons.

Lack of entity isolation

This scenario needs the ability to load the event stream for a specific entity. The common reason for this is to build a transient write model for the business logic to use to process the request. Doing this is impractical in Kafka. Using topic-per-entity could allow this, except this is a non-starter when there may be thousands or millions of entities. This is due to technical limits in Kafka/Zookeeper.

One of the main reasons to use a transient write model in this way is to make business logic changes cheap and easy to deploy.

Using topic-per-type is recommended instead for Kafka, but this would require loading events for every entity of that type just to get events for a single entity, since you cannot tell by log position which events belong to which entity. Even using Snapshots to start from a known log position, this could be a significant number of events to churn through if structural changes to the snapshot are needed to support logic changes.
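To make that concrete, here is a rough sketch of what "loading one entity's events" looks like with topic-per-type in plain Kafka: a full scan of the topic, filtering by key on the consumer side. The topic and entity names are made up, and stopping at the first empty poll is a naive simplification:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class LoadEntityEvents {
    static List<String> loadEvents(String entityId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        List<String> events = new ArrayList<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manually assign every partition of the per-type topic and rewind to the start:
            // there is no index, so the only way to find one entity's events is to read them all.
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor("orders").forEach(
                    p -> partitions.add(new TopicPartition(p.topic(), p.partition())));
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            ConsumerRecords<String, String> records;
            while (!(records = consumer.poll(Duration.ofSeconds(1))).isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    if (entityId.equals(record.key())) {  // discard everything that isn't ours
                        events.add(record.value());
                    }
                }
            }
        }
        return events;
    }
}
```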

Lack of conflict detection

Secondly, users can create race conditions due to concurrent requests against the same entity. It may be quite undesirable to save conflicting events and resolve them after the fact, so it is important to be able to prevent conflicting events. To scale request load, it is common to use stateless services while preventing write conflicts using conditional writes (only write if the last entity event was #x). A.k.a. Optimistic Concurrency. Kafka does not support optimistic concurrency. Even if it supported it at the topic level, it would need to go all the way down to the entity level to be effective. To use Kafka and prevent conflicting events, you would need to use a stateful, serialized writer (per "shard" or whatever Kafka's equivalent is) at the application level. This is a significant architectural requirement/restriction.
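For contrast, the conditional write described above is nearly a one-liner in most databases. A hedged sketch using plain JDBC against a hypothetical events table with a UNIQUE (entity_id, version) constraint (the 23505 SQLSTATE check assumes PostgreSQL):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ConditionalAppend {
    // Append an event only if the entity is still at the version the caller last read.
    // Returns false when another writer got there first, so the caller can reload and retry.
    static boolean appendEvent(Connection conn, String entityId,
                               long expectedVersion, String payload) throws SQLException {
        String sql = "INSERT INTO events (entity_id, version, payload) VALUES (?, ?, ?)";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, entityId);
            stmt.setLong(2, expectedVersion + 1); // claim the next version slot
            stmt.setString(3, payload);
            stmt.executeUpdate();
            return true;
        } catch (SQLException e) {
            if ("23505".equals(e.getSQLState())) { // unique_violation: lost the race
                return false;
            }
            throw e; // anything else is a real error
        }
    }
}
```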

Main reason: fit for the problem

added 2021/09/29

Kafka is designed to solve giant-scale data problems. An app-controlled source of truth is a smaller-scale, in-depth solution. Using event sourcing to good effect requires crafting events and streams to match the business processes. This usually has a much higher level of detail than would be generally useful to at-scale consumers. Consider if your bank statement contained an entry for every step of a bank's internal transaction processes. A single deposit or withdrawal could have many entries before it is confirmed to your account. The bank needs that level of detail to process transactions. But it's mostly inscrutable bank jargon (domain-specific language) to you, unusable for reconciling your account. Instead, the bank publishes separate events for consumers. These are coarse-grained summaries of each completed transaction. These summary events are what consumers know as "transactions" on their bank statement.

When I asked myself the same question as the OP, I wanted to know if Kafka was a scaling option for event sourcing. But perhaps a better question is whether it makes sense for my event sourced solution to operate at a giant scale. I can't speak to every case, but I think often it does not. When this scale enters the picture, like with the bank statement example, the granularity of events tends to be different. My event sourced system should probably publish coarse-grained events to the Kafka cluster to feed at-scale consumers rather than use Kafka as internal storage.

Scale can still be needed for event sourcing. Strategies differ depending on why. Often event streams have a "done" or "no-longer-useful" state. Archiving those streams is a good answer if event size/volume is a problem. Sharding is another option -- a perfect fit for regional- or tenant-isolated scenarios. In less siloed scenarios, when streams are arbitrarily related in a way that can cross shard boundaries, sharding is still the move (partition by stream ID). But there are no order guarantees across streams, which can make the event consumer's job harder. For example, the consumer may receive transaction events before it receives events describing the accounts involved.

The first instinct is to "just use timestamps" to order received events. But it is still not possible to guarantee perfect occurrence order: there are too many uncontrollable factors (network hiccups, clock drift, cosmic rays, etc.). Ideally you design the consumer not to require cross-stream dependencies, and have a strategy for temporarily missing data, like progressive enhancement for data. If you really need the data to be unavailable instead of incomplete, use the same tactic, but keep the incomplete data in a separate area or marked unavailable until it's all filled in. You can also just attempt to process each event, knowing it may fail due to missing prerequisites: put failed events in a retry queue, process the next events, and retry the failed ones later. But watch out for poison messages (events).

Summary

Can you force Kafka to work for an app-controlled source of truth? Sure if you try hard enough and integrate deeply enough. But is it a good idea? No.


Update per comment

The comment has been deleted, but the question was something like: what do people use for event storage then?

It seems that most people roll their own event storage implementation on top of an existing database. For non-distributed scenarios, like internal back-ends or stand-alone products, it is well-documented how to create a SQL-based event store. And there are libraries available on top of various kinds of databases. There is also EventStoreDB, which is built for this purpose.

In distributed scenarios, I've seen a couple of different implementations. Jet's Panther project uses Azure CosmosDB, with the Change Feed feature to notify listeners. Another similar implementation I've heard about on AWS is using DynamoDB with its Streams feature to notify listeners. The partition key probably should be the stream id for best data distribution (to lessen the amount of over-provisioning). However, a full replay across streams in Dynamo is expensive (in reads and cost). So this implementation was also set up for Dynamo Streams to dump events to S3. When a new listener comes online, or an existing listener wants a full replay, it would read S3 to catch up first.

My current project is a multi-tenant scenario, and I rolled my own on top of Postgres. Something like Citus seems appropriate for scalability, partitioning by tenant+stream.

Kafka is still very useful in distributed scenarios. It is a non-trivial problem to expose each service's key events to other services. An event store is typically not built for that, but it's precisely what Kafka does well. Each service has its own internal source of truth (could be events, BNF, graph, etc), then listens to Kafka to know what is happening "outside". The service posts public events to Kafka to inform the outside of interesting things it encountered.

Solution 4:[4]

You can use Kafka as an event store, but I do not recommend doing so, although it might look like a good choice:

  • Kafka only guarantees at-least-once delivery, so there can be duplicates in the event store that cannot be removed. Update: Here you can read why this is so hard with Kafka, and some recent news about how to finally achieve this behavior: https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  • Due to immutability, there is no way to manipulate the event store when the application evolves and events need to be transformed (there are of course methods like upcasting, but...). One might say you never need to transform events, but that is not a correct assumption; there could be situations where you keep a backup of the originals but upgrade them to the latest versions. That is a valid requirement in event-driven architectures.
  • There is no place to persist snapshots of entities/aggregates, so replay will become slower and slower. Creating snapshots is a must-have feature for an event store from a long-term perspective.
  • Kafka partitions are distributed, and they are hard to manage and back up compared with databases. Databases are simply simpler :-)

So, think twice before you make your choice. An event store built as a combination of application-layer interfaces (monitoring and management), a SQL/NoSQL store, and Kafka as the broker is a better choice than letting Kafka handle both roles to create a complete, full-featured solution.

An event store is a complex service which requires more than what Kafka can offer if you are serious about applying Event Sourcing, CQRS, Sagas, and other patterns in an event-driven architecture while staying high-performance.

Feel free to challenge my answer! You might not like what I say about your favorite broker with lots of overlapping capabilities, but still, Kafka wasn't designed as an event store, but rather as a high-performance broker and buffer at the same time, to handle fast-producer versus slow-consumer scenarios, for example.

Please look at eventuate.io microservices open source framework to discover more about the potential problems: http://eventuate.io/

Update as of 8th Feb 2018

I haven't incorporated new info from the comments, but I do agree on some of those aspects. This update is more about recommendations for a microservice event-driven platform. If you are serious about robust microservice design and the highest possible performance in general, here are a few hints you might be interested in.

  1. Don't use Spring - it is great (I use it myself a lot), but it is heavy and slow at the same time. And it is not a microservice platform at all; it's "just" a framework to help you implement one (a lot of work behind this...). Other frameworks are "just" lightweight REST or JPA or differently focused frameworks. I recommend probably the best-in-class open-source complete microservice platform available, which is coming back to pure Java roots: https://github.com/networknt

If you wonder about performance, you can compare against the existing benchmark suite: https://github.com/networknt/microservices-framework-benchmark

  2. Don't use Kafka at all :-)) This is half a joke. I mean, while Kafka is great, it is another broker-centric system. I think the future is in broker-less messaging systems. You might be surprised, but there are systems faster than Kafka :-), though of course you must get down to a lower level. Look at Chronicle.

  3. For the event store I recommend a superior PostgreSQL extension called TimescaleDB, which focuses on high-performance time-series data processing (events are time series) in large volumes. Of course CQRS and Event Sourcing (replay, etc. features) are built into the light4j framework out of the box, which uses Postgres as the low-level storage.

  4. For messaging, try to look at Chronicle Queue, Map, Engine, and Network. I mean, get rid of these old-fashioned broker-centric solutions and go with a micro messaging system (an embedded one). Chronicle Queue is actually even faster than Kafka. But I agree it is not an all-in-one solution and you need to do some development, otherwise you go and buy the Enterprise version (the paid one). In the end, the effort to build your own messaging layer from Chronicle will be paid back by removing the burden of maintaining a Kafka cluster.

Solution 5:[5]

All the existing answers seem to be quite comprehensive, but there's a terminology issue, which I'd like to resolve in my answer.

What's Event Sourcing?

It seems like if you look at five different places, you get five different answers to that question.

However, if you look at Greg Young's paper from 2010, it summarises the idea quite nicely, from page 32 onwards, but it doesn't contain the ultimate definition, so I dare formulate it myself.

Event Sourcing is a way to persist state. Instead of replacing one state with another as a result of a state mutation, you persist an event that represents that mutation. Therefore, you can always get the current state of the entity by reading all the entity events and applying those state mutations in sequence. By doing that, the current entity state becomes a left fold of all the events for that entity.
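As a minimal illustration of that left fold (the event and entity types here are made up, not taken from the paper):

```java
import java.util.List;

public class AccountState {
    interface Event {}
    record Deposited(long amountCents) implements Event {}
    record Withdrawn(long amountCents) implements Event {}

    record Account(long balanceCents) {
        // Applying one event produces the next state; nothing is ever overwritten in place.
        Account apply(Event e) {
            if (e instanceof Deposited d) return new Account(balanceCents + d.amountCents());
            if (e instanceof Withdrawn w) return new Account(balanceCents - w.amountCents());
            return this; // unknown event types leave the state unchanged
        }
    }

    // Current state = fold of all the entity's events over an empty initial state.
    static Account rehydrate(List<Event> events) {
        Account state = new Account(0);
        for (Event e : events) {
            state = state.apply(e);
        }
        return state;
    }
}
```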

What makes a "good" event store (database)?

Any persistence mechanism needs to perform two basic operations:

  • Save the new entity state to the database
  • Retrieve the entity state from the database

That's where Greg talks about the concept of entity streams, where each entity has its own stream of events, uniquely identified by the entity id. When you have a database that is capable of reading all the entity events by the entity id (reading the stream), using Event Sourcing is not a hard problem.

As Greg's paper mentions Event Sourcing in the context of CQRS, he explains why those two concepts play nicely with each other. Although you have a database full of atomic state mutations for a bunch of entities, querying across the current state of multiple entities is hard work. The issue is solved by separating the transactional (event-sourced) store that is used as the source of truth from the reporting (query, read) store, which is used for reports and queries of the current system state across multiple entities. The query store doesn't contain any events; it contains the projected state of multiple entities, composed based on the needs for querying data. It doesn't necessarily need to contain snapshots of each entity; you are free to choose the shape and form of the query model, as long as you can project your events to that model.

For that reason, a "proper" event database would need to support what we call real-time subscriptions that would deliver new (and historical, if we need to replay) events to the query model to project.

We also know that we need the entity state in hand when making decisions about its allowed state transition. For example, a money transfer that has already been executed, should not be executed twice. As the query model is by definition stale (even for milliseconds), it becomes dangerous when you make decisions on stale data. Therefore, we use the most recent, and totally consistent state from the transactional (event) store to reconstruct the entity state when executing operations on the entity.

Sometimes, you also want to remove the whole entity from the database, meaning deleting all its events. That could be a requirement, for example, to be GDPR-compliant.

So, what attributes would then be needed for a database used as an event store to get a decent event-sourced system working? Just a few:

  • Append events to the ordered, append-only log, using entity id as a key
  • Load all the events for a single entity, in an ordered sequence, using the entity id as a key
  • Delete all the events for a given entity, using the entity id as a key
  • Support real-time subscriptions to project events to query models

What is Kafka?

Kafka is a highly-scalable message broker, based on an append-only log. Messages in Kafka are produced to topics, and one topic nowadays often contains a single message type to play nicely with the schema registry. A topic could be something like cpu-load where we produce time-series measurements of the CPU load for many servers.

Kafka topics can be partitioned. Partitioning allows you to produce and consume messages in parallel. Messages are ordered only within a single partition, and you'd normally need to use a predictable partition key, so Kafka can distribute messages across the partitions.
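As a small illustration, an event-sourced producer would typically put the entity id in the message key, but the key only decides partition placement and per-partition ordering; it does not give you an addressable per-entity stream. The topic name and payload below are made up:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProduceWithKey {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // All records with key "order-42" land in the same partition, so they stay ordered
            // relative to each other -- but Kafka cannot look them up by that key later.
            producer.send(new ProducerRecord<>("orders", "order-42",
                    "{\"type\":\"OrderPlaced\",\"amountCents\":10000}"));
            producer.flush();
        }
    }
}
```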

Now, let's go through the checklist:

  • Can you append events to Kafka? Yes, it's called produce. Can you append events with the entity id as a key? Not really, as the partition key is used to distribute messages across partitions, so it's really just a partition key. One thing mentioned in another answer is optimistic concurrency. If you worked with a relational database, you probably used the Version column. For NoSQL databases you might have used the document eTag. Both allow you to ensure that you update the entity that is in the state that you know about, and it hasn't been mutated during your operation. Kafka does not provide you with anything to support optimistic concurrency for such state transitions.
  • Can you read all the events for a single entity from a Kafka topic, using the entity id as a key? No, you can't. As Kafka is not a database, it has no index on its topics, so the only way to retrieve messages from a topic is to consume them.
  • Can you delete events from Kafka using the entity id as a key? No, it's impossible. Messages get removed from the topic only after their retention period expires.
  • Can you subscribe to a Kafka topic to receive live (and historical) events in order, so you can project them to your query models? Yes, and because topics are partitioned, you can scale out your projections to increase performance.

So, why do people keep doing it?

I believe that the reason why a lot of people claim that Kafka is a good choice to be an event store for event-sourced systems is that they confuse Event Sourcing with simple pub-sub (you can use the hype word "EDA", or Event-Driven Architecture, instead). Using message brokers to fan out events to other system components is a pattern known for decades. The issue with "classic" brokers is that messages are gone as soon as they are consumed, so you cannot build something like a query model that would be built from history. Another issue is that when projecting events, you want them to be consumed in the same order as they are produced, and "classic" brokers normally aim to support the competing consumers pattern, which doesn't support ordered message processing by definition. Make no mistake, Kafka does not support competing consumers; it has a limitation of one consumer per one or more partitions, but not the other way around.

Kafka solved the ordering issue and the historical message retention issue quite nicely. So, you can now build query models from events you push through Kafka. But that's not what the original idea of Event Sourcing is about; it's what we today call EDA. As soon as this separation is clear, we, hopefully, stop seeing claims that any append-only event log is a good candidate to be an event store database for event-sourced systems.

Solution 6:[6]

I think you should look at the Axon Framework along with its support for Kafka.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Alexey Zimarev
Solution 2: Jay Kreps
Solution 3:
Solution 4: halfer
Solution 5: Alexey Zimarev
Solution 6: Darshu Bc