Dear Beam users,

I am using the following code to log debug info about my streaming pipeline:

import datetime

import apache_beam as beam


class DebugWindowInformation(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        pass

    def process(self, data_item, extra='', timest=beam.DoFn.TimestampParam,
                windowparam=beam.DoFn.WindowParam, *args):
        import logging
        # GCP does NOT log debug on ROOT level
        print(type(windowparam))
        print(windowparam)
        logging.info(f'[{datetime.datetime.now()}] [{timest}] window: '
                     f'{windowparam.start}-{windowparam.end} message: {extra} {data_item}')
        # logging.info(f'[{datetime.datetime.now()}] [{timest.to_utc_datetime()}] window: '
        #              f'{windowparam.start.to_utc_datetime()}-{windowparam.end.to_utc_datetime()} '
        #              f'message: {extra} {data_item}')
        yield data_item

Unfortunately, I get the following output and error:
<class 'apache_beam.transforms.core._DoFnParam'>
WindowParam
logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
AttributeError: '_DoFnParam' object has no attribute 'start'

The code is taken from the examples. Does anyone have an idea what might
cause the error?

Any tip is appreciated,
Matyas
