'apache pulsar : error while consuming message
I am trying to consume from pulsar topic. My client code can be seen below.
But, I am facing NoClassDefFoundError
. Please let me know how to resolve this.
pulsar client version: 2.10, pulsar server version: 2.10, installation type: standalone(not docker), OS: Ubuntu 20.04.4 LTS
My pulsar client code:
private static void consumeFromPulsarAsync() throws Exception {
logger.info("consumeFromPulsarAsync()");
PulsarClient client = PulsarClient.
builder()
.serviceUrl("pulsar://localhost:6650")
.build();
logger.info("consumeFromPulsarAsync() client");
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://public/default/ack-2")
.consumerName("pulsar-consumer-id-" + Math.random())
.subscriptionName("pulsar-subscription-id-" + Math.random())
.subscriptionType(SubscriptionType.Shared).subscribe();
logger.info("consumeFromPulsarAsync() consumer");
consumer.receiveAsync().thenCompose((msg) -> {
logger.info("consumeFromPulsarAsync() consumed msg :: {}", msg.getValue());
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
return null;
});
}
My pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company</groupId>
<artifactId>my.work.manager</artifactId>
<version>release-1.0</version>
<description>my Worker Manager</description>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.14.3</flink.version>
<pulsar.version>2.10.0</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerArgs>
<arg>-Xlint:all,-options,-path</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<archive>
<manifest>
<mainClass>com.company.my.manager.flink.myWorkManager</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<targetPath>${project.build.outputDirectory}</targetPath>
<includes>
<include>**/*.yml</include>
</includes>
</resource>
</resources>
</build>
</project>
Error it is showing:
2022-05-10 17:52:52,549 WARN org.apache.pulsar.client.impl.MultiTopicsConsumerImpl [] - Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled. java.lang.NoClassDefFoundError: org/apache/pulsar/shade/com/google/common/primitives/Ints at org.apache.pulsar.shade.com.google.common.collect.Lists.computeArrayListCapacity(Lists.java:152) ~[e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.com.google.common.collect.Lists.newArrayListWithExpectedSize(Lists.java:192) ~[e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$TopicsPartitionChangedListener.onTopicsExtended(MultiTopicsConsumerImpl.java:1238) ~[e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl$1.run(MultiTopicsConsumerImpl.java:1350) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [e8fe35f4-c20c-4c3e-9b81-a3ff1a0ebaec_my.work.manager-release-1.0-jar-with-dependencies.jar:?] at java.lang.Thread.run(Thread.java:829) [?:?]
Solution 1:[1]
What type of cluster? Docker, Standalone, K8, ... Is the Pulsar client and server both 2.10? What JDK for server? for client? Any errors on the server? What server OS? What client OS?
It says WARN, did it run or crash with the missing class? Sometimes on a MAC you will get a WARN on missing SSL and it will work with warnings.
It may be related to this https://github.com/apache/pulsar/issues/9585
Steps to debug:
Create a new topic. Try again. Try to produce and consume with command line client.
Create a single partitioned topic and test with that.
I ran your example with this test
bin/pulsar-client produce --key "test1" "persistent://public/default/ack-2" -m "Test this thing 4" -n 25
public void consumeFromPulsarAsync() throws Exception {
PulsarClient client = PulsarClient.
builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://public/default/ack-2")
.consumerName("pulsar-consumer-id-" + Math.random())
.subscriptionName("pulsar-subscription-id-" + Math.random())
.subscriptionType(SubscriptionType.Shared).subscribe();
consumer.receiveAsync().thenCompose((msg) -> {
log.info("consumeFromPulsarAsync() consumed msg :: {}", msg.getValue());
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return null;
});
}
2022-05-11 10:00:47.321 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConnectionPool : [[id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650]] Connected to server 2022-05-11 10:00:47.321 INFO 28575 --- [r-client-io-6-1] org.apache.pulsar.client.impl.ClientCnx : [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650] Connected through proxy to target broker at 127.0.0.1:6650 2022-05-11 10:00:47.323 INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6345962322446594] Subscribing to topic on cnx [id: 0xf5af925d, L:/192.168.1.63:49385 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 2 2022-05-11 10:00:47.324 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6808890118894377] Subscribing to topic on cnx [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 2 2022-05-11 10:00:47.324 INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6345962322446594] Subscribing to topic on cnx [id: 0xf5af925d, L:/192.168.1.63:49385 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 1 2022-05-11 10:00:47.324 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6808890118894377] Subscribing to topic on cnx [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 1 2022-05-11 10:00:47.324 INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6345962322446594] Subscribing to topic on cnx [id: 0xf5af925d, L:/192.168.1.63:49385 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 0 2022-05-11 10:00:47.324 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6808890118894377] Subscribing to topic on cnx [id: 0x2005ff4b, L:/192.168.1.63:49386 - R:pulsar1.fios-router.home/192.168.1.230:6650], consumerId 0 2022-05-11 10:00:47.357 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6808890118894377] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 0 2022-05-11 10:00:47.370 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6808890118894377] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 1 2022-05-11 10:00:47.370 INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-1][pulsar-subscription-id-0.6345962322446594] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 1 2022-05-11 10:00:47.371 INFO 28575 --- [r-client-io-6-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6808890118894377] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 2 2022-05-11 10:00:47.372 INFO 28575 --- [r-client-io-6-1] o.a.p.c.impl.MultiTopicsConsumerImpl : [persistent://public/default/ack-2] [pulsar-subscription-id-0.6808890118894377] Success subscribe new topic persistent://public/default/ack-2 in topics consumer, partitions: 3, allTopicPartitionsNumber: 3 2022-05-11 10:00:47.394 INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-0][pulsar-subscription-id-0.6345962322446594] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 0 2022-05-11 10:00:47.394 INFO 28575 --- [r-client-io-7-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/ack-2-partition-2][pulsar-subscription-id-0.6345962322446594] Subscribed to topic on pulsar1.fios-router.home/192.168.1.230:6650 -- consumer: 2 2022-05-11 10:00:47.395 INFO 28575 --- [r-client-io-7-1] o.a.p.c.impl.MultiTopicsConsumerImpl : [persistent://public/default/ack-2] [pulsar-subscription-id-0.6345962322446594] Success subscribe new topic persistent://public/default/ack-2 in topics consumer, partitions: 3, allTopicPartitionsNumber: 3 2022-05-11 10:00:52.529 INFO 28575 --- [t-internal-11-1] a.f.a.consumer.AirQualityConsumerApp : consumeFromPulsarAsync() consumed msg :: Test this thing 4 2022-05-11 10:00:52.529 INFO 28575 --- [nt-internal-9-1] a.f.a.consumer.AirQualityConsumerApp : consumeFromPulsarAsync() consumed msg :: Test this thing 4 ^C2022-05-11 10:00:54.823 INFO 28575 --- [r-client-io-1-1] o.a.pulsar.client.impl.ConsumerImpl : [persistent://public/default/airquality] [airqualitysj1] Closed consumer 2022-05-11 10:00:54.824 INFO 28575 --- [ionShutdownHook] o.a.pulsar.client.impl.PulsarClientImpl : Client closing. URL: pulsar://pulsar1:6650 2022-05-11 10:00:54.826 INFO 28575 --- [r-client-io-1-1] org.apache.pulsar.client.impl.ClientCnx : [id: 0x4d247f3d, L:/192.168.1.63:49381 ! R:pulsar1.fios-router.home/192.168.1.230:6650] Disconnected 2022-05-11 10:00:54.829 INFO 28575 --- [r-client-io-1-1] org.apache.pulsar.client.impl.ClientCnx : [id: 0x5fa81a8b, L:/192.168.1.63:49382 ! R:pulsar1.fios-router.home/192.168.1.230:6650] Disconnected
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |