Accessing schema.name from a Debezium sink connector to Postgres

We have Debezium source connectors working perfectly fine, and one of the properties set on them is, for example:

"transforms.SetSchemaMetadata.schema.name": "myschemaname1"

and

"transforms.SetSchemaMetadata.schema.name": "myschemaname2"

The data is making it to Kafka just fine, with the value above in it as schema.name. The issue arises when we try to create a JDBC sink connector to Postgres: we want to route the data not to the "public" schema but dynamically to the schema we set above (e.g. myschemaname1 or myschemaname2), depending on what each payload/record carries.

The question is: how can I access schema.name?

Here are the sink connector properties:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
transforms.RemoveString.regex=emr.(.*)
table.name.format=${'schema.name'}."${topic}"
tasks.max=3
transforms=RemoveString
topics.regex=emr.users
key.converter.schemas.enable=true
delete.enabled=true
auto.evolve=true
transforms.RemoveString.type=org.apache.kafka.connect.transforms.RegexRouter
value.converter.schemas.enable=true
auto.create=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=jdbc:postgresql://<our db dns>:5432/<our db name>?user=<db user>&password=<db password>
transforms.RemoveString.replacement=$1
insert.mode=upsert
key.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_key
pk.fields=id
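
To make the topic rewriting in these properties concrete, here is a minimal Python sketch of what the RemoveString transform does to a topic name. It assumes, as far as we understand RegexRouter, that the regex has to match the whole topic name and that the topic is left unchanged otherwise. route_topic is a hypothetical helper written only for illustration; it is not part of Kafka Connect.

```python
import re


def route_topic(topic, regex=r"emr.(.*)", replacement=r"\1"):
    """Hypothetical model of the RemoveString transform configured above.

    Assumption: the regex must match the entire topic name, otherwise the
    topic is returned unchanged. The "$1" from the connector properties is
    written as a backslash group reference in Python.
    """
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # Substitute the captured group into the replacement template.
    return match.expand(replacement)


print(route_topic("emr.users"))    # topic matching the configured regex
print(route_topic("other.users"))  # topic that does not match
```

Under these assumptions, records from emr.users arrive at the sink with the topic users, which is the value that ${topic} takes in table.name.format.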

In the above properties, this is what we initially tried, but it is obviously not correct:

table.name.format=${'schema.name'}."${topic}"

The error with the above is:

org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "${'schema.name'}.users"

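As the error message shows, the ${'schema.name'} placeholder is not substituted and reaches Postgres literally. The failure is then caused by the dot inside schema.name plus the dot between it and the topic variable, which make Postgres treat the name as a cross-database reference (two dots).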
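
The two-dot reading can be illustrated with a small Python sketch. It is a simplified model, not the connector's or Postgres's actual parser: it assumes that only the ${topic} placeholder is substituted in table.name.format and that a dotted name is split on every dot outside double quotes. resolve_table_name and split_identifier are hypothetical helpers.

```python
def resolve_table_name(fmt, topic):
    # Assumption: only the ${topic} placeholder is replaced; anything else
    # in the format string is passed through literally.
    return fmt.replace("${topic}", topic)


def split_identifier(name):
    """Split a dotted SQL name on dots that sit outside double quotes."""
    parts, current, in_quotes = [], [], False
    for ch in name:
        if ch == '"':
            in_quotes = not in_quotes
            current.append(ch)
        elif ch == "." and not in_quotes:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    parts.append("".join(current))
    return parts


name = resolve_table_name("${'schema.name'}.\"${topic}\"", "users")
parts = split_identifier(name)
print(name)
print(parts)
print(len(parts))  # three parts read as database.schema.table
```

With a plain schema name in place of the placeholder, for example myschemaname1."users", the same split yields only two parts (schema and table), which is the form we are trying to produce per record.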



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow