Dear Beam users,
I am using the following code to log debug info about my streaming pipeline:
import apache_beam as beam


class DebugWindowInformation(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        pass

    def process(self, data_item, extra='', timest=beam.DoFn.TimestampParam,
                windowparam=beam.DoFn.WindowParam, *args):
        import datetime
        import logging
        # GCP does NOT log DEBUG on the ROOT logger, so log at INFO instead
        print(type(windowparam))
        print(windowparam)
        logging.info(f'[{datetime.datetime.now()}] [{timest}] window: '
                     f'{windowparam.start}-{windowparam.end} message: {extra} {data_item}')
        # logging.info(f'[{datetime.datetime.now()}] [{timest.to_utc_datetime()}] window: '
        #              f'{windowparam.start.to_utc_datetime()}-{windowparam.end.to_utc_datetime()} '
        #              f'message: {extra} {data_item}')
        yield data_item
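Unfortunately, the print() calls show that windowparam is still the WindowParam placeholder rather than the actual window, and I get the following error: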
<class 'apache_beam.transforms.core._DoFnParam'>
WindowParam

    logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
AttributeError: '_DoFnParam' object has no attribute 'start'
The code is taken from the examples. Does anyone have an idea what might cause this error?
Any tips are appreciated,
Matyas