Apache Beam Python SDK: How to access timestamp of an element?
I'm reading messages via `ReadFromPubSub` with `timestamp_attribute=None`, which should set timestamps to the publishing time.
This way, I end up with a `PCollection` of `PubsubMessage` elements.
How can I access the timestamps of these elements in order to, e.g., save them to a database? The only properties I can see are `data` and `attributes`, and `attributes` only contains keys coming from Pub/Sub.
Edit: Sample code
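```python
import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery

# pipeline_options and args are defined elsewhere (e.g. parsed with argparse)
with beam.Pipeline(options=pipeline_options) as p:
    items = (p
             | ReadFromPubSub(topic=args.read_topic, with_attributes=True)
             | beam.WindowInto(beam.window.FixedWindows(args.time_window))
             | 'FormatMessage' >> beam.Map(format_message)
             | 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
                                             args.project, write_disposition='WRITE_APPEND')
             )
```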
where `format_message` would take a `PubsubMessage` and return a dictionary representing a row to append to the table:
```python
import base64


def format_message(message):
    # Build one BigQuery row from a PubsubMessage
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes)
    }
    return formatted_message
```
Solution 1:[1]
It turns out the mapped function can be modified to accept additional arguments, such as the element's timestamp:
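```python
import base64

import apache_beam as beam


def format_message(message, timestamp=beam.DoFn.TimestampParam):
    # Beam replaces the default with the element's timestamp (a Timestamp object)
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes),
        'timestamp': float(timestamp)  # seconds since the Unix epoch
    }
    return formatted_message
```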
Other parameters that can be injected this way (such as `beam.DoFn.WindowParam`) are documented here: https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
Solution 2:[2]
Have you tried setting `with_attributes=True`?
The Beam docs may help. The parameters include:
with_attributes – True: output elements will be `PubsubMessage` objects. Defaults to False: output elements will be of type bytes (message data only).
Solution 3:[3]
It seems there is a (newly released?) `timestamp_attribute` argument when you call `beam.io.gcp.pubsub.ReadFromPubSub()`, but when I tried it on my end, it didn't work as I had expected. I posted a new question on SO if anyone wants to follow up: DataFlow (PY 2.x SDk) ReadFromPubSub :: id_label & timestamp_attribute behaving unexpectedly
Solution 4:[4]
I don't know when this was introduced, but in addition to shadesofdarkred's answer there is another way, which can also be used in a lambda, for example.
The `PubsubMessage` returned by `ReadFromPubSub` has an attribute `publish_time`.
Therefore, given the code in the original question, you can easily access it using `message.publish_time`.
From the docs:
publish_time (datetime): Time at which the message was published. Will be reset to None if the Message is being written to pubsub.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | shadesofdarkred |
Solution 2 | Tina Iris |
Solution 3 | Vibhor Jain |
Solution 4 | dnnshssm |