'Not able to create kafka topics with desired partitions using Kafka Admin Client API
I'm using Kafka Admin client API's to create the topic. The topic is getting created, however the topic is getting created with 1 partition by default. The API is not honoring the configurable value provided. Not sure if I'm using it correctly.
Note: Topic creation is enabled at broker level. Also the topic is getting created, but it is getting created with partition 1.
NewTopic newTopic = new NewTopic(TOPIC_NAME, 10, (short) 1);
CreateTopicsResult createTopicsResult = null;
try {
createTopicsResult = KafkaAdminClient.create(getAdminProperties()).createTopics(Collections.singletonList(newTopic));
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
However I'm able to increase the partitions of earlier created topics using Kafka Admin Client API's
Solution 1:[1]
I tried to reproduce this, without success, using the following code:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AdminApiDemo {
private static final String BOOTSRAP_SERVER = "localhost:9092";
private static final String TOPIC_NAME = "demoTopic";
private static final int NUM_PARTITIONS = 3;
private static final short NUM_REPLICAS = 1;
private final AdminClient adminClient;
private AdminApiDemo(Properties properties) {
this.adminClient = KafkaAdminClient.create(properties);
}
public static void main(String[] args) {
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSRAP_SERVER);
new AdminApiDemo(properties).createTopic(TOPIC_NAME, NUM_PARTITIONS, NUM_REPLICAS);
}
private void createTopic(String topicName, int numPartitions, short numReplicas) {
try {
final NewTopic newTopic = new NewTopic(topicName, numPartitions, numReplicas);
final CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
result.values().get(topicName).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
}
A kafka-topics --describe
showed the following:
root@kafka:/# kafka-topics --bootstrap-server localhost:9092 --describe --topic demoTopic
Topic:demoTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: demoTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: demoTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: demoTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
I thought, ok, what if the topic maybe exists before creation, but then again I got an java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'demoTopic' already exists.
, so that could not be your case either.
I know this is no "real" answer, that fixes anything, sorry for that. But I hope it helps, anyway. Maybe someone else can use this to reproduce it in his setting and "sees" the problem.
Solution 2:[2]
You forgot to call .get() on createTopicsResult to wait for a response to your query.
Try this code:
createTopicsResult.values().get(TOPIC_NAME).get()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Jan Held |
Solution 2 | Smirnov Alexander |