Kafka Client Timeout of 60000ms expired before the position for partition could be determined

I'm trying to connect Flink to Kafka through a Kafka consumer.

I'm using Docker Compose to run four containers: ZooKeeper, Kafka, a Flink JobManager, and a Flink TaskManager.

For ZooKeeper and Kafka I'm using the wurstmeister images, and for Flink the official image.

docker-compose.yml

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'

  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
      BOOTSTRAP_SERVER: kafka:9092
      ZOOKEEPER: zookeeper:2181

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    links:
      - jobmanager
      - zookeeper
      - kafka
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

When I submit a simple job to the Dispatcher, the job fails with the following error:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined

My job code is:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Main {
    public static void main( String[] args ) throws Exception
    {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read the Kafka connection details from the container environment
        Properties properties = new Properties();
        String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
        String zookeeperServer = System.getenv("ZOOKEEPER");

        if (bootstrapServer == null) {
            System.exit(1);
        }

        // "zookeeper.connect" is only read by the legacy (0.8) consumer; the
        // universal FlinkKafkaConsumer only needs bootstrap.servers
        properties.setProperty("zookeeper.connect", zookeeperServer);
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("pipeline", new SimpleStringSchema(), properties);
        // kafkaConsumer.setStartFromGroupOffsets();
        kafkaConsumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Defining Pipeline here

        // Printing Outputs
        stream.print();

        env.execute("Stream Pipeline");
    }
}
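
For reference, this timeout is raised when the consumer cannot fetch an offset ("position") for the partition within the default 60 seconds, which typically means it cannot reach the broker address it was given in the topic metadata. Below is a minimal standalone check, as a sketch only (the class name is made up, and the hard-coded kafka:9092 assumes it runs on the same compose network): if this also hangs, the problem is broker reachability or advertised listeners rather than the Flink job itself.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // same address the job uses
        props.setProperty("group.id", "position-check");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("pipeline", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToEnd(Collections.singletonList(tp));
            // position() performs the same offset lookup that times out in the job
            System.out.println("position of pipeline-0: " + consumer.position(tp));
        }
    }
}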


Solution 1:[1]

I know I'm late to the party, but I had the exact same error. In my case I was not setting up the TopicPartitions correctly: my topic had 2 partitions and my producer was producing messages just fine, but my Spark Streaming application, as the consumer, never really started and gave up after 60 seconds with the same error.

Wrong code that I had -

List<TopicPartition> topicPartitionList = Arrays.asList(new TopicPartition(topicName, Integer.parseInt(numPartitions)));

Correct code -

List<TopicPartition> topicPartitionList = new ArrayList<>();

for (int i = 0; i < Integer.parseInt(numPartitions); i++) {
    topicPartitionList.add(new TopicPartition(topicName, i));
}
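
For context, here is roughly how such a list is then handed to the consumer; a sketch assuming the spark-streaming-kafka-0-10 integration, with jssc and kafkaParams supplied by the caller:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class AssignExample {
    static JavaInputDStream<ConsumerRecord<String, String>> buildStream(
            JavaStreamingContext jssc,
            List<TopicPartition> topicPartitionList,
            Map<String, Object> kafkaParams) {
        return KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                // Assign subscribes to exactly the partitions in the list, so the
                // list must contain only partitions that actually exist on the topic
                ConsumerStrategies.<String, String>Assign(topicPartitionList, kafkaParams));
    }
}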

Solution 2:[2]

I had an error that looks the same.

17:34:37.668 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=api.dev] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition aaa-1 could be determined

It turned out that my hosts file had been changed, so the broker address was wrong.

Try this logger setting to get more detail:

<logger name="org.apache.kafka.clients.consumer.internals.Fetcher" level="info" />
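
A stale hosts entry is also quick to catch by checking what the broker hostname resolves to from the consumer's side; a minimal sketch, with kafka standing in for your broker hostname:

import java.net.InetAddress;

public class ResolveBroker {
    public static void main(String[] args) throws Exception {
        // substitute the hostname from your bootstrap.servers setting
        InetAddress addr = InetAddress.getByName("kafka");
        System.out.println("kafka resolves to " + addr.getHostAddress());
    }
}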

Solution 3:[3]

I was having issues with this error in a vSphere Integrated Containers environment. For me the problem was that the broker advertised the hostname rather than the IP, so I had to set the hostname and the container name explicitly in my compose file.

Here are my settings that finally worked:

kafka:
  depends_on: 
    - zookeeper
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  mem_limit: 10g
  container_name: kafka
  hostname: kafka
  environment:
    KAFKA_ADVERTISED_LISTENERS: OUTSIDE://kafka:9092
    KAFKA_LISTENERS: OUTSIDE://0.0.0.0:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: <REPLACE_WITH_IP>:2181
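
To verify an advertised-listener setup like this, one option is to ask the broker for topic metadata and look at the host it reports for each partition leader; whatever appears there comes from KAFKA_ADVERTISED_LISTENERS and must be reachable from the client. A sketch, with the topic name pipeline borrowed from the question:

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class CheckAdvertised {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // leader() reflects the advertised listener; if that address is not
            // reachable from here, offset lookups will hit the 60s timeout
            for (PartitionInfo p : consumer.partitionsFor("pipeline")) {
                System.out.println("partition " + p.partition() + " leader: " + p.leader());
            }
        }
    }
}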

Solution 4:[4]

I had the same problem; the issue was a wrong host entry for the Kafka node in the /etc/hosts file!

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Sanjeev Dhiman
Solution 2: jchnxu
Solution 3: Doug
Solution 4: Prasath Rajan