Streams Processor key-value types

I'm trying to implement a custom processing step by implementing the Processor interface and adding an instance of my custom processor to the topology via KStream.process. However, KStream.process always returns void and only accepts a Processor<KIn,VIn,Void,Void>, and I would like to add more processing steps after it. My questions are:

  1. Can another processing step be added after KStream.process?
  2. How should a custom processor with specific output types be used?


Solution 1:[1]

This was recently addressed by KIP-820 (in this PR), which changed the signature of KStream#process from:

void KStream#process(ProcessorSupplier<K, V, Void, Void> processorSupplier, ...)

to

KStream<KOut,VOut> KStream#process(ProcessorSupplier<K, V, KOut, VOut> processorSupplier, String... stateStoreNames)

From the looks of things, it will be available in version 3.3.
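As a rough illustration of what the new signature allows, here is a minimal sketch, assuming a Kafka Streams version that includes KIP-820. The class name, the LengthProcessor, and the topic names "input-topic" and "output-topic" are hypothetical and only there for the example.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class TypedProcessExample {

    // Hypothetical processor: takes String values and forwards their length as Integer.
    static class LengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(ProcessorContext<String, Integer> context) {
            // The context is typed with the OUTPUT key/value types.
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() != null) {
                // withValue keeps key, timestamp and headers, and swaps the value type.
                context.forward(record.withValue(record.value().length()));
            }
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // An explicitly typed supplier avoids any overload ambiguity with the older API.
        ProcessorSupplier<String, String, String, Integer> supplier = LengthProcessor::new;

        // With KIP-820, process returns a typed KStream instead of void...
        KStream<String, Integer> lengths = input.process(supplier);

        // ...so further DSL steps can be chained after it.
        lengths
            .filter((key, length) -> length > 0)
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the node layout without needing a running broker.
        System.out.println(buildTopology().describe());
    }
}
```

Printing the topology description is a quick way to check that the filter and sink nodes really are wired downstream of the processor node.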

Solution 2:[2]

If you'd like to add more steps, add another Processor to your Topology as a child of the previous one. It will receive the records that the previous processor forwards through its context.

In other words, define the init method with the output types for the ProcessorContext, store the context as a field, and use that field in the process method to forward records.
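The wiring described above might look like the following sketch with the low-level Topology API. It assumes the typed org.apache.kafka.streams.processor.api package is available in your version. The node names ("Source", "Upper", "Length", "Sink"), the topic names, and both processor classes are hypothetical.

```java
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class ChainedProcessorsExample {

    // First step (hypothetical): String in, upper-cased String out.
    static class UpperCaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context; // stored so process() can forward
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() != null) {
                context.forward(record.withValue(record.value().toUpperCase()));
            }
        }
    }

    // Second step (hypothetical): String in, Integer length out.
    static class LengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record.withValue(record.value().length()));
        }
    }

    static Topology buildTopology() {
        // Explicitly typed suppliers keep overload resolution unambiguous.
        ProcessorSupplier<String, String, String, String> upper = UpperCaseProcessor::new;
        ProcessorSupplier<String, String, String, Integer> length = LengthProcessor::new;

        Topology topology = new Topology();
        topology.addSource("Source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("Upper", upper, "Source");   // child of the source
        topology.addProcessor("Length", length, "Upper");  // child of the first processor
        topology.addSink("Sink", "output-topic", new StringSerializer(), new IntegerSerializer(), "Length");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

The parent names passed to addProcessor and addSink are what connect the steps: whatever "Upper" forwards through its context is delivered to "Length", and so on down to the sink.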

Alternatively, you can use the DSL map method to do the same.
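For a stateless transformation, the DSL route could be sketched as below. The class name and the topic names are hypothetical, and the length computation is just a stand-in for whatever the custom processor would have done.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class MapStepExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // map returns a new typed KStream, so more steps can follow it directly.
        KStream<String, Integer> lengths =
            input.map((key, value) -> KeyValue.pair(key, value == null ? 0 : value.length()));

        lengths.to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

When the key is left unchanged, as it is here, mapValues is usually the better fit, since map marks the stream as possibly re-keyed.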

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Luke
Solution 2 OneCricketeer