Issues streaming data from Pub/Sub into BigQuery using Dataflow and Apache Beam (Python)

I'm currently having trouble getting my Beam pipeline on Dataflow to write data from Pub/Sub into BigQuery. I've stepped through the various stages and the data itself appears to be transformed as expected. The problem comes from the step using beam.io.gcp.bigquery.WriteToBigQuery. Checking the Stackdriver logs shows:

There were errors inserting to BigQuery: [<InsertErrorsValueListEntry errors: [<ErrorProto debugInfo: '' location: 'root.array[0].thing' message: 'no such field.' reason: 'invalid'>] index: 0>,

I've truncated the log above, but the rest of it is more of the same: missing fields. The error is accurate in that those fields do not exist in the schema, but I pass additional_bq_parameters as:

{
  'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
  'ignoreUnknownValues': True,
  'maxBadRecords': 1000,
}

These additional arguments seem to get ignored whether I use a callable that returns the above dictionary or set additional_bq_parameters to the dictionary itself.
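
For clarity, this is a stripped-down sketch of the two forms I mean; the table name and schema below are placeholders rather than my real values:

    import apache_beam as beam

    # Placeholder destination and schema, for illustration only.
    TABLE = 'my_dataset.my_table'
    SCHEMA = 'id:STRING,value:FLOAT'

    ADDITIONAL_PARAMS = {
        'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
        'ignoreUnknownValues': True,
        'maxBadRecords': 1000,
    }

    # Form 1: pass the dictionary itself.
    write_with_dict = beam.io.gcp.bigquery.WriteToBigQuery(
        table=TABLE,
        schema=SCHEMA,
        additional_bq_parameters=ADDITIONAL_PARAMS,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS')

    # Form 2: pass a callable that returns the dictionary.
    write_with_callable = beam.io.gcp.bigquery.WriteToBigQuery(
        table=TABLE,
        schema=SCHEMA,
        additional_bq_parameters=lambda dest: ADDITIONAL_PARAMS,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS')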

The ParDo function's process looks like the following:

    def process(self, tuple):
        import apache_beam as beam

        def get_additional_bq_parameters(_element):
            return {
                'schemaUpdateOptions': ["ALLOW_FIELD_ADDITION"],
                'ignoreUnknownValues': True,
                'maxBadRecords': 1000,
            }

        key, data = tuple
        table_name = f"table.{key}"
        (data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_name,
            schema=self.schemas[key], # list of schemas passed to init
            additional_bq_parameters=get_additional_bq_parameters,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            method='STREAMING_INSERTS',
            insert_retry_strategy="RETRY_NEVER")
        )

When running the pipeline on Dataflow I pass the following arguments:

python3 script.py \
  --project=<project> \
  --job_name=<name> \
  --streaming \
  --runner=DataflowRunner \
  --temp_location=<temp> \
  --experiments use_beam_bq_sink \
  --staging_location=<location> \
  --num_workers=2

If anyone can explain why the additional BigQuery arguments are seemingly not being recognized, I'd greatly appreciate it.

Furthermore, I've tried capturing the result of the write to BigQuery in an attempt to persist the failed rows to GCS. However, when I try to access the failed rows from the resulting PCollection with either results[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS] or results['FailedRows'], I get TypeError: 'PCollection' object is not subscriptable. Based on what I've seen, I thought this was the proper approach. If anyone could clarify this as well, I would be really grateful.



Solution 1:[1]

It seems that there is an issue with the additional_bq_parameters option: not all options are supported yet.

I replicated your case and couldn't execute the pipeline with those parameters; however, it does work if I use only some of them.

On the other hand, to get the failed rows, you can follow the example given in this post:

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
)

Access the FAILED_ROWS side output:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

Solution 2:[2]

Ignoring fields that are not available in the target table's schema is now possible, starting from the 2.38.0 release. You need to pass the ignore_unknown_columns=True argument to WriteToBigQuery.

Expanding your original example:

    (data | 'Upload' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=table_name,
        schema=self.schemas[key],
        ignore_unknown_columns=True,  # pass this parameter to ignore unknown fields
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS',
        insert_retry_strategy="RETRY_NEVER")
    )
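
Since ignore_unknown_columns only exists from 2.38.0 onwards, it is worth double-checking which SDK version your pipeline (and the Dataflow workers) actually use, for example:

    import apache_beam as beam

    # ignore_unknown_columns requires apache-beam >= 2.38.0
    print(beam.__version__)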

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1
[2] Solution 2