KTable-KTable foreign-key join not producing all messages when topics have more than one partition

See the Update below for a potential workaround.

Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we found that this works as expected when our output topic has only 1 partition. When we increase the number of partitions, we notice that the number of messages that get produced to the output topic decreases.

We tested this theory with multiple partition configurations prior to starting the app. With 1 partition, we see 100% of the messages. With 2, we see some messages (less than 50%). With 10, we see barely any (less than 10%).

Because we are doing a left join, every message consumed from Topic 1 should get written to our output topic, but we're finding that this is not happening. It seems like messages are getting stuck in the "intermediate" topics created by the foreign-key join of the KTables, but there are no error messages.

Any help would be greatly appreciated!

Service.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

build.gradle

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'

Note: We are excluding the org.apache.kafka dependencies due to a bug in the versions included in spring-cloud-stream
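As a rough sketch of what such an exclusion might look like in build.gradle (the exclusion pattern is standard Gradle; the re-added Kafka versions are illustrative and not taken from the original question):

implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka-streams') {
    // exclude the transitive org.apache.kafka artifacts pulled in by spring-cloud-stream
    exclude group: 'org.apache.kafka'
}
// re-add the Kafka client and streams libraries at the desired (illustrative) versions
implementation 'org.apache.kafka:kafka-clients:2.5.1'
implementation 'org.apache.kafka:kafka-streams:2.5.1'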

application.yml

spring:
  application:
    name: app-name
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2

Test Scenario:

To provide a concrete example, if I publish the following 4 messages to Topic 1:

{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}

The output topic will only receive 2 messages.

{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}

What happened to the other 2? It seems certain key/value pairs are just unable to get written to the output topic. Retrying these "lost" messages does not work either.

Update:

I was able to get this functioning properly by consuming Topic 1 as a KStream instead of a KTable and calling toTable() before going on to do the KTable-KTable join. I am still not sure why my original solution does not work, but hopefully this workaround can shed some light on the actual issue.

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}


Solution 1:[1]

Given the description of the problem, it seems that the data in the (left) KTable's input topic is not correctly partitioned by its key. With a single-partition topic there is only one partition, all data goes to that partition, and the join result is complete.

However, for a multi-partitioned input topic, you need to ensure that the data is partitioned by key; otherwise, two records with the same key might end up in different partitions and the join fails (as the join is done on a per-partition basis).

Note that even if a foreign-key join does not require that both input topics are co-partitioned, it is still required that each input topic itself is partitioned by its key!

If you use map().toTable(), you basically trigger an internal repartitioning of the data that ensures the data gets partitioned by its key, and this fixes the problem.
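As an illustration of that repartitioning, here is a minimal sketch (not part of the original answer) that mirrors the asker's workaround but uses selectKey instead of map: selectKey marks the stream as requiring repartitioning even though the key is unchanged, so toTable() materializes the table from an internal repartition topic that is keyed correctly before the foreign-key join. The store name "topic-one-by-key" is just an illustrative choice.

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    // flag the stream for repartitioning; the key itself is unchanged
                    .selectKey((key, value) -> key)
                    // materialize the KTable from the repartitioned (key-partitioned) data
                    .toTable(Materialized.as("topic-one-by-key"))
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}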

Solution 2:[2]

I had a similar issue. I have two incoming KStreams, which I converted to KTables and then used in a KTable-KTable FK join. Kafka Streams produced absolutely no records; the joins were never performed.

Repartitioning the KStreams didn't work for me. Instead, I had to manually set the number of partitions to 1.

Here's a stripped-down example of what doesn't work:

Note: I'm using Kotlin, with some extension helper functions.

fun enrichUsersData(
  userDataStream: KStream<UserId, UserData>,
  environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {

  // aggregate all users on a server into an aggregating DTO
  val userDataTable: KTable<ServerId, AggregatedUserData> =
    userDataStream
      .groupBy { _: UserId, userData: UserData -> userData.serverId }
      .aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
        usersAggregate
          .addUserData(userData)
          .setServerId(serverId)
        return@aggregate usersAggregate
      }

  // convert all incoming environment data into a KTable
  val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
    environmentDataStream
      .toTable()

  // Now, try to enrich the user's data with the environment data
  // the KTable-KTable FK join is correctly configured, but...
  val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
    userDataTable.join(
      other = environmentDataTable,
      tableJoined = tableJoined("enrich-user-data.join"),
      materialized = materializedAs(
        "enriched-user-data.store",
        jsonMapper.serde(),
        jsonMapper.serde(),
      ),
      foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
    ) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
      usersData.enrichUserData(environmentData)
      // this join is never called!!
      return@join usersData
    }
}

If I manually set the number of partitions to 1, then it works.

fun enrichUsersData(
  userDataStream: KStream<UserId, UserData>,
  environmentDataStream: KStream<RealmId, EnvironmentMetaData>,
) {

  // manually set the partition size to 1 *before* creating the table
  val userDataTable: KTable<ServerId, AggregatedUserData> =
    userDataStream
      .repartition(
        repartitionedAs(
          "user-data.pre-table-repartition",
          jsonMapper.serde(),
          jsonMapper.serde(),
          numberOfPartitions = 1,
        )
      )
      .groupBy { _: UserId, userData: UserData -> userData.serverId }
      .aggregate({ AggregatedUserData }) { serverId: ServerId, userData: UserData, usersAggregate: AggregatedUserData ->
        usersAggregate
          .addUserData(userData)
          .setServerId(serverId)
        return@aggregate usersAggregate
      }

  // again, manually set the partition size to 1 *before* creating the table
  val environmentDataTable: KTable<RealmId, EnvironmentMetaData> =
    environmentDataStream
      .repartition(
        repartitionedAs(
          "environment-metadata.pre-table-repartition",
          jsonMapper.serde(),
          jsonMapper.serde(),
          numberOfPartitions = 1,
        )
      )
      .toTable()

  // this join now works as expected!
  val enrichedUsersData: KTable<ServerId, AggregatedUserData> =
    userDataTable.join(
      other = environmentDataTable,
      tableJoined = tableJoined("enrich-user-data.join"),
      materialized = materializedAs(
        "enriched-user-data.store",
        jsonMapper.serde(),
        jsonMapper.serde(),
      ),
      foreignKeyExtractor = { usersData: AggregatedUserData -> usersData.realmId },
    ) { usersData: AggregatedUserData, environmentData: EnvironmentMetaData ->
      usersData.enrichUserData(environmentData)
      return@join usersData
    }
}

Solution 3:[3]

Selecting the key on the joined stream might help. The partition configuration of both topics should be the same.

return (topicOne, topicTwo) ->
        topicOne
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream().selectKey((key, value) -> key);
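
To verify that the partition configuration actually matches, you can describe both topics with the standard Kafka CLI (broker address and topic names are placeholders for your own):

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic1
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic2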

Solution 4:[4]

This is a strange issue; I have never heard of the number of output topic partitions controlling the data write frequency. However, I do know that toStream() writes data downstream only when the cache is full, so try setting cache.max.bytes.buffering = 0. Also, a KTable keeps only the latest record for each key, so if you have multiple values for the same key, only the latest value will stay and be written downstream.
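
As a minimal sketch of where that setting could go in the application.yml shown above (placing it under the binder's configuration block is an assumption based on the config already in the question):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              cache.max.bytes.buffering: 0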

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Matthias J. Sax
Solution 2: aSemy
Solution 3: user3500408
Solution 4: Dharman