PyFlink JDBC sink
I am trying to use PyFlink's JdbcSink to connect to an Oracle ADB instance. Flink's official documentation has JdbcSink examples in Java, but nothing equivalent for the Python API. I tried to replicate the Java JdbcSink usage in Python, but PyFlink's JdbcSink has a different method signature from Java's, and I cannot find any example or documentation on using PyFlink's JdbcSink. I am new to PyFlink. Here is what I have been trying to do:
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import (FlinkKafkaConsumer, JdbcSink,
                                           JdbcConnectionOptions, JdbcExecutionOptions)

env = StreamExecutionEnvironment.get_execution_environment()

# bootstrapServers, consumerGroupName, value, and the jdbc_* variables are defined elsewhere
kafka_consumer = FlinkKafkaConsumer(
    topics='TestStream',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': bootstrapServers,
                'group.id': consumerGroupName,
                'enable.auto.commit': 'false',
                'session.timeout.ms': '6000',
                'security.protocol': 'SASL_SSL',
                'sasl.mechanism': 'PLAIN',
                'auto.offset.reset': 'earliest',
                'sasl.jaas.config': value})

ds = env.add_source(kafka_consumer)

ds.add_sink(JdbcSink.sink(
    "insert into log_data (log_line) values (?)",
    Types.ROW([Types.STRING()]),
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url(jdbc_url).with_user_name(jdbc_user)
        .with_password(jdbc_password).with_driver_name(jdbc_driver).build(),
    JdbcExecutionOptions.builder()
        .with_batch_size(10).with_batch_interval_ms(200).with_max_retries(5).build()))

env.execute()
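For reference, below is a minimal end-to-end sketch of the same pipeline, not verified against an actual Oracle ADB instance. It assumes the flink-connector-jdbc and Oracle JDBC driver JARs are registered through env.add_jars (the JAR paths, Kafka settings, and connection values are placeholders), and it maps each Kafka string into a one-field Row so the stream's element type matches the Types.ROW([Types.STRING()]) type info passed to JdbcSink.sink:

from pyflink.common import Row
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import (FlinkKafkaConsumer, JdbcSink,
                                           JdbcConnectionOptions, JdbcExecutionOptions)

env = StreamExecutionEnvironment.get_execution_environment()

# Placeholder JAR paths: the JDBC connector and the Oracle driver
# have to be on the job classpath for JdbcSink to load the driver.
env.add_jars("file:///path/to/flink-connector-jdbc.jar",
             "file:///path/to/ojdbc8.jar")

kafka_consumer = FlinkKafkaConsumer(
    topics='TestStream',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092',  # placeholder
                'group.id': 'log-consumer'})            # placeholder

row_type = Types.ROW([Types.STRING()])

# JdbcSink writes Rows, so wrap each raw Kafka string into a one-field
# Row whose type matches the type info handed to JdbcSink.sink().
rows = env.add_source(kafka_consumer).map(lambda line: Row(line),
                                          output_type=row_type)

rows.add_sink(JdbcSink.sink(
    "insert into log_data (log_line) values (?)",
    row_type,
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url('jdbc:oracle:thin:@//host:1521/service_name')  # placeholder
        .with_driver_name('oracle.jdbc.OracleDriver')
        .with_user_name('user')        # placeholder
        .with_password('password')     # placeholder
        .build(),
    JdbcExecutionOptions.builder()
        .with_batch_size(10)
        .with_batch_interval_ms(200)
        .with_max_retries(5)
        .build()))

env.execute('kafka-to-oracle-adb')

Here env.add_jars stands in for the Maven dependencies the Java examples rely on; the exact JAR names depend on the Flink and Oracle driver versions in use.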
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow