How to write a Debezium connector configuration and Debezium listener for MongoDB in Spring Boot, with the before/after payload as JSON?

Could anyone please provide sample Spring Boot code for a Debezium connector listener class and configuration class for MongoDB that delivers the changed-data payload as JSON? Thanks in advance!

Here's my Debezium connector listener:

@Slf4j
@Component
public class DebeziumListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration mongoConnector) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(mongoConnector.asProperties()).notifying(t -> {
                    try {
                        log.info("Running Debezium Engine.");
                        handleChangeEvent(t);
                    } catch (Exception e) {
                        log.error("Error while running Debezium Engine.", e);
                    }
                }).build();
    }

    /**
     * Records any change event in the database, based on DebeziumConnectorConfig.
     *
     * @param sourceRecordChangeEvent the change event emitted by the engine
     * @throws IOException if the event cannot be processed
     */
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) throws IOException {
        SourceRecord sourceRecord = sourceRecordChangeEvent.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
            if (operation != Operation.READ) {
                // For DELETE events the document state is in BEFORE; for CREATE/UPDATE it is in AFTER.
                String records = operation == Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) sourceRecordChangeValue.get(records);
                Map<String, Object> payload = struct.schema().fields().stream().map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                log.info("Changes detected for product id: {} with operation: {}",
                        payload.get("productId"), operation.name());
                log.debug("Updated data: {} with operation: {}", payload, operation.name());
            }
        }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
    }
}
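
Since the question asks for the before/after payload as JSON: the `payload` map built in `handleChangeEvent` can be serialized with Jackson's `ObjectMapper`, which is already on the classpath in a typical Spring Boot application. A minimal sketch, where `PayloadJsonExample` and its `toJson` helper are hypothetical names, not part of the code above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PayloadJsonExample {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper: serialize the field map extracted from the Struct.
    static String toJson(Map<String, Object> payload) {
        try {
            return MAPPER.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize payload", e);
        }
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the JSON keys come out in order.
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("productId", 42);
        payload.put("name", "widget");
        System.out.println(toJson(payload)); // {"productId":42,"name":"widget"}
    }
}
```

Calling `toJson(payload)` inside `handleChangeEvent` would give you the before/after document as a JSON string to log or forward.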

and here is the connector configuration class:

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration mongoConnector() throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "mongo-connector")
                .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .with("mongodb.members.auto.discover", "false")
                .with("tasks.max", "1")
                .with("mongodb.hosts", "rs0/localhost,27017")
                .with("mongodb.name", "products_topic")
                .with("errors.log.include.messages", "true")
                .with("capture.mode", "change_streams_update_full")
                .with("collection.include.list", "myCollection")
                .with("database.include.list", "myDb")
                .with("schema", "true")
                .with("heartbeat.interval.ms", 0)
                .with("database.history.kafka.bootstrap.servers", "kafka:9092")
                .with("transforms", "route")
                .with("transforms.route.type", "org.apache.kafka.connect.transforms.RegexRouter")
                .with("transforms.route.regex", "([^.]+)\\.([^.]+)\\.([^.]+)")
                .with("transforms.route.replacement", "$3")
                .build();
    }
}
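
One caveat with the configuration above: `File.createTempFile` produces a fresh offsets file on every application start, so previously recorded offsets are lost and the connector starts over each time. A minimal sketch of resolving a fixed path instead (the `data/debezium` directory and the `OffsetFileExample` class name are assumptions for illustration; pick any persistent location):

```java
import java.io.File;

public class OffsetFileExample {

    // Hypothetical helper: resolve a stable offsets file under a persistent
    // directory so Debezium offsets survive application restarts.
    static File offsetFile(String baseDir) {
        File file = new File(baseDir, "offsets.dat");
        file.getParentFile().mkdirs(); // ensure the directory exists
        return file;
    }

    public static void main(String[] args) {
        File offsets = offsetFile("data/debezium");
        // Pass offsets.getAbsolutePath() to "offset.storage.file.filename".
        System.out.println(offsets.getPath());
    }
}
```

The same idea applies to the `database.history.file.filename` temp file.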

and after application startup the server keeps logging the lines below:

2022-05-13 09:01:14.455 INFO 15452 --- [ica-set-monitor] i.d.c.mongodb.ReplicaSetDiscovery : Checking current members of replica set at rs0/localhost,27017
2022-05-13 09:01:14.456 INFO 15452 --- [ica-set-monitor] i.d.c.mongodb.ReplicaSetDiscovery : Checking current members of replica set at rs0/localhost,27017



Solution 1:[1]

I'm fairly certain you have a typo in your mongodb.hosts configuration property. It should have a value of:

rs0/localhost:27017

(Note the : rather than the , you have in your question).
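
With that fix applied, the relevant line of the configuration class becomes:

```java
                .with("mongodb.hosts", "rs0/localhost:27017")
```

The format is `replicaSetName/host:port`, so the replica-set monitor can actually reach a member instead of repeatedly re-checking an unreachable address.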

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Naros