Kafka Streams aggregation on multiple inputs with cogroup and filter

I'm trying to implement a Kafka Streams aggregation over multiple (4) input topics.

Let's say the topics are A, B, C and D.

The topology should:

  • pull a single message each from A and B, apply the aggregation, apply a filter, store the result in a KTable
  • pull N messages from C and D, apply the aggregation, store the result in a KTable

The aggregator code is not provided, but its behaviour is:

  • the message from B contains a value, which we call X
  • the N messages from C and D are handled as counter increments: the aggregated object adds 1 to the C counter for each message from C and 1 to the D counter for each message from D
  • the filter should verify that X = C_counter + D_counter
  • when the equation holds, store the result in the KTable
  • finally, do something after the filter/storage step
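
To make the intended behaviour concrete, here is a minimal sketch in plain Java, without any Kafka dependency. It is not the real aggregator: the Source enum, the fields expected, counterC and counterD, and the method bodies are all hypothetical stand-ins, since the real DemoEntity and DemoAggregator are not shown.

```java
class AggregationSketch {

    // Hypothetical tag for the topic a message came from.
    enum Source { A, B, C, D }

    // Hypothetical stand-in for DemoEntity; the real class is not shown.
    static final class DemoEntity {
        final Source source;
        final long value;    // only meaningful for the message from B (the value X)
        long expected = -1;  // X, unknown until the message from B arrives
        long counterC = 0;   // number of messages seen from C
        long counterD = 0;   // number of messages seen from D

        DemoEntity(Source source, long value) {
            this.source = source;
            this.value = value;
        }
    }

    // Same shape as an aggregator: (key, incoming value, current aggregate) -> aggregate.
    static DemoEntity aggregate(String key, DemoEntity incoming, DemoEntity aggregate) {
        switch (incoming.source) {
            case B:
                aggregate.expected = incoming.value; // remember X
                break;
            case C:
                aggregate.counterC++;                // +1 for each message from C
                break;
            case D:
                aggregate.counterD++;                // +1 for each message from D
                break;
            default:
                break;                               // A carries no counter data in this sketch
        }
        return aggregate;
    }

    // The filter condition: X = C_counter + D_counter.
    static boolean isCompleted(String key, DemoEntity aggregate) {
        return aggregate.expected >= 0
                && aggregate.expected == aggregate.counterC + aggregate.counterD;
    }

    public static void main(String[] args) {
        DemoEntity agg = new DemoEntity(Source.A, 0); // plays the role of the initializer result
        aggregate("k", new DemoEntity(Source.B, 3), agg);
        aggregate("k", new DemoEntity(Source.C, 0), agg);
        aggregate("k", new DemoEntity(Source.C, 0), agg);
        System.out.println("completed after 2 of 3: " + isCompleted("k", agg));
        aggregate("k", new DemoEntity(Source.D, 0), agg);
        System.out.println("completed after 3 of 3: " + isCompleted("k", agg));
    }
}
```

With X = 3, the aggregate only passes the check once three messages in total have arrived from C and D.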

Here is the code snippet:

private Topology buildTopology() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    // create the 4 streams, reading strings
    KStream<String, String> streamA_AsString = streamsBuilder.stream(DemoTopic_A);
    KStream<String, String> streamC_AsString = streamsBuilder.stream(DemoTopic_C);
    KStream<String, String> streamB_AsString = streamsBuilder.stream(DemoTopic_B);
    KStream<String, String> streamD_AsString = streamsBuilder.stream(DemoTopic_D);

    // map the strings to Java objects (the entity used for aggregation)
    KStream<String, DemoEntity> streamA = streamA_AsString.map(demoKeyValueMapper);
    KStream<String, DemoEntity> streamC = streamC_AsString.map(demoKeyValueMapper);
    KStream<String, DemoEntity> streamB = streamB_AsString.map(demoKeyValueMapper);
    KStream<String, DemoEntity> streamD = streamD_AsString.map(demoKeyValueMapper);

    // group the message/object by key
    final KGroupedStream<String, DemoEntity> streamA_Grouped = streamA.groupByKey();
    final KGroupedStream<String, DemoEntity> streamProgressGrouped = streamC.groupByKey();
    final KGroupedStream<String, DemoEntity> streamPushingGrouped = streamB.groupByKey();
    final KGroupedStream<String, DemoEntity> streamErrorGrouped = streamD.groupByKey();

    // instantiate the aggregator
    DemoAggregator demoAggregator = new DemoAggregator();

    // build the aggregation chain,
    // using cogroup to combine the grouped streams above, providing the aggregator
    streamA_Grouped
        .cogroup(demoAggregator)
        .cogroup(streamProgressGrouped, demoAggregator)
        .cogroup(streamPushingGrouped, demoAggregator)
        .cogroup(streamErrorGrouped, demoAggregator)
        // provide the initializer
        .aggregate(demoInitializer)
        // apply the filter and, at the same time, store into a KTable
        .filter(isCompleted, Named.as(DemoCompletionStorageTableName))
        // convert to a KStream for further stateless processing;
        // from here on, nothing is backed by a changelog
        .toStream()
        .foreach((key, value) -> {
            // use values
            log.info("here we would use values for: { key:{}, message:{} }", () -> key, () -> value);
        });

    return streamsBuilder.build();
}

Unfortunately the topology won't start, and this is the error:

Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition-filter is already added.

It seems that a processor named COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition-filter had already been registered as a NodeFactory, hence the exception. The class from the Kafka dependency is "InternalTopologyBuilder", and the method is "addProcessor".
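
To illustrate what I think is happening, here is a simplified model of such a duplicate-name check. It is only my assumption about the behaviour, not the actual Kafka source: the class DuplicateProcessorSketch and its map are hypothetical, and it throws IllegalStateException rather than TopologyException so that it runs without Kafka on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DuplicateProcessorSketch {

    // Hypothetical stand-in for a "processor name -> node factory" registry.
    private final Map<String, String> nodeFactories = new LinkedHashMap<>();

    // Registers a processor and rejects a name that is already present.
    void addProcessor(String name, String description) {
        if (nodeFactories.containsKey(name)) {
            throw new IllegalStateException("Processor " + name + " is already added.");
        }
        nodeFactories.put(name, description);
    }

    public static void main(String[] args) {
        DuplicateProcessorSketch builder = new DuplicateProcessorSketch();
        String name = "COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition-filter";

        builder.addProcessor(name, "first registration");
        try {
            // A second node with the same generated name triggers the failure.
            builder.addProcessor(name, "second registration");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model is right, something in my topology generates the same repartition-filter name twice.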

Searching Google for that error string, I found only the Kafka Streams source code: no other Stack Overflow question, no forum thread, nothing.

Any idea?

Thanks in advance



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
