Debezium only reports change events at startup
I am using the Debezium Spring Boot starter with its supplier:
org.springframework.boot:spring-boot-starter-parent:2.4.10
org.springframework.cloud.fn:cdc-debezium-boot-starter:1.0.3
org.springframework.cloud.fn:cdc-debezium-supplier:1.0.3
I configured SQL Server to enable CDC on the database and on the table to capture, following the Debezium documentation:
-- enable CDC on database
USE my_db;
exec sys.sp_cdc_enable_db;
-- add filegroup to store CDC data
ALTER DATABASE my_db ADD FILEGROUP CDC_DATA;
-- enable CDC on ACTIONS table
USE my_db;
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'ACTIONS',
    @role_name = NULL,
    @filegroup_name = N'CDC_DATA',
    @supports_net_changes = 0;
-- check if it is enabled
EXEC sys.sp_cdc_help_change_data_capture;
It returns:
source_schema | source_table | capture_instance | object_id | source_object_id | start_lsn | end_lsn | supports_net_changes | has_drop_pending | role_name | index_name | filegroup_name | create_date | index_column_list | captured_column_list |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
dbo | ACTIONS | dbo_ACTIONS | 843150049 | 725577623 | 0x000000300000408B003E | NULL | false | NULL | NULL | PK__ACTIONS__546A96891DBD0AB8 | CDC_DATA | 2021-09-30 03:48:03.493 | [ACTIONID] | [ACTIONID], [TMSTAMP], [APUSER], [OSUSER], [COMMENT], [STATUS] |
So the CDC seems to be enabled for this table.
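Having CDC enabled on the table does not by itself show that changes are being captured. One way to narrow down whether the gap is on the SQL Server side or in Debezium is to check that the capture job is running and that the change table receives rows. The following is a minimal sketch, assuming the default change table name derived from the capture instance dbo_ACTIONS shown above, and a login allowed to query sys.dm_server_services (VIEW SERVER STATE):

```sql
USE my_db;

-- Is SQL Server Agent running? The CDC capture job is an Agent job.
SELECT servicename, status_desc
FROM sys.dm_server_services
WHERE servicename LIKE N'SQL Server Agent%';

-- List the capture and cleanup jobs configured for this database.
EXEC sys.sp_cdc_help_jobs;

-- After an INSERT or UPDATE on dbo.ACTIONS, new rows should appear here
-- once the capture job has scanned the transaction log.
SELECT TOP (10) *
FROM cdc.dbo_ACTIONS_CT
ORDER BY __$start_lsn DESC;
```

If the change table stays empty after a committed change, the connector has nothing to stream and the cause is likely on the SQL Server side. If rows do appear there, the connector configuration is the more likely place to look.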
I created a Spring bean that subscribes to the Debezium flux:
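import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.annotation.PostConstruct;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Data
@Slf4j
@Service
public class LogDatabaseChanges {
    // Injected through the constructor that Lombok generates for final fields
    private final Supplier<Flux<Message<?>>> cdcSupplier;

    @PostConstruct
    public void logChangeEvents() {
        // Log every change event payload as a UTF-8 string
        cdcSupplier.get().subscribe(t -> {
            log.info("#### {}", new String((byte[]) t.getPayload(), StandardCharsets.UTF_8));
        });
    }
}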
When I start my Spring Boot application, Debezium creates a snapshot and detects the existing rows. These are then written to the logs.
However, when I then change data, either by inserting a new row or by updating a value, nothing is triggered.
Here is a sample of application logs:
d.c.s.SqlServerSnapshotChangeEventSource : No previous offset has been found
d.c.s.SqlServerSnapshotChangeEventSource : According to the connector configuration both schema and data will be snapshotted
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 1 - Preparing
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 2 - Determining captured tables
i.d.r.history.DatabaseHistoryMetrics : Unable to register the MBean 'debezium.sql_server:type=connector-metrics,context=schema-history,server=foo': debezium.sql_server:type=connector-metrics,context=schema-history,server=foo
io.debezium.util.Threads : Requested thread factory for connector SqlServerConnector, id = foo named = change-event-source-coordinator
io.debezium.util.Threads : Creating thread debezium-sqlserverconnector-foo-change-event-source-coordinator
i.d.p.ChangeEventSourceCoordinator : Unable to register the MBean 'debezium.sql_server:type=connector-metrics,context=snapshot,server=foo': debezium.sql_server:type=connector-metrics,context=snapshot,server=foo
i.d.p.ChangeEventSourceCoordinator : Unable to register the MBean 'debezium.sql_server:type=connector-metrics,context=streaming,server=foo': debezium.sql_server:type=connector-metrics,context=streaming,server=foo
i.d.p.ChangeEventSourceCoordinator : Metrics registered
i.d.p.ChangeEventSourceCoordinator : Context created
d.c.s.SqlServerSnapshotChangeEventSource : No previous offset has been found
d.c.s.SqlServerSnapshotChangeEventSource : According to the connector configuration both schema and data will be snapshotted
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 1 - Preparing
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 2 - Determining captured tables
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 3 - Locking captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Setting locking timeout to 10 s
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 3 - Locking captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Setting locking timeout to 10 s
d.c.s.SqlServerSnapshotChangeEventSource : Executing schema locking
d.c.s.SqlServerSnapshotChangeEventSource : Locking table server_ListeriaWGS.dbo.ACTIONS
d.c.s.SqlServerSnapshotChangeEventSource : Executing schema locking
d.c.s.SqlServerSnapshotChangeEventSource : Locking table server_ListeriaWGS.dbo.ACTIONS
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 4 - Determining snapshot offset
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 5 - Reading structure of captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Reading structure of schema 'server_ListeriaWGS'
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 6 - Persisting schema history
i.d.c.sqlserver.SqlServerConnectorTask : The Kafka Connect schema name 'foo.dbo.ACTIONS.Value' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Value'
i.d.c.sqlserver.SqlServerConnectorTask : The Kafka Connect schema name 'foo.dbo.ACTIONS.Key' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Key'
i.d.c.sqlserver.SqlServerConnectorTask : The Kafka Connect schema name 'foo.dbo.ACTIONS.Envelope' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope'
d.c.s.SqlServerSnapshotChangeEventSource : Schema locks released.
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 7 - Snapshotting data
.d.r.RelationalSnapshotChangeEventSource : Exporting data from table 'server_ListeriaWGS.dbo.ACTIONS'
.d.r.RelationalSnapshotChangeEventSource : For table 'server_ListeriaWGS.dbo.ACTIONS' using select statement: 'SELECT [ACTIONS].[ACTIONID],[ACTIONS].[TMSTAMP],[ACTIONS].[APUSER],[ACTIONS].[OSUSER],[ACTIONS].[COMMENT],[ACTIONS].[STATUS] FROM [dbo].[ACTIONS]'
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 4 - Determining snapshot offset
c.b.n.d.c.a.LogDatabaseChanges : {"databaseName":"server_ListeriaWGS"}
c.b.n.d.c.a.LogDatabaseChanges : {"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978886876,"snapshot":"true","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"databaseName":"server_ListeriaWGS","schemaName":"dbo","ddl":null,"tableChanges":[{"type":"CREATE","id":"\"server_ListeriaWGS\".\"dbo\".\"ACTIONS\"","table":{"defaultCharsetName":null,"primaryKeyColumnNames":["ACTIONID"],"columns":[{"name":"ACTIONID","jdbcType":4,"nativeType":null,"typeName":"int","typeExpression":"int","charsetName":null,"length":10,"scale":0,"position":1,"optional":false,"autoIncremented":false,"generated":false},{"name":"TMSTAMP","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"scale":null,"position":2,"optional":true,"autoIncremented":false,"generated":false},{"name":"APUSER","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":80,"scale":null,"position":3,"optional":true,"autoIncremented":false,"generated":false},{"name":"OSUSER","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":80,"scale":null,"position":4,"optional":true,"autoIncremented":false,"generated":false},{"name":"COMMENT","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":150,"scale":null,"position":5,"optional":true,"autoIncremented":false,"generated":false},{"name":"STATUS","jdbcType":4,"nativeType":null,"typeName":"int","typeExpression":"int","charsetName":null,"length":10,"scale":0,"position":6,"optional":true,"autoIncremented":false,"generated":false}]}}]}
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 5 - Reading structure of captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Reading structure of schema 'server_ListeriaWGS'
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 6 - Persisting schema history
i.d.c.sqlserver.SqlServerConnectorTask : The Kafka Connect schema name 'foo.dbo.ACTIONS.Value' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Value'
i.d.c.sqlserver.SqlServerConnectorTask : The Kafka Connect schema name 'foo.dbo.ACTIONS.Key' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Key'
i.d.c.sqlserver.SqlServerConnectorTask : The Kafka Connect schema name 'foo.dbo.ACTIONS.Envelope' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope'
o.s.c.f.common.cdc.CdcAutoConfiguration : [CDC Event]: SourceRecord{sourcePartition=null, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='foo', kafkaPartition=0, key=Struct{databaseName=server_ListeriaWGS}, keySchema=Schema{io.debezium.connector.sqlserver.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978889724,snapshot=true,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},databaseName=server_ListeriaWGS,schemaName=dbo,tableChanges=[Struct{type=CREATE,id="server_ListeriaWGS"."dbo"."ACTIONS",table=Struct{primaryKeyColumnNames=[ACTIONID],columns=[Struct{name=ACTIONID,jdbcType=4,typeName=int,typeExpression=int,length=10,scale=0,position=1,optional=false,autoIncremented=false,generated=false}, Struct{name=TMSTAMP,jdbcType=12,typeName=varchar,typeExpression=varchar,length=40,position=2,optional=true,autoIncremented=false,generated=false}, Struct{name=APUSER,jdbcType=12,typeName=varchar,typeExpression=varchar,length=80,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=OSUSER,jdbcType=12,typeName=varchar,typeExpression=varchar,length=80,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=COMMENT,jdbcType=12,typeName=varchar,typeExpression=varchar,length=150,position=5,optional=true,autoIncremented=false,generated=false}, Struct{name=STATUS,jdbcType=4,typeName=int,typeExpression=int,length=10,scale=0,position=6,optional=true,autoIncremented=false,generated=false}]}}]}, valueSchema=Schema{io.debezium.connector.sqlserver.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
d.c.s.SqlServerSnapshotChangeEventSource : Schema locks released.
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 7 - Snapshotting data
.d.r.RelationalSnapshotChangeEventSource : Exporting data from table 'server_ListeriaWGS.dbo.ACTIONS'
.d.r.RelationalSnapshotChangeEventSource : For table 'server_ListeriaWGS.dbo.ACTIONS' using select statement: 'SELECT [ACTIONS].[ACTIONID],[ACTIONS].[TMSTAMP],[ACTIONS].[APUSER],[ACTIONS].[OSUSER],[ACTIONS].[COMMENT],[ACTIONS].[STATUS] FROM [dbo].[ACTIONS]'
.d.r.RelationalSnapshotChangeEventSource : Finished exporting 3 records for table 'server_ListeriaWGS.dbo.ACTIONS'; total duration '00:00:02,886'
.d.r.RelationalSnapshotChangeEventSource : Finished exporting 3 records for table 'server_ListeriaWGS.dbo.ACTIONS'; total duration '00:00:00,23'
.d.p.s.AbstractSnapshotChangeEventSource : Snapshot - Final stage
c.b.n.d.c.a.LogDatabaseChanges : {"ACTIONID":456}
c.b.n.d.c.a.LogDatabaseChanges : {"before":null,"after":{"ACTIONID":456,"TMSTAMP":"789","APUSER":null,"OSUSER":null,"COMMENT":null,"STATUS":null},"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978890036,"snapshot":"true","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"op":"r","ts_ms":1632978890043,"transaction":null}
c.b.n.d.c.a.LogDatabaseChanges : {"ACTIONID":1234}
c.b.n.d.c.a.LogDatabaseChanges : {"before":null,"after":{"ACTIONID":1234,"TMSTAMP":"456","APUSER":null,"OSUSER":null,"COMMENT":null,"STATUS":null},"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978890049,"snapshot":"true","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"op":"r","ts_ms":1632978890049,"transaction":null}
c.b.n.d.c.a.LogDatabaseChanges : {"ACTIONID":5645}
c.b.n.d.c.a.LogDatabaseChanges : {"before":null,"after":{"ACTIONID":5645,"TMSTAMP":"15654","APUSER":null,"OSUSER":null,"COMMENT":null,"STATUS":null},"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978890049,"snapshot":"last","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"op":"r","ts_ms":1632978890049,"transaction":null}
o.s.c.f.common.cdc.CdcAutoConfiguration : [CDC Event]: SourceRecord{sourcePartition={server=foo}, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='foo.dbo.ACTIONS', kafkaPartition=null, key=Struct{ACTIONID=456}, keySchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Key:STRUCT}, value=Struct{after=Struct{ACTIONID=456,TMSTAMP=789},source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978890259,snapshot=true,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},op=r,ts_ms=1632978890260}, valueSchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
o.s.c.f.common.cdc.CdcAutoConfiguration : [CDC Event]: SourceRecord{sourcePartition={server=foo}, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='foo.dbo.ACTIONS', kafkaPartition=null, key=Struct{ACTIONID=1234}, keySchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Key:STRUCT}, value=Struct{after=Struct{ACTIONID=1234,TMSTAMP=456},source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978890260,snapshot=true,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},op=r,ts_ms=1632978890260}, valueSchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
o.s.c.f.common.cdc.CdcAutoConfiguration : [CDC Event]: SourceRecord{sourcePartition={server=foo}, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=true}} ConnectRecord{topic='foo.dbo.ACTIONS', kafkaPartition=null, key=Struct{ACTIONID=5645}, keySchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Key:STRUCT}, value=Struct{after=Struct{ACTIONID=5645,TMSTAMP=15654},source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978890260,snapshot=last,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},op=r,ts_ms=1632978890260}, valueSchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
.d.p.s.AbstractSnapshotChangeEventSource : Snapshot - Final stage
d.c.s.SqlServerSnapshotChangeEventSource : Removing locking timeout
d.c.s.SqlServerSnapshotChangeEventSource : Removing locking timeout
i.d.p.ChangeEventSourceCoordinator : Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=foo, changeLsn=NULL, commitLsn=00000030:00002e0a:0028, eventSerialNo=null, snapshot=FALSE, sourceTime=2021-09-30T05:14:50.049Z], partition={server=foo}, snapshotCompleted=true, eventSerialNo=1]]
.d.p.m.StreamingChangeEventSourceMetrics : Connected metrics set to 'true'
i.d.p.ChangeEventSourceCoordinator : Starting streaming
i.d.p.ChangeEventSourceCoordinator : Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=foo, changeLsn=NULL, commitLsn=00000030:00002e0a:0028, eventSerialNo=null, snapshot=FALSE, sourceTime=2021-09-30T05:14:50.260Z], partition={server=foo}, snapshotCompleted=true, eventSerialNo=1]]
.d.p.m.StreamingChangeEventSourceMetrics : Connected metrics set to 'true'
i.d.p.ChangeEventSourceCoordinator : Starting streaming
.c.s.SqlServerStreamingChangeEventSource : Last position recorded in offsets is 00000030:00002e0a:0028(NULL)[1]
.c.s.SqlServerStreamingChangeEventSource : Last position recorded in offsets is 00000030:00002e0a:0028(NULL)[1]
As you can see, there are c.b.n.d.c.a.LogDatabaseChanges lines in the logs, which means that it works at startup. But once snapshotting has finished, nothing more is received.
What am I missing?
Solution 1:[1]
I ran into a similar issue: the snapshot works, but new updates are not streamed. I narrowed it down to a problem with the filegroup.
Try commenting out @filegroup_name=N'CDC_DATA', and see if it helps.
I think it is a bug in the SQL Server connector, even though the official Debezium tutorial also uses a filegroup: https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#_enabling_cdc_on_a_sql_server_table
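Because CDC is already enabled on the table, removing the filegroup means dropping the existing capture instance and creating it again. A minimal sketch of that, assuming the database, table and capture instance names from the question (my_db, dbo.ACTIONS, dbo_ACTIONS); note that disabling the table drops its change table and any changes captured so far:

```sql
USE my_db;

-- Drop the existing capture instance together with its change table.
EXEC sys.sp_cdc_disable_table
    @source_schema    = N'dbo',
    @source_name      = N'ACTIONS',
    @capture_instance = N'dbo_ACTIONS';

-- Re-enable CDC without @filegroup_name, so the change table
-- is created on the database's default filegroup.
EXEC sys.sp_cdc_enable_table
    @source_schema        = N'dbo',
    @source_name          = N'ACTIONS',
    @role_name            = NULL,
    @supports_net_changes = 0;
```

Restarting the application afterwards is probably needed so that the connector picks up the new capture instance.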
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | user19096954 |