It seems that your DoFn expects a side input. Could you verify that you are actually passing one to your DoFn, e.g. `beam.ParDo(DebugWindowInformation(), 'extra_info')`? I suspect that the missing side input has messed up the argument translation.

On Tue, Feb 23, 2021 at 5:29 PM Ahmet Altay <[email protected]> wrote:

> /cc +Yichi Zhang <[email protected]>
>
> On Fri, Feb 19, 2021 at 2:24 AM Manninger, Matyas <[email protected]> wrote:
>
>> Dear Beam users,
>>
>> I am using the following code to log debug info about my streaming
>> pipeline:
>>
>> class DebugWindowInformation(beam.DoFn):
>>     def to_runner_api_parameter(self, unused_context):
>>         pass
>>
>>     def process(self, data_item, extra='',
>>                 timest=beam.DoFn.TimestampParam,
>>                 windowparam=beam.DoFn.WindowParam, *args):
>>         import logging
>>         # GCP does NOT log debug on ROOT level
>>         print(type(windowparam))
>>         print(windowparam)
>>         logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
>>         #logging.info(f'[{datetime.datetime.now()}] [{timest.to_utc_datetime()}] window: {windowparam.start.to_utc_datetime()}-{windowparam.end.to_utc_datetime()} message: {extra} {data_item}')
>>         yield data_item
>>
>> Unfortunately I get the following error:
>>
>> <class 'apache_beam.transforms.core._DoFnParam'>
>> WindowParam
>> logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
>> AttributeError: '_DoFnParam' object has no attribute 'start'
>>
>> The code is taken from examples. Anyone has and idea what might cause the
>> error?
>>
>> Any tip is appreciated,
>> Matyas
