> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
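Here is a minimal sketch of the kind of change being suggested. It assumes the DoFns in aggregation_transform.py call zero-argument super() in their constructors. So that it runs without Beam, DoFnBase is a hypothetical stand-in for beam.DoFn, and the class names are illustrative. In a real pipeline the explicit call would be beam.DoFn.__init__(self), and the linked PR may differ in detail. The snippet only shows the mechanical difference: zero-argument super() gives the method an implicit __class__ closure cell, and naming the base class explicitly does not.

```python
class DoFnBase:
    """Hypothetical stand-in for beam.DoFn so this sketch runs without Beam."""

    def __init__(self):
        self.initialized = True


class ZeroArgSuperFn(DoFnBase):
    def __init__(self, window_size):
        # Zero-argument super() makes the compiler give __init__ an implicit
        # __class__ closure cell that refers back to ZeroArgSuperFn.
        super().__init__()
        self.window_size = window_size


class ExplicitBaseFn(DoFnBase):
    def __init__(self, window_size):
        # Workaround pattern: call the base class explicitly. With Beam this
        # would be beam.DoFn.__init__(self).
        DoFnBase.__init__(self)
        self.window_size = window_size


def free_vars_of_init(cls):
    """Names of the free variables (closure cells) used by cls.__init__."""
    return cls.__init__.__code__.co_freevars


print(free_vars_of_init(ZeroArgSuperFn))  # ('__class__',)
print(free_vars_of_init(ExplicitBaseFn))  # ()
```

Both classes build the same objects. The suggested experiment would show whether removing the implicit cell also makes the AttributeError go away.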
If you are using --save_main_session, try modifying the pipeline file
itself instead of aggregation_transform.py. Alternatively, move any
Transforms/DoFns defined in your pipeline launcher file into a separate
module; then you no longer need to pass --save_main_session.

On Mon, Oct 28, 2019 at 5:05 PM Valentyn Tymofieiev wrote:

> Could you confirm whether or not you are using --save_main_session when
> you launch your pipeline?
>
> On Mon, Oct 28, 2019 at 4:59 PM Valentyn Tymofieiev wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>> this issue, although we saw instances of this bug in exactly the
>> opposite scenario: when the pipeline was defined *in one file*, not in
>> multiple files.
>>
>> Could you try replacing instances of super() in aggregation_transform.py
>> as done in https://github.com/apache/beam/pull/9513 and see if this
>> issue is still reproducible?
>>
>> If that doesn't work, I would try to get a dump of serialized_fn and
>> try to reproduce the issue in an isolated environment, such as:
>>
>>     from apache_beam.internal import pickler
>>     serialized_fn = "..content.."
>>     pickler.loads(serialized_fn)
>>
>> Then I would try to trim the DoFn in the example down to a minimal
>> reproducible example. It could be another issue with the dill
>> dependency.
>>
>>
>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar wrote:
>>
>>> Hi All,
>>>
>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>> run into on Python 2. Sometimes when we submit the pipeline, we get an
>>> AttributeError (see the stack trace below). We have double-checked that
>>> the attributes/methods are present in the right module and in the right
>>> place, but the pipeline still complains about them. In some cases, we
>>> reference methods before they are defined. We tried reordering the
>>> method definitions, but that didn't help at all.
>>>
>>> We don't see the same issue when the entire pipeline is defined in one
>>> file. Also, note that this doesn't happen every time we submit the
>>> pipeline, so I suspect some kind of race condition. When we enable the
>>> worker recycle logic, it happens most of the time when the SDK worker
>>> is recycled.
>>>
>>> Some more information about the environment:
>>> Python version: 3
>>> Beam version: 2.16
>>> Flink version: 1.8
>>>
>>> *Stack trace:*
>>>
>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>> bundle}
>>>     at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>     at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>     at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>     at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>     at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>     at
>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>     at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>     at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>     at
>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>     at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>     at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>     at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>     at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>     ... 7 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>> 6: Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 307, in get
>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 261, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 165, in _execute
>>>     response = task()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 198, in <lambda>
>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 351, in do_instruction
>>>     request.instruction_id)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 371, in process_bundle
>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 313, in get
>>>     self.data_channel_factory)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 576, in __init__
>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 620, in create_execution_tree
>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 619, in <listcomp>
>>>     for transform_id in sorted(
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 606, in get_operation
>>>     transform_id, transform_consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 865, in create_operation
>>>     return creator(self, transform_id, transform_proto, payload,
>>> consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1108, in create
>>>     serialized_fn, parameter)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1146, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 265, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>>> 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
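This stdlib-only sketch supports the isolated-environment experiment suggested above. Before anything is unpickled, it lists the module-level names a serialized_fn payload will look up and checks which of them this interpreter cannot resolve. It makes two assumptions. First, the payload uses the encoding we understand Beam 2.x's pickler to apply (base64 of zlib-compressed dill output). Second, the payload uses pickle protocol 3 or lower, which we believe is dill's default on Python 3.6. Protocol 4+ payloads use STACK_GLOBAL opcodes, which this sketch does not decode. The demo module name and payload are hypothetical. Plain pickle stores a function from an importable module by reference (module plus attribute name), and as far as we know dill does the same for functions outside __main__. Deleting the attribute therefore reproduces the same shape of AttributeError as in the trace. This does not show that a missing attribute is the root cause here.

```python
import base64
import importlib
import pickle
import pickletools
import sys
import types
import zlib


def decode_beam_payload(encoded: bytes) -> bytes:
    """Undo the assumed base64(zlib(pickle bytes)) wrapping of serialized_fn."""
    return zlib.decompress(base64.b64decode(encoded))


def global_references(raw: bytes):
    """List (module, name) pairs named by GLOBAL opcodes (pickle protocols 0-3).

    pickletools only disassembles the stream; nothing is imported or executed.
    Protocol 4+ payloads use STACK_GLOBAL instead, which this sketch ignores.
    """
    refs = []
    for opcode, arg, _pos in pickletools.genops(raw):
        if opcode.name == "GLOBAL":
            module_name, name = arg.split(" ", 1)
            refs.append((module_name, name))
    return refs


def unresolved_references(refs):
    """Return the references this interpreter cannot resolve.

    Note: import_module runs module-level code for modules not yet imported.
    """
    unresolved = []
    for module_name, name in refs:
        try:
            obj = importlib.import_module(module_name)
            for part in name.split("."):
                obj = getattr(obj, part)
        except (ImportError, AttributeError) as exc:
            unresolved.append((module_name, name, type(exc).__name__))
    return unresolved


# --- Self-contained demo with a hypothetical module name ---
demo = types.ModuleType("demo_aggregation_transform")
sys.modules[demo.__name__] = demo
exec("def _timestamp_keyed_result(x):\n    return x\n", demo.__dict__)

# Functions are pickled by reference: only module + attribute name are stored.
raw = pickle.dumps([demo._timestamp_keyed_result, len], protocol=3)
encoded = base64.b64encode(zlib.compress(raw, 9))

# Simulate a worker whose copy of the module lacks the helper.
del demo._timestamp_keyed_result

refs = global_references(decode_beam_payload(encoded))
unresolved = unresolved_references(refs)
print(refs)
# [('demo_aggregation_transform', '_timestamp_keyed_result'), ('builtins', 'len')]
print(unresolved)
# [('demo_aggregation_transform', '_timestamp_keyed_result', 'AttributeError')]

load_error = None
try:
    pickle.loads(decode_beam_payload(encoded))
except AttributeError as exc:
    load_error = exc
print(type(load_error).__name__, load_error)
```

With a real dump, you would pass the serialized_fn bytes to decode_beam_payload. Run the checks in the same virtualenv and working directory as the SDK worker so that the same copy of aggregation_transform.py is imported.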
