Akka: persist multiple events to Cassandra and publish them to Kafka
I need to store multiple events to Cassandra, publish them to Kafka, and call some final handler()
only after all events have been stored and published.
I came across the "Update actor state only after all events are persisted" approach, but it doesn't cover the case where events should also be published to Kafka.
There is a Kafka publisher and a base aggregate root actor that processes multiple events and then calls a handler()
(it is typically used to return a response from the actor):
abstract class AggregateRootActor extends ActorPersistence {

  def processEvents(events: Seq[Event])(handler: Event => Unit): Unit = {
    persistAll(events) { persistedEvent =>
      state = ??? // update actor state
      // publish the message to Kafka
      val futureResult = publisher.publishToKafka(persistedEvent)
      // where to switch context to handle `EventProcessingCompleted` after all events are
      // published?
      context.become {
        case EventProcessingCompleted => handler(persistedEvent)
        case ...                      // other messages
      }
    }
    self ! EventProcessingCompleted
  }
}
Any suggested solutions are welcome!
Solution 1:[1]
I would structure it like this, assuming that you don't want the actor to reply until the event has been persisted to Cassandra (for future rehydration) and published to Kafka (presumably for broadcast to other systems):
// includes the event and anything else you'd want the handler to have,
// e.g. where to send replies
case class EventProcessingCompleted(...)

persistAll(events) { persistedEvent =>
  state = ???
  // Other state changes (e.g. becomes) here

  // pipeTo requires akka.pattern.pipe and an implicit ExecutionContext in scope
  publisher.publishToKafka(persistedEvent)
    .map(_ => EventProcessingCompleted(persistedEvent))
    .pipeTo(self)
}
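To complete the picture (this part is not in the original answer): the actor still needs a receive state that collects the piped EventProcessingCompleted messages and only calls the handler once every event has been confirmed. A minimal sketch, where the awaitingKafkaConfirmation name and the assumption that EventProcessingCompleted carries the event are mine:

// Hypothetical receive state: one EventProcessingCompleted is expected per persisted event
def awaitingKafkaConfirmation(remaining: Int, handler: Event => Unit): Receive = {
  case EventProcessingCompleted(event) if remaining == 1 =>
    handler(event)                 // all events persisted to Cassandra and published to Kafka
    context.become(receiveCommand) // resume normal command processing
  case EventProcessingCompleted(_) =>
    context.become(awaitingKafkaConfirmation(remaining - 1, handler))
  case akka.actor.Status.Failure(cause) =>
    // pipeTo turns a failed Kafka publish into a Status.Failure message
    throw cause // or reply with an error / retry, depending on requirements
}

You would switch into this state (e.g. context.become(awaitingKafkaConfirmation(events.size, handler))) when starting to process the batch, and possibly stash unrelated commands while waiting.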
An alternative, which is perhaps more honest about the consistency tradeoffs, would be to do the Kafka production by having the actor set up a stream from Akka Persistence Query to the Kafka producer, along these lines:
val readJournal =
  PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

// Spin this up after recovery has completed
val kafkaProductionStream =
  readJournal.eventsByPersistenceId(actorId, state.lastIdToKafka, Long.MaxValue)
    .mapAsync(1) { eventEnvelope =>
      publisher.publishToKafka(eventEnvelope.event.asInstanceOf[???]).map(_ => eventEnvelope.sequenceNr)
    }
    .mapAsync(1) { sequenceNr =>
      // ask (?) requires an implicit akka.util.Timeout in scope
      self ? RecordKafkaProductionFor(sequenceNr)
    }
// run the stream etc.
// persist the highwater mark for sequence numbers produced to Kafka and update state
// can now consider persistence to Cassandra to imply production to Kafka, so
// can reply after persist to Cassandra
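A hedged sketch of what handling RecordKafkaProductionFor inside the actor could look like; the KafkaProductionRecorded event and the lastIdToKafka field are assumptions that follow the comments above:

// inside receiveCommand
case RecordKafkaProductionFor(sequenceNr) =>
  // persist a marker event so the Kafka high-water mark survives restarts
  persist(KafkaProductionRecorded(sequenceNr)) { recorded =>
    state = state.copy(lastIdToKafka = recorded.sequenceNr)
    sender() ! Done // akka.Done; completes the `self ? RecordKafkaProductionFor(...)` ask in the stream
  }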
To tighten up the guarantees around production to Kafka, it might be useful to have a component of the application (a cluster singleton or a sharded entity, for instance) that tracks which persistence IDs have been loaded and periodically loads the least recently used ones, to ensure that the query stream keeps running.
Solution 2:[2]
Akka now provides a component for exactly this: Akka Projections.
I think that is what you want: after successfully persisting events to Cassandra, publish them to Kafka.
If you want to see how Akka Projections works and how to implement it, I wrote a blog post about it; you can find the implementation details there.
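Not from the original answer, but to make the suggestion concrete, here is an illustrative sketch of an Akka Projection that reads tagged events from the Cassandra journal and produces them to Kafka with at-least-once semantics. The event type MyEvent, the tag "my-tag", the topic "my-topic" and the plain string serialization are all assumptions for the example:

import akka.Done
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.projection.{ ProjectionBehavior, ProjectionId }
import akka.projection.cassandra.scaladsl.CassandraProjection
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.Handler
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.{ ExecutionContext, Future }

final case class MyEvent(data: String) // assumed event type

// Produces each journaled event to Kafka; the projection stores its offset in Cassandra,
// so events are re-delivered (at-least-once) if the handler fails before the offset is saved.
class KafkaPublishingHandler(producer: SendProducer[String, String])(implicit ec: ExecutionContext)
    extends Handler[EventEnvelope[MyEvent]] {
  override def process(envelope: EventEnvelope[MyEvent]): Future[Done] = {
    val record = new ProducerRecord("my-topic", envelope.persistenceId, envelope.event.data)
    producer.send(record).map(_ => Done)
  }
}

def startKafkaProjection()(implicit system: ActorSystem[_]): ActorRef[ProjectionBehavior.Command] = {
  import system.executionContext

  // bootstrap servers etc. come from the akka.kafka.producer config section
  val producer = SendProducer(
    ProducerSettings(system.classicSystem, new StringSerializer, new StringSerializer))

  val sourceProvider =
    EventSourcedProvider.eventsByTag[MyEvent](system, CassandraReadJournal.Identifier, tag = "my-tag")

  val projection = CassandraProjection.atLeastOnce(
    ProjectionId("kafka-publisher", "my-tag"),
    sourceProvider,
    handler = () => new KafkaPublishingHandler(producer))

  // In a clustered application this would typically be run via ShardedDaemonProcess instead
  system.systemActorOf(ProjectionBehavior(projection), "kafka-publisher-projection")
}

Because the projection only advances its stored offset after the handler completes, events are published to Kafka only after they have been persisted to Cassandra, which is the ordering asked about in the question.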
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Levi Ramsey |
| Solution 2 | posthumecaver |