How do I write a Debezium connector configuration and a Debezium listener for MongoDB in Spring Boot and get the before/after payload as JSON?
Could anyone please provide sample Spring Boot code for a Debezium connector listener class and a configuration class for MongoDB that deliver the changed-data payload as JSON? Thanks in advance!
Here's my Debezium connector listener:
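// Imports inferred from the identifiers used below; the original post omitted them.
import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;

import io.debezium.config.Configuration;
import io.debezium.data.Envelope.Operation;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DebeziumListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration mongoConnector) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(mongoConnector.asProperties()).notifying(t -> {
                    try {
                        // Informational message, so log at INFO rather than ERROR.
                        log.info("Running Debezium Engine.");
                        handleChangeEvent(t);
                    } catch (Exception e) {
                        log.error("Error on running Debezium Engine.");
                        log.error(e.getMessage());
                    }
                }).build();
    }

    /**
     * Records any change event in the database based on DebeziumConnectorConfig.
     *
     * @param sourceRecordChangeEvent the change event emitted by the engine
     * @throws IOException
     */
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) throws IOException {
        SourceRecord sourceRecord = sourceRecordChangeEvent.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        if (sourceRecordChangeValue != null) {
            Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
            if (operation != Operation.READ) {
                // Deletes are read from BEFORE; inserts and updates from AFTER.
                String records = operation == Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) sourceRecordChangeValue.get(records);
                // Copy every non-null field of the struct into a plain map.
                Map<String, Object> payload = struct.schema().fields().stream().map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                log.info("Changes have been detected for product id: {} with Operation: {}",
                        Optional.ofNullable(payload).map(p -> p.get("productId")).orElse(null), operation.name());
                // SLF4J needs a format string as the first argument; log.info(payload) does not compile.
                log.info("Payload: {}", payload);
                log.debug("Updated Data: {} with Operation: {}", payload, operation.name());
            }
        }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
    }
}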
And here is the connector configuration class:
// Imports inferred from the identifiers used below; the original post omitted them.
import java.io.File;
import java.io.IOException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration mongoConnector() throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "mongo-connector")
                .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .with("mongodb.members.auto.discover", "false")
                .with("tasks.max", "1")
                .with("mongodb.hosts", "rs0/localhost,27017")
                .with("mongodb.name", "products_topic")
                .with("errors.log.include.messages", "true")
                .with("capture.mode", "change_streams_update_full")
                .with("collection.include.list", "myCollection")
                .with("database.include.list", "myDb")
                .with("schema", "true")
                .with("heartbeat.interval.ms", 0)
                .with("database.history.kafka.bootstrap.servers", "kafka:9092")
                .with("transforms", "route")
                .with("transforms.route.type", "org.apache.kafka.connect.transforms.RegexRouter")
                .with("transforms.route.regex", "([^.]+)\\.([^.]+)\\.([^.]+)")
                .with("transforms.route.replacement", "$3")
                .build();
    }
}
After application startup, the server keeps logging the following messages:
2022-05-13 09:01:14.455 INFO 15452 --- [ica-set-monitor] i.d.c.mongodb.ReplicaSetDiscovery : Checking current members of replica set at rs0/localhost,27017
2022-05-13 09:01:14.456 INFO 15452 --- [ica-set-monitor] i.d.c.mongodb.ReplicaSetDiscovery : Checking current members of replica set at rs0/localhost,27017
Solution 1:[1]
I'm fairly certain you have a typo in your mongodb.hosts configuration property. It should have a value of:
rs0/localhost:27017
(Note the : rather than the , you have in your question).
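To catch this kind of typo before the engine starts, a small sanity check on the host string can help. The sketch below is only an illustration: MongoHostsCheck and looksValid are hypothetical names, not part of Debezium, and the rule that every member must be written as host:port is an assumption made for this check rather than something the connector is known to enforce. It relies on the fact that a comma separates members, so "rs0/localhost,27017" is read as two members, "localhost" and "27017", neither of which is a host:port pair.

```java
import java.util.regex.Pattern;

public class MongoHostsCheck {

    // Hypothetical helper, not a Debezium API. Assumes each member is "host:port".
    private static final Pattern HOST_PORT = Pattern.compile("[A-Za-z0-9._-]+:\\d{1,5}");

    /** Returns true if the value looks like "[replicaSet/]host:port[,host:port...]". */
    static boolean looksValid(String hosts) {
        if (hosts == null || hosts.trim().isEmpty()) {
            return false;
        }
        int slash = hosts.indexOf('/');
        if (slash == 0) {
            return false; // a slash with no replica set name in front of it
        }
        // Everything after the optional "replicaSet/" prefix is the member list.
        String members = slash > 0 ? hosts.substring(slash + 1) : hosts;
        for (String member : members.split(",", -1)) {
            if (!HOST_PORT.matcher(member.trim()).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksValid("rs0/localhost:27017")); // value suggested in this answer
        System.out.println(looksValid("rs0/localhost,27017")); // value from the question
    }
}
```

Calling looksValid on the configured value before building the Configuration bean, and failing fast when it returns false, would turn the endless "Checking current members of replica set" log loop into an immediate startup error.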
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Naros |