> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?


If you are using --save_main_session, then instead of modifying
aggregation_transform.py, try modifying the pipeline file itself.
Alternatively, move any Transforms/DoFns that you define in your pipeline
launcher file into a separate file; then you don't need to pass
--save_main_session at all.
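
For background on why the location of a definition matters, here is a
minimal sketch of how a function in an importable module gets pickled by
name and looked up again on load. It uses the standard-library pickle
rather than Beam's pickler or dill, and the module and function names
(demo_transforms, timestamp_keyed_result) are made up for illustration. It
only shows the name lookup that the traceback ends in
(StockUnpickler.find_class), not the intermittent behaviour itself.

```python
import pickle
import sys
import types

# Hypothetical stand-in for an importable module such as
# aggregation_transform.py; the names here are made up for illustration.
module = types.ModuleType("demo_transforms")
exec(
    "def timestamp_keyed_result(value):\n"
    "    return ('key', value)\n",
    module.__dict__,
)
sys.modules["demo_transforms"] = module

# Pickling stores only the module and attribute names, not the function body.
payload = pickle.dumps(module.timestamp_keyed_result)
stored_by_name = b"timestamp_keyed_result" in payload

# Unpickling looks the attribute up again on the imported module.
restored = pickle.loads(payload)
same_object = restored is module.timestamp_keyed_result

# If the module seen at load time lacks the attribute, the lookup fails
# with an AttributeError, as in the reported stack trace.
del module.timestamp_keyed_result
try:
    pickle.loads(payload)
    lookup_error = None
except AttributeError as err:
    lookup_error = err
```

So the worker has to be able to import the same module, with the same
attributes, that the launcher saw at submission time.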

On Mon, Oct 28, 2019 at 5:05 PM Valentyn Tymofieiev <[email protected]>
wrote:

> Could you confirm whether or not you are using --save_main_session when
> you launch your pipeline?
>
> On Mon, Oct 28, 2019 at 4:59 PM Valentyn Tymofieiev <[email protected]>
> wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>> this issue, although we saw instances of this bug in exactly the opposite
>> scenario: when the pipeline was defined *in one file*, not in multiple
>> files.
>>
>> Could you try replacing instances of super() in aggregation_transform.py
>> as done in https://github.com/apache/beam/pull/9513 and see if this
>> issue is still reproducible?
>>
>> If that doesn't work, I would try to get a dump of serialized_fn and
>> try to reproduce the issue in an isolated environment, such as:
>>
>> from apache_beam.internal import pickler
>> serialized_fn = "..content.."
>> pickler.loads(serialized_fn)
>>
>> Then I would try to trim the DoFn down to a minimal reproducible
>> example. It could be another issue with the dill dependency.
>>
>>
>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <[email protected]>
>> wrote:
>>
>>> Hi All,
>>>
>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>>> AttributeError (see the stack trace below). We have double-checked, and
>>> the attributes/methods are present in the right module and in the right
>>> place, but somehow the pipeline still complains about them. In some
>>> cases, we refer to methods before their definition. We tried reordering
>>> the method definitions, but that didn't help at all.
>>>
>>> We don't see the same issue when the entire pipeline is defined in one
>>> file. Also, note that this doesn't happen every time we submit the
>>> pipeline, so I suspect it is some kind of race condition. When we enable
>>> the worker recycle logic, it happens most of the time when the SDK worker
>>> is recycled.
>>>
>>> Some more information about the environment:
>>> Python version: 3
>>> Beam version: 2.16
>>> Flink version: 1.8
>>>
>>> *Stack trace:*
>>>
>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>> bundle}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>> at
>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>> at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>> at
>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>> ... 7 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>> 6: Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 307, in get
>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 261, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 165, in _execute
>>>     response = task()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 198, in <lambda>
>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 351, in do_instruction
>>>     request.instruction_id)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 371, in process_bundle
>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 313, in get
>>>     self.data_channel_factory)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 576, in __init__
>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 620, in create_execution_tree
>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 619, in <listcomp>
>>>     for transform_id in sorted(
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 606, in get_operation
>>>     transform_id, transform_consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 865, in create_operation
>>>     return creator(self, transform_id, transform_proto, payload,
>>> consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1108, in create
>>>     serialized_fn, parameter)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1146, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 265, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>>> 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>
>>>
