Apache Beam pipeline with WriteToJdbc

I am trying to create a pipeline which reads some data from Pub/Sub and writes it into a Postgres database.

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from typing import NamedTuple

pipeline_options = PipelineOptions(pipeline_args)  # pipeline_args parsed from the command line (not shown)
pipeline_options.view_as(StandardOptions).streaming = True

# Schema'd NamedTuple registered with RowCoder so rows can be handed to the
# cross-language JdbcIO transform.
Device = NamedTuple(
     "Device",
     [
         ("id", str),
         ("userId", str),
         ("patientId", str)
     ])
coders.registry.register_coder(Device, coders.RowCoder)

db_driver_name = "org.postgresql.Driver"

p = beam.Pipeline(options=pipeline_options)

(p
   | 'ReadFromPubSub' >> beam.io.gcp.pubsub.ReadFromPubSub(
         topic="projects/us-vpc/topics/pipeline").with_output_types(bytes)
   | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
   | beam.Map(lambda x: Device(
         id="e4f63782-66f5-4f49-911f-0b00efa5b23e",
         userId="Random",
         patientId="12345")).with_output_types(Device)
   | beam.WindowInto(beam.window.FixedWindows(1))
   | 'Write to jdbc' >> WriteToJdbc(
       table_name="patient_device",
       driver_class_name=db_driver_name,
       jdbc_url="jdbc:postgresql://localhost:5432/db_name",
       username="dev-user",
       password="db-password"
       )
)
result = p.run()
result.wait_until_finish()

I can see four steps being created in the Dataflow job after it is deployed on GCP:

  1. ReadFromPubSub
  2. Decode
  3. Map
  4. WindowInto

But the issue is that the 'Write to jdbc' step is not created in the Dataflow job.

Here is the command used to launch the Dataflow job:

python pipeline.py --runner DataflowRunner --project us-con-project-location --temp_location gs://staging/temp --staging_location gs://staging/temp --region us-east1 --input_topic "projects/us-vpc/topics/pipeline" --subnetwork regions/us-east1/subnetworks/-public-01

Any help would be appreciated!

Example followed: https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.io.jdbc.html




Solution 1:[1]

Note that cross-language pipelines on Dataflow require Runner v2. I'm not sure that JdbcIO works well for streaming though; you could try debugging this with a batch pipeline first (replacing your PubSub read with a simple Create).
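Along those lines, here is a minimal batch sketch (not the asker's exact code) for isolating the JdbcIO write: it swaps ReadFromPubSub for a simple beam.Create and enables Runner v2 via the Dataflow experiment flag. The connection values (driver class, URL, table, credentials) are placeholders copied from the question, not verified settings.

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple

Device = NamedTuple("Device", [("id", str), ("userId", str), ("patientId", str)])
coders.registry.register_coder(Device, coders.RowCoder)

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--experiments=use_runner_v2",  # cross-language transforms need Runner v2 on Dataflow
    # plus project, region, temp_location, etc. as in the original command
])

with beam.Pipeline(options=options) as p:
    (p
     | 'Create' >> beam.Create([
           Device(id="e4f63782-66f5-4f49-911f-0b00efa5b23e",
                  userId="Random", patientId="12345"),
       ]).with_output_types(Device)
     | 'Write to jdbc' >> WriteToJdbc(
           table_name="patient_device",
           driver_class_name="org.postgresql.Driver",
           jdbc_url="jdbc:postgresql://localhost:5432/db_name",
           username="dev-user",
           password="db-password",
       ))

If the 'Write to jdbc' step shows up in the batch job graph but still not in the streaming one, that points at the streaming/windowing side rather than the cross-language setup; running with the DirectRunner first can also rule out Dataflow-specific issues.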

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: robertwb