Number of rows affected by BigQuery query

I run daily commands to insert new records into a BigQuery table, and would like to log how many records get inserted each day.

I create a QueryJob object that contains a SELECT query, and a destination table. I set the write_disposition to WRITE_APPEND, so that the new data is appended to the table.
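
Roughly, the setup looks like this (the table names and the query below are placeholders):

    from google.cloud import bigquery

    client = bigquery.Client()

    # Placeholder destination table for the appended records.
    destination = "my-project.my_dataset.daily_records"

    job_config = bigquery.QueryJobConfig(
        destination=destination,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    # The SELECT that produces the day's new records (placeholder query).
    sql = "SELECT id, value FROM `my-project.my_dataset.staging_daily`"

    query_job = client.query(sql, job_config=job_config)
    query_job.result()  # wait for the append to finish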

I've found two options that do something similar, but neither achieve what I'm looking for:

  • query_job.num_dml_affected_rows: This simply returns None, as the query doesn't use DML INSERT, but instead is appending to the destination table.
  • query_job.result().total_rows: This returns the total number of rows in the table, not the number of new rows.

I can think of a variety of ways of achieving the desired result, but not sure what the best approach is:

  • Change the query to a DML INSERT - but this means creating dynamic SQL rather than just passing the destination table as a Python variable (a sketch of this option follows the list).
  • Dump the result into a temporary table, count the rows, then append the data - but this just seems inefficient.
  • Count the records before and after the query and log the delta - this can cause issues with running queries in parallel.
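
For illustration, here is roughly what the DML option would look like - a minimal sketch with a placeholder destination table baked into the SQL, after which num_dml_affected_rows is populated:

    from google.cloud import bigquery

    client = bigquery.Client()

    # With DML, the destination table has to be written into the SQL text itself
    # (placeholder path below) rather than passed as a job_config parameter.
    sql = """
        INSERT INTO `my-project.my_dataset.daily_records` (word, word_count)
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        LIMIT 10
    """

    query_job = client.query(sql)
    query_job.result()  # wait for the DML statement to finish

    # For DML statements this is populated instead of returning None.
    print(f"Inserted {query_job.num_dml_affected_rows} rows")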

Any suggestions on the best approach?


Following up on muscat's answer, I think this wouldn't work when queries are run in parallel:

Get number of rows: 1000 rows
Function to call queries in parallel:

 - Query 1 --> Adds 100 rows --> finishes 3rd --> Counts 1200 rows
 - Query 2 --> Adds 80 rows --> finishes 2nd --> Counts 1100 rows
 - Query 3 --> Adds 20 rows --> finishes 1st --> Counts 1020 rows

Because there's no way to know in which order these queries will complete (they are all called in parallel using the multiprocessing library), I'm not sure how I can know how many rows each query added.


Update 2

Example code:

    ...

    # We compile a list of which datasets need to be loaded from
    brands = self._bq.select(f"Select brand,  gaDataset From {self.BRAND_DATASET}.{self.BRAND_TABLE}")
    brands = list(brands.iterrows())
    _, brands = zip(*brands)

    # Define the function for parallel population
    def populate_fn(brand):
        return self._populate(brand, self.predicates)

    logging.info("Populating daily stats for brands in parallel")
    error = self._parallel_apply(populate_fn, brands)
    if error is not None:
        return error

    def _populate(self, brand, predicates):
        # We can't just call <bq_load_data> because we need to update the predicates for each brand
        predicates.add_predicate('gaDataset', brand['gaDataset'], operator="_")
        query_job = self._load_data(self.table_name, predicates=predicates)
        logging.info(f"Started for {brand['gaDataset']}: {brand['brand']}")

        self._run_query_job(query_job)
        logging.info(f"{brand['gaDataset']}: {brand['brand']} is now populated.")

The _populate function is run in parallel for each of the brands.

predicates is simply an object that handles how to modify the Jinja-templated SQL, using some common parameters from the main object plus some brand-specific parameters.

_load_data is a function that actually loads the Jinja-templated SQL with the appropriate parameters, and then constructs and returns a QueryJob object.
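
_parallel_apply isn't shown here; purely for reference, a simplified sketch of it (not the exact implementation; the real one uses the multiprocessing library) would be:

    from multiprocessing.pool import ThreadPool

    def _parallel_apply(self, fn, items):
        # Simplified sketch: apply `fn` to every item concurrently and
        # return the first error encountered, or None if all succeed.
        try:
            with ThreadPool() as pool:
                pool.map(fn, items)
        except Exception as exc:
            return exc
        return None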



Solution 1:[1]

The efficient and recommended way is to count the records before and after running the query. There is no issue with running queries in parallel, because we can wait for the query job to complete before checking the updated number of rows. I've prepared an example of how to check the number of newly added rows:

    from google.cloud import bigquery

    client = bigquery.Client()

    # Define destination table.
    table_id = "<PROJECT_ID>.<DATASET>.<TABLE>"

    # Inspect the number of rows in the table before running the query.
    table = client.get_table(table_id)
    num_rows_begin = table.num_rows
    print("Number of rows before running the query job: " + str(num_rows_begin))

    sql = """
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        LIMIT 10
    """

    job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition="WRITE_APPEND")

    # Make an API request.
    query_job = client.query(sql, job_config=job_config)

    # Wait for the job to complete.
    query_job.result()

    # Inspect the number of newly added rows in the table after running the query.
    # First way:
    num_rows_end = query_job._query_results.total_rows - num_rows_begin
    print("Loaded {} rows into {}".format(str(num_rows_end), table_id))

    # Second way:
    table = client.get_table(table_id)
    print("Loaded {} rows into {}".format(table.num_rows - num_rows_begin, table_id))

As you can see, there are a few ways of checking the number of newly added rows. The first one uses the query job's results, query_job._query_results.total_rows, which is basically the same as query_job.result().total_rows. The second way fetches the table information from the project again. The important thing here is that we need to call table = client.get_table(table_id) again before checking the number of rows; if we don't, the code will print "Loaded 0 rows into ...", because table.num_rows would still hold the count taken before running the query.
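
If you need this in more than one place, the same before/after pattern can be wrapped into a small helper, for example (append_and_count is just an illustrative name):

    from google.cloud import bigquery

    def append_and_count(client: bigquery.Client, sql: str, table_id: str) -> int:
        """Run `sql`, append its result to `table_id`, and return how many rows were added."""
        num_rows_begin = client.get_table(table_id).num_rows

        job_config = bigquery.QueryJobConfig(
            destination=table_id, write_disposition="WRITE_APPEND")
        client.query(sql, job_config=job_config).result()  # wait for the job to complete

        # Re-fetch the table so num_rows reflects the newly appended rows.
        return client.get_table(table_id).num_rows - num_rows_begin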

I hope you find the above pieces of information useful.

Solution 2:[2]

As an alternative, you can do this all within BigQuery itself; you do not even need to go through the API.

Example: Insert into some table and find how many rows got inserted:

    insert into `YOUR_TABLE_PATH` (col1, col2) select 'test', 'test';

    select @@row_count;

Here is the reference for all system variables: https://cloud.google.com/bigquery/docs/reference/system-variables

You can get the number of rows affected by any DML statement this way.
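
If you want to read that value back from Python, one option (a sketch; I'm assuming here that query_job.result() returns the rows of the script's final SELECT, and the table path is a placeholder) is:

    from google.cloud import bigquery

    client = bigquery.Client()

    # Placeholder table path; the script runs the DML and then selects @@row_count.
    sql = """
        INSERT INTO `my-project.my_dataset.daily_records` (col1, col2)
        SELECT 'test', 'test';

        SELECT @@row_count AS inserted_rows;
    """

    query_job = client.query(sql)
    rows = list(query_job.result())  # assumed: rows of the final SELECT
    print(f"Inserted {rows[0]['inserted_rows']} rows")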

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1
[2] Solution 2: Pratik Patil