Could you confirm whether or not you are using --save_main_session when you
launch your pipeline?
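
Here is why I ask. Pickle normally records a module-level function by reference: its module name plus its attribute name, not its code. Dill, which Beam's pickler calls in your traceback, does the same. The worker then has to import that module and find the attribute there.

Below is a minimal stdlib sketch of this. Stdlib pickle stands in for dill. The module name and helper are hypothetical stand-ins for aggregation_transform.py:

```python
import pickle
import sys
import types

# Hypothetical stand-in for pricingrealtime.aggregation.aggregation_transform.
MODULE_NAME = "aggregation_transform_demo"


def install_module(source):
    """Build a module from source and register it so pickle can import it."""
    mod = types.ModuleType(MODULE_NAME)
    # Functions defined here get __module__ == MODULE_NAME.
    exec(source, mod.__dict__)
    sys.modules[MODULE_NAME] = mod
    return mod


mod = install_module(
    "def _timestamp_keyed_result(element):\n"
    "    return element\n"
)

payload = pickle.dumps(mod._timestamp_keyed_result)

# The payload only names the module and the attribute; it carries no bytecode.
print(MODULE_NAME.encode() in payload)                     # True
print(b"_timestamp_keyed_result" in payload)               # True
# Loading looks the name up again and returns the very same function object.
print(pickle.loads(payload) is mod._timestamp_keyed_result)  # True
```

As far as I understand, --save_main_session additionally pickles the globals of `__main__` (via dill) and restores them on the workers. That lets references into the launching script resolve there. It is aimed at `__main__`, so a helper living in a regular module still requires that module to be importable on the workers with the attribute defined.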

On Mon, Oct 28, 2019 at 4:59 PM Valentyn Tymofieiev <[email protected]>
wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we saw instances of this bug in the exact opposite
> scenario: when the pipeline was defined *in one file*, not in multiple
> files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
>
> If that doesn't work, I would try to get a dump of serialized_fn and
> try to reproduce the issue in an isolated environment, for example:
>
> from apache_beam.internal import pickler
> serialized_fn = "..content.."
> pickler.loads(serialized_fn)
>
> Then I would try to trim the DoFn in the example down to a
> minimal reproducible example. It could be another issue with the dill
> dependency.
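
Along the same lines, the final error in the traceback can be reproduced in isolation without Beam. As far as I know, Beam's pickler base64-encodes a zlib-compressed dill pickle, so the sketch applies the same wrapping. Stdlib pickle stands in for dill, and the module and helper names are hypothetical.

The unpickler imports the module named in the payload and calls getattr on it. If the module object the worker holds at that moment lacks the attribute (for instance a stale copy, or one whose import has not finished), it raises this AttributeError:

```python
import base64
import pickle
import sys
import types
import zlib

MODULE_NAME = "aggregation_transform_demo"  # hypothetical module name


def install_module(source):
    """Register a module built from source under MODULE_NAME."""
    mod = types.ModuleType(MODULE_NAME)
    exec(source, mod.__dict__)
    sys.modules[MODULE_NAME] = mod
    return mod


# Submission side: the helper exists when the function is serialized.
submit_mod = install_module("def _timestamp_keyed_result(x):\n    return x\n")
# Wrapping assumed to match Beam's pickler: compress, then base64-encode.
serialized_fn = base64.b64encode(
    zlib.compress(pickle.dumps(submit_mod._timestamp_keyed_result), 9))

# Worker side: the module registered under that name lacks the attribute.
install_module("")


def try_loads(encoded):
    """Undo the wrapping and unpickle; return the object or the error."""
    try:
        return pickle.loads(zlib.decompress(base64.b64decode(encoded)))
    except AttributeError as err:
        return err


result = try_loads(serialized_fn)
print(type(result).__name__, result)
```

Running the same decode on your actual serialized_fn dump in a clean interpreter would help separate a bad payload from a problem with the worker's module state at load time.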
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <[email protected]> wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python 3 that we don't run
>> into on Python 2. Sometimes when we try to submit the pipeline, we get
>> an AttributeError (see the stack trace below). We have double-checked,
>> and the attributes/methods are present in the right module and in the
>> right place, but somehow the pipeline still complains about them. In
>> some cases, we refer to methods before their definition. We tried to
>> reorder the method definitions, but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen every time we submit the
>> pipeline, so I suspect some kind of race condition. When we enable the
>> worker recycling logic, it happens most of the time when the SDK worker
>> is recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace:*
>>
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>> at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>> at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>> at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>> at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>> ... 7 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6: Traceback (most recent call last):
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 307, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 261, in loads
>>     return dill.loads(s)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>     return load(file, ignore)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>     obj = pik.load()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 165, in _execute
>>     response = task()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 198, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 351, in do_instruction
>>     request.instruction_id)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_reference)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in get
>>     self.data_channel_factory)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 576, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 620, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 619, in <listcomp>
>>     for transform_id in sorted(
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 606, in get_operation
>>     transform_id, transform_consumers)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 865, in create_operation
>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1108, in create
>>     serialized_fn, parameter)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1146, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 265, in loads
>>     return dill.loads(s)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 317, in loads
>>     return load(file, ignore)
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 305, in load
>>     obj = pik.load()
>>   File "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module 'pricingrealtime.aggregation.aggregation_transform' from '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>
>>
