ClickHouse Kafka engine on cluster

I'm playing with the Kafka engine on a ClickHouse cluster. At the moment, the ClickHouse 22.1 cluster and Kafka run in Docker. The configurations are here: https://github.com/apanasevich/clickhouse-etl-cluster

Here is the DDL for the Kafka integration:

CREATE DATABASE etl ON CLUSTER cluster;

CREATE TABLE etl.tmp ON CLUSTER cluster
(
    `event_timestamp` DateTime64(3, 'Europe/Moscow'),
    `account_id` Int32,
    `session` String,
    `type` Int16
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/etl/tmp', '{replica}')
PARTITION BY toMonday(event_timestamp)
ORDER BY (account_id, event_timestamp)
TTL toDateTime(event_timestamp) + toIntervalDay(90);

CREATE TABLE etl.tmp_kafka_datasource
(
    `topic_data` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:29092', kafka_topic_list = 'Tmp', 
kafka_group_name = 'clickhouse_etl_group', kafka_format = 'JSONAsString', kafka_num_consumers = 1;

CREATE MATERIALIZED VIEW etl.tmp_consumer TO etl.tmp
(
    `event_timestamp` DateTime64(3),
    `account_id` Int32,
    `session` String,
    `type` Int16
) AS
SELECT
    fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
    toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
    JSON_VALUE(topic_data, '$.data.session') AS session,
    toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM etl.tmp_kafka_datasource;

The problem is that no data appears in the table tmp, even though the view consumes the topic (there is no lag for the corresponding consumer group in Kafka).

At the same time, data can be successfully inserted directly via an INSERT INTO tmp statement, and it is replicated to the second host.
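
A quick way to run that check (with made-up test values, assuming two replicas) is to insert on one host and count on the other:

-- On host 1 (values are hypothetical):
INSERT INTO etl.tmp VALUES (now64(3, 'Europe/Moscow'), 1, 'test-session', 1);

-- On host 2, the row should appear once replication catches up:
SELECT count() FROM etl.tmp;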

Edit: Data from the topic can be successfully consumed directly via a SELECT * FROM etl.tmp_kafka_datasource query.
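
Note that a direct SELECT from a Kafka table advances the offsets of its consumer group, so messages read that way never reach the materialized view. For debugging it can help to peek at the topic through a separate table with a different group name (a sketch reusing the settings above; the table and group names here are made up):

CREATE TABLE etl.tmp_kafka_debug
(
    `topic_data` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:29092', kafka_topic_list = 'Tmp',
kafka_group_name = 'clickhouse_debug_group', kafka_format = 'JSONAsString', kafka_num_consumers = 1;

-- Reads from the debug group without stealing offsets from the MV's group:
SELECT * FROM etl.tmp_kafka_debug LIMIT 5;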

The same scenario works fine on a single ClickHouse instance.

I've deleted the volume mapping from the Docker file, and now the only errors are:

2022.03.11 07:39:54.519652 [ 1 ] {} <Warning> Application: Listen [::]:9005 failed: Poco::Exception. Code: 1000, e.code() = 0, DNS error: EAI: Address family for hostname not supported (version 21.11.4.14 (official build)). If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> . Example for disabled IPv4: <listen_host>::</listen_host>
2022.03.11 07:39:54.641049 [ 249 ] {} <Error> virtual bool DB::DDLWorker::initializeMainThread(): Code: 999. Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
 (Connection loss). (KEEPER_EXCEPTION), Stack trace (when copying this message, always include the lines below):

0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x9b605d4 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0x134a60d5 in /usr/bin/clickhouse
2. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0x134a6416 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0x134e6202 in /usr/bin/clickhouse
4. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan, std::__1::shared_ptr<DB::ZooKeeperLog>) @ 0x134e475c in /usr/bin/clickhouse
5. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0x134a89e1 in /usr/bin/clickhouse
6. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<DB::ZooKeeperLog>) @ 0x134ab12d in /usr/bin/clickhouse
7. void std::__1::allocator<zkutil::ZooKeeper>::construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10], std::__1::shared_ptr<DB::ZooKeeperLog> >(zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10], std::__1::shared_ptr<DB::ZooKeeperLog>&&) @ 0x11ea3abb in /usr/bin/clickhouse
8. DB::Context::getZooKeeper() const @ 0x11e83856 in /usr/bin/clickhouse
9. DB::DDLWorker::getAndSetZooKeeper() @ 0x11ed5e6c in /usr/bin/clickhouse
10. DB::DDLWorker::initializeMainThread() @ 0x11ee746c in /usr/bin/clickhouse
11. DB::DDLWorker::runMainThread() @ 0x11ed38f4 in /usr/bin/clickhouse
12. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0x11ee847a in /usr/bin/clickhouse
13. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x9ba2697 in /usr/bin/clickhouse
14. ? @ 0x9ba609d in /usr/bin/clickhouse
15. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
16. clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so
 (version 21.11.4.14 (official build))

2022.03.11 07:42:23.082220 [ 249 ] {eef27851-f848-4b5a-8a95-cf568951cf75} <Warning> etl.tmp (3eb29f08-e712-4705-beb2-9f08e712c705): It looks like the table /clickhouse/tables/01/etl/tmp was created by another server at the same moment, will retry

What did I do wrong on the cluster?

Edit 2: I also changed the tmp table's engine from ReplicatedMergeTree to MergeTree, and the integration started to work. So maybe the problem is connected with an incorrect replication setup?
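
One way to check whether replication itself is healthy on each host is to query the system tables (a diagnostic sketch; the exact column set of system.replicas varies between releases, and the ZooKeeper path below is the one from the log above):

-- Is the replica read-only, and did it hit a ZooKeeper error?
SELECT database, table, is_readonly, zookeeper_exception
FROM system.replicas
WHERE database = 'etl';

-- Was the table's path actually registered in ZooKeeper?
SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/tables/01/etl/tmp';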



Solution 1:[1]

The problem was that the DDL for the view was incorrect. Unfortunately, it somehow worked with the MergeTree engine but didn't work with the ReplicatedMergeTree one.

This is the corrected script:

CREATE MATERIALIZED VIEW etl.tmp_consumer TO etl.tmp AS
SELECT ......
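
For reference, the full corrected statement, reconstructed by applying that fix (dropping the explicit column list between TO etl.tmp and AS) to the view definition from the question:

CREATE MATERIALIZED VIEW etl.tmp_consumer TO etl.tmp AS
SELECT
    fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
    toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
    JSON_VALUE(topic_data, '$.data.session') AS session,
    toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM etl.tmp_kafka_datasource;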

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Dmitrii Apanasevich