Apache Beam Python SDK: How to access timestamp of an element?

I'm reading messages via ReadFromPubSub with timestamp_attribute=None, which should set timestamps to the publishing time.

This way, I end up with a PCollection of PubsubMessage elements.

How can I access the timestamps of these elements in order to, e.g., save them to a database? The only properties I can see are data and attributes, and attributes only contains the keys that came from Pub/Sub.

Edit: Sample code

with beam.Pipeline(options=pipeline_options) as p:
    items = (p
        | ReadFromPubSub(topic=args.read_topic, with_attributes=True)
        | beam.WindowInto(beam.window.FixedWindows(args.time_window))
        | 'FormatMessage' >> beam.Map(format_message)
        | 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
            args.project, write_disposition='WRITE_APPEND')
    )

where format_message would take a PubsubMessage and return a dictionary representing a row to append to the table:

def format_message(message):
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes)
    }
    return formatted_message


Solution 1:[1]

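It turns out the function passed to beam.Map can declare additional parameters. Giving a parameter the default value beam.DoFn.TimestampParam makes Beam pass in the element's timestamp: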

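def format_message(message, timestamp=beam.DoFn.TimestampParam):
    # Beam fills in `timestamp` with the element's timestamp at runtime.
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes),
        # float() converts the timestamp to seconds since the Unix epoch
        'timestamp': float(timestamp)
    }
    return formatted_message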

More possible parameters: https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
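If a human-readable value is preferred over the raw float, the seconds-since-epoch number from float(timestamp) can be converted with the standard library. This is only a minimal sketch: the helper name epoch_seconds_to_iso is made up for illustration and is not part of Beam, and the hard-coded value stands in for float(timestamp).

```python
from datetime import datetime, timezone


def epoch_seconds_to_iso(seconds):
    """Convert seconds since the Unix epoch to an ISO 8601 string in UTC.

    Hypothetical helper (not a Beam API); `seconds` is assumed to be the
    result of float(timestamp) inside format_message.
    """
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


# Hard-coded value used here in place of float(timestamp)
print(epoch_seconds_to_iso(1577836800.0))
```

Inside format_message, the 'timestamp' entry could then be written as epoch_seconds_to_iso(float(timestamp)) if a string column suits the destination table better.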

Solution 2:[2]

Have you tried setting with_attributes=True?

The Beam docs may be helpful. The parameters include:

with_attributes – True - output elements will be PubsubMessage objects. Default to False - output elements will be of type bytes (message data only).

Solution 3:[3]

It seems there is a (newly released?) timestamp_attribute argument when you call beam.io.gcp.pubsub.ReadFromPubSub(),

but when I tried it on my end, it didn't work as I had expected. I posted a new question on SO in case anyone wants to follow up: "DataFlow (PY 2.x SDk) ReadFromPubSub :: id_label & timestamp_attribute behaving unexpectedly"

Solution 4:[4]

I don't know when this was introduced, but in addition to shadesofdarkred's answer there is another way (which can, for example, be used in a lambda as well).

The PubsubMessage returned by ReadFromPubSub has an attribute publish_time.

Therefore, considering the code in the original question, you could easily access it using message.publish_time.

From the docs:

publish_time

(datetime) Time at which the message was published. Will be reset to None if the Message is being written to pubsub.
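As a rough sketch of how publish_time could be folded into the row from the question, assuming it is a datetime (or None) as the quoted docs state: the SimpleNamespace object below is only a stand-in for a PubsubMessage so the snippet runs without Pub/Sub, and the 'publish_time' key is a made-up column name.

```python
import base64
from datetime import datetime, timezone
from types import SimpleNamespace


def format_message(message):
    """Build a row dict that also carries the message's publish time."""
    publish_time = message.publish_time
    return {
        # Decode to str so the value is plain text rather than bytes
        'data': base64.b64encode(message.data).decode('ascii'),
        'attributes': str(message.attributes),
        # publish_time may be None, so guard before formatting it
        'publish_time': publish_time.isoformat() if publish_time else None,
    }


# Stand-in for a PubsubMessage (assumption: same attribute names)
fake_message = SimpleNamespace(
    data=b'hello',
    attributes={'source': 'test'},
    publish_time=datetime(2020, 1, 1, tzinfo=timezone.utc),
)
print(format_message(fake_message))
```

In the pipeline from the question, this function would be passed to beam.Map in the 'FormatMessage' step in the same way as before.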

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 shadesofdarkred
Solution 2 Tina Iris
Solution 3 Vibhor Jain
Solution 4 dnnshssm