Apache Beam pipeline with Write to jdbc
I am trying to create a pipeline that reads data from Pub/Sub and writes it into a Postgres database.
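```python
import argparse
from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# --input_topic is passed on the command line; everything else goes to Beam.
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic", default="projects/us-vpc/topics/pipeline")
known_args, pipeline_args = parser.parse_known_args()

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True

# Schema'd row type; RowCoder lets the Java JdbcIO transform decode it.
Device = NamedTuple(
    "Device",
    [
        ("id", str),
        ("userId", str),
        ("patientId", str),
    ])
coders.registry.register_coder(Device, coders.RowCoder)

db_driver_name = "org.postgresql.Driver"  # PostgreSQL JDBC driver class

p = beam.Pipeline(options=pipeline_options)
(p
 | 'ReadFromPubSub' >> beam.io.gcp.pubsub.ReadFromPubSub(
     topic=known_args.input_topic).with_output_types(bytes)
 | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
 # Test mapping: every message becomes the same hard-coded Device row.
 | beam.Map(lambda x: Device(id="e4f63782-66f5-4f49-911f-0b00efa5b23e",
                             userId="Random", patientId=str('12345')))
   .with_output_types(Device)
 | beam.WindowInto(beam.window.FixedWindows(1))
 | 'Write to jdbc' >> WriteToJdbc(
     table_name="patient_device",
     driver_class_name=db_driver_name,
     jdbc_url="jdbc:postgresql://localhost:5432/db_name",
     username="dev-user",
     password="db-password",
 )
)
result = p.run()
result.wait_until_finish()
```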
After deploying to GCP, I can see four steps created in the Dataflow job:
- ReadFromPubSub
- Decode
- Map
- WindowInto
However, the 'Write to jdbc' step is not created in the Dataflow job.
Here is the command I use to launch the Dataflow job:
```
python pipeline.py --runner DataflowRunner --project us-con-project-location --temp_location gs://staging/temp --staging_location gs://staging/temp --region us-east1 --input_topic "projects/us-vpc/topics/pipeline" --subnetwork regions/us-east1/subnetworks/-public-01
```
Any help would be appreciated!
Example followed: https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.io.jdbc.html
Solution 1:[1]
Note that cross-language pipelines on Dataflow require Runner v2. I'm not sure that JdbcIO works well for streaming though; you could try debugging this with a batch pipeline first (replacing your PubSub read with a simple Create).
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | robertwb |