PyFlink JDBC sink

I am trying to use PyFlink's JdbcSink to write to an Oracle ADB instance. Flink's official documentation has JdbcSink examples in Java, but nothing equivalent for the Python API. I tried to replicate the Java usage in Python, but the method signature of PyFlink's JdbcSink differs from the Java one, and I cannot find any example or documentation on how to use it. I am new to PyFlink. Here is what I have tried so far:

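from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import (FlinkKafkaConsumer, JdbcSink,
                                           JdbcConnectionOptions, JdbcExecutionOptions)

# bootstrapServers, consumerGroupName, value and the jdbc_* variables are defined elsewhere.
env = StreamExecutionEnvironment.get_execution_environment()

# Read each Kafka record as a plain string.
kafka_consumer = FlinkKafkaConsumer(
    topics='TestStream',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': bootstrapServers,
                'group.id': consumerGroupName,
                'enable.auto.commit': 'false',
                'session.timeout.ms': "6000",
                'security.protocol': 'SASL_SSL',
                'sasl.mechanism': 'PLAIN',
                'auto.offset.reset': 'earliest',
                'sasl.jaas.config': value
                })
ds = env.add_source(kafka_consumer)

# Write each record into the log_data table through the JDBC sink.
connection_options = (JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                      .with_url(jdbc_url)
                      .with_driver_name(jdbc_driver)
                      .with_user_name(jdbc_user)
                      .with_password(jdbc_password)
                      .build())
execution_options = (JdbcExecutionOptions.builder()
                     .with_batch_size(10)
                     .with_batch_interval_ms(200)
                     .with_max_retries(5)
                     .build())
ds.add_sink(JdbcSink.sink("insert into log_data (log_line) values (?)",
                          Types.ROW([Types.STRING()]),
                          connection_options,
                          execution_options))

env.execute()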
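
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: JdbcSink.sink is given a Row type (Types.ROW), while SimpleStringSchema produces plain strings, so the stream elements probably need to be converted to Row objects before they reach the sink. A minimal sketch of that conversion follows. The helper names line_to_fields and add_jdbc_sink are hypothetical, not part of PyFlink, and the PyFlink imports are done inside the function only so that the small helper can be tried without a Flink installation.

```python
def line_to_fields(line):
    """Return one value per '?' placeholder of the INSERT statement.

    Hypothetical helper: the statement in the question has a single
    placeholder, so the whole Kafka record becomes the only field.
    """
    return (line,)


def add_jdbc_sink(ds, sql, connection_options, execution_options=None):
    """Map a DataStream of strings to Rows and attach a JdbcSink to it.

    Hypothetical helper. Assumes `ds` yields str elements and that `sql`
    has exactly one '?' placeholder for a string column.
    """
    # Imported lazily so line_to_fields stays usable without Flink installed.
    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.connectors import JdbcSink

    # The same row type is used for the map output and for the sink.
    row_type = Types.ROW([Types.STRING()])
    rows = ds.map(lambda line: Row(*line_to_fields(line)), output_type=row_type)
    return rows.add_sink(
        JdbcSink.sink(sql, row_type, connection_options, execution_options))
```

With this in place, add_jdbc_sink(ds, "insert into log_data (log_line) values (?)", ...) would replace the direct ds.add_sink(...) call, passing the same connection and execution options built with the builders above. The JDBC connector jar and the Oracle driver jar also have to be available to the job, for example through env.add_jars(...), with paths that depend on the local setup.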


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
