Re: Pipeline AttributeError on Python3

2019-11-22 Thread Thomas Weise
We have not seen the issue with Python 3.6 on 2.16+ after applying this
patch. 

Thanks!

On Thu, Nov 21, 2019 at 4:41 PM Thomas Weise  wrote:

> We are currently verifying the patch. Will report back tomorrow.
>
> On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev 
> wrote:
>
>> That would be helpful, thanks a lot! It should be a straightforward patch.
>> Also, thanks Guenther, for sharing your investigation on
>> https://bugs.python.org/issue34572, it was very helpful.
>>
>> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise  wrote:
>>
>>> Valentyn, thanks a lot for following up on this.
>>>
>>> If the change can be cherry-picked in isolation, we should be able to
>>> verify this soon (with 2.16).
>>>
>>>
>>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev 
>>> wrote:
>>>
 To close the loop here: To my knowledge this issue affects all Python 3
 users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
 including users on Python 3.7.3 and newer versions.

 The issue is addressed on Beam master, and we have a cherry-pick out
 for Beam 2.17.0.

 Workaround options for users on 2.16.0 and earlier SDKs:

 - Patch the SDK you are using with
 https://github.com/apache/beam/pull/10167.
 - Temporarily switch to Python 2 until 2.17.0. We have not seen the
 issue on Python 2, so it may be rare or non-existent on Python 2.
 - Pass --experiments worker_threads=1. This option may work only for
 some, but not all pipelines.

 See BEAM-8651  for
 details on the issue.

 On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
 valen...@google.com> wrote:

> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to
> track this issue and any recommendations for users that come out of it.
>
> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <
> valen...@google.com> wrote:
>
>>  I think we have heard of this issue from the same source:
>>
>> This looks exactly like a race condition that we've encountered on
>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>> thread-safety of the unpickler, as concurrent unpickle threads can 
>>> access a
>>> module before it has been fully imported. See
>>> https://bugs.python.org/issue34572 for more information.
>>>
>>> The traceback shows a Python 3.6 venv so this could be a different
>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>> potential workaround is to ensure that all of the modules get imported
>>> during the initialization of the sdk_worker, as this bug only affects
>>> imports done by the unpickler.
>>
>>
>> The symptoms do sound similar, so I would try to reproduce your issue
>> on 3.7.3 and see if it is gone, or try to reproduce
>> https://bugs.python.org/issue34572 in the version of interpreter you
>> use. If this doesn't help, you can try to reproduce the race using your
>> input.
>>
>> To get the output of serialized do fn, you could do the following:
>> 1. Patch https://github.com/apache/beam/pull/10036.
>> 2. Set logging level to DEBUG, see:
>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>> .
>> 3. Check for log output for payload of your transform, it may look
>> like:
>>
>> transforms {
>>   key:
>> "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>   value {
>> spec {
>>   urn: "beam:transform:pardo:v1"
>>   payload: "\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>> 
>>
>> Then you can extract the output of pickled fn:
>>
>> from apache_beam.utils import proto_utils
>> from apache_beam.portability.api import beam_runner_api_pb2
>> from apache_beam.internal import pickler
>>
>> payload = b'\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>> pardo_payload = proto_utils.parse_Bytes(payload,
>> beam_runner_api_pb2.ParDoPayload)
>> pickled_fn = pardo_payload.do_fn.spec.payload
>>
>> pickler.loads(pickled_fn) # Presumably the race happens here when
>> unpickling one of your transforms
>> (pricingrealtime.aggregation.aggregation_transform).
>>
>>
>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar 
>> wrote:
>>
>>> Thanks Valentyn,
>>>
>>> aggregation_transform.py doesn't have any transformation method that
>>> extends beam.DoFn. We are using plain Python methods, which we pass to
>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can 

Re: Pipeline AttributeError on Python3

2019-11-21 Thread Thomas Weise
We are currently verifying the patch. Will report back tomorrow.

On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev 
wrote:

> That would be helpful, thanks a lot! It should be a straightforward patch.
> Also, thanks Guenther, for sharing your investigation on
> https://bugs.python.org/issue34572, it was very helpful.
>
> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise  wrote:
>
>> Valentyn, thanks a lot for following up on this.
>>
>> If the change can be cherry-picked in isolation, we should be able to
>> verify this soon (with 2.16).
>>
>>
>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev 
>> wrote:
>>
>>> To close the loop here: To my knowledge this issue affects all Python 3
>>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>>> including users on Python 3.7.3 and newer versions.
>>>
>>> The issue is addressed on Beam master, and we have a cherry-pick out for
>>> Beam 2.17.0.
>>>
>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>
>>> - Patch the SDK you are using with
>>> https://github.com/apache/beam/pull/10167.
>>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the
>>> issue on Python 2, so it may be rare or non-existent on Python 2.
>>> - Pass --experiments worker_threads=1. This option may work only for
>>> some, but not all pipelines.
>>>
>>> See BEAM-8651  for
>>> details on the issue.
>>>
>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
>>> valen...@google.com> wrote:
>>>
 I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
 this issue and any recommendations for users that come out of it.

 On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev 
 wrote:

>  I think we have heard of this issue from the same source:
>
> This looks exactly like a race condition that we've encountered on
>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>> thread-safety of the unpickler, as concurrent unpickle threads can 
>> access a
>> module before it has been fully imported. See
>> https://bugs.python.org/issue34572 for more information.
>>
>> The traceback shows a Python 3.6 venv so this could be a different
>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>> potential workaround is to ensure that all of the modules get imported
>> during the initialization of the sdk_worker, as this bug only affects
>> imports done by the unpickler.
>
>
> The symptoms do sound similar, so I would try to reproduce your issue
> on 3.7.3 and see if it is gone, or try to reproduce
> https://bugs.python.org/issue34572 in the version of interpreter you
> use. If this doesn't help, you can try to reproduce the race using your
> input.
>
> To get the output of serialized do fn, you could do the following:
> 1. Patch https://github.com/apache/beam/pull/10036.
> 2. Set logging level to DEBUG, see:
> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
> .
> 3. Check for log output for payload of your transform, it may look
> like:
>
> transforms {
>   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>   value {
> spec {
>   urn: "beam:transform:pardo:v1"
>   payload: "\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
> 
>
> Then you can extract the output of pickled fn:
>
> from apache_beam.utils import proto_utils
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.internal import pickler
>
> payload = b'\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
> pardo_payload = proto_utils.parse_Bytes(payload,
> beam_runner_api_pb2.ParDoPayload)
> pickled_fn = pardo_payload.do_fn.spec.payload
>
> pickler.loads(pickled_fn) # Presumably the race happens here when
> unpickling one of your transforms
> (pricingrealtime.aggregation.aggregation_transform).
>
>
> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar 
> wrote:
>
>> Thanks Valentyn,
>>
>> aggregation_transform.py doesn't have any transformation method that
>> extends beam.DoFn. We are using plain Python methods, which we pass to
>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>> please let me know the process?
>>
>> I also heard that some people ran into this issue on Python 3.7.1 but
>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>
>>
>>
>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>> valen...@google.com> wrote:

Re: Pipeline AttributeError on Python3

2019-11-21 Thread Valentyn Tymofieiev
That would be helpful, thanks a lot! It should be a straightforward patch.
Also, thanks Guenther, for sharing your investigation on
https://bugs.python.org/issue34572, it was very helpful.

On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise  wrote:

> Valentyn, thanks a lot for following up on this.
>
> If the change can be cherry-picked in isolation, we should be able to
> verify this soon (with 2.16).
>
>
> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev 
> wrote:
>
>> To close the loop here: To my knowledge this issue affects all Python 3
>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>> including users on Python 3.7.3 and newer versions.
>>
>> The issue is addressed on Beam master, and we have a cherry-pick out for
>> Beam 2.17.0.
>>
>> Workaround options for users on 2.16.0 and earlier SDKs:
>>
>> - Patch the SDK you are using with
>> https://github.com/apache/beam/pull/10167.
>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
>> on Python 2, so it may be rare or non-existent on Python 2.
>> - Pass --experiments worker_threads=1. This option may work only for
>> some, but not all pipelines.
>>
>> See BEAM-8651  for
>> details on the issue.
>>
>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev 
>> wrote:
>>
>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>>> this issue and any recommendations for users that come out of it.
>>>
>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev 
>>> wrote:
>>>
  I think we have heard of this issue from the same source:

 This looks exactly like a race condition that we've encountered on
> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
> thread-safety of the unpickler, as concurrent unpickle threads can access 
> a
> module before it has been fully imported. See
> https://bugs.python.org/issue34572 for more information.
>
> The traceback shows a Python 3.6 venv so this could be a different
> issue (the unpickle bug was introduced in version 3.7). If it's the same
> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
> potential workaround is to ensure that all of the modules get imported
> during the initialization of the sdk_worker, as this bug only affects
> imports done by the unpickler.


 The symptoms do sound similar, so I would try to reproduce your issue
 on 3.7.3 and see if it is gone, or try to reproduce
 https://bugs.python.org/issue34572 in the version of interpreter you
 use. If this doesn't help, you can try to reproduce the race using your
 input.

 To get the output of serialized do fn, you could do the following:
 1. Patch https://github.com/apache/beam/pull/10036.
 2. Set logging level to DEBUG, see:
 https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
 .
 3. Check for log output for payload of your transform, it may look like:

 transforms {
   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
   value {
 spec {
   urn: "beam:transform:pardo:v1"
   payload: "\n\347\006\n\275\006\n
 beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
 

 Then you can extract the output of pickled fn:

 from apache_beam.utils import proto_utils
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.internal import pickler

 payload = b'\n\347\006\n\275\006\n
 beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
 pardo_payload = proto_utils.parse_Bytes(payload,
 beam_runner_api_pb2.ParDoPayload)
 pickled_fn = pardo_payload.do_fn.spec.payload

 pickler.loads(pickled_fn) # Presumably the race happens here when
 unpickling one of your transforms
 (pricingrealtime.aggregation.aggregation_transform).


 On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar 
 wrote:

> Thanks Valentyn,
>
> aggregation_transform.py doesn't have any transformation method that
> extends beam.DoFn. We are using plain Python methods, which we pass to
> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
> please let me know the process?
>
> I also heard that some people ran into this issue on Python 3.7.1 but
> the same issue is not present on Python 3.7.3. Can you confirm this?
>
>
>
> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
> valen...@google.com> wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>> to this issue, although we saw instances of this bug in exactly the
>> opposite scenario - when the pipeline was defined *in one file*, but not in
>> multiple 

Re: Pipeline AttributeError on Python3

2019-11-21 Thread Thomas Weise
Valentyn, thanks a lot for following up on this.

If the change can be cherry-picked in isolation, we should be able to
verify this soon (with 2.16).


On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev 
wrote:

> To close the loop here: To my knowledge this issue affects all Python 3
> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
> including users on Python 3.7.3 and newer versions.
>
> The issue is addressed on Beam master, and we have a cherry-pick out for
> Beam 2.17.0.
>
> Workaround options for users on 2.16.0 and earlier SDKs:
>
> - Patch the SDK you are using with
> https://github.com/apache/beam/pull/10167.
> - Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
> on Python 2, so it may be rare or non-existent on Python 2.
> - Pass --experiments worker_threads=1. This option may work only for
> some, but not all pipelines.
>
> See BEAM-8651  for
> details on the issue.
>
> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev 
> wrote:
>
>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>> this issue and any recommendations for users that come out of it.
>>
>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev 
>> wrote:
>>
>>>  I think we have heard of this issue from the same source:
>>>
>>> This looks exactly like a race condition that we've encountered on
 Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
 thread-safety of the unpickler, as concurrent unpickle threads can access a
 module before it has been fully imported. See
 https://bugs.python.org/issue34572 for more information.

 The traceback shows a Python 3.6 venv so this could be a different
 issue (the unpickle bug was introduced in version 3.7). If it's the same
 bug then upgrading to Python 3.7.3 or higher should fix that issue. One
 potential workaround is to ensure that all of the modules get imported
 during the initialization of the sdk_worker, as this bug only affects
 imports done by the unpickler.
>>>
>>>
>>> The symptoms do sound similar, so I would try to reproduce your issue on
>>> 3.7.3 and see if it is gone, or try to reproduce
>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>> use. If this doesn't help, you can try to reproduce the race using your
>>> input.
>>>
>>> To get the output of serialized do fn, you could do the following:
>>> 1. Patch https://github.com/apache/beam/pull/10036.
>>> 2. Set logging level to DEBUG, see:
>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>> .
>>> 3. Check for log output for payload of your transform, it may look like:
>>>
>>> transforms {
>>>   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>   value {
>>> spec {
>>>   urn: "beam:transform:pardo:v1"
>>>   payload: "\n\347\006\n\275\006\n
>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>> 
>>>
>>> Then you can extract the output of pickled fn:
>>>
>>> from apache_beam.utils import proto_utils
>>> from apache_beam.portability.api import beam_runner_api_pb2
>>> from apache_beam.internal import pickler
>>>
>>> payload = b'\n\347\006\n\275\006\n
>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>> pardo_payload = proto_utils.parse_Bytes(payload,
>>> beam_runner_api_pb2.ParDoPayload)
>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>
>>> pickler.loads(pickled_fn) # Presumably the race happens here when
>>> unpickling one of your transforms
>>> (pricingrealtime.aggregation.aggregation_transform).
>>>
>>>
>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar 
>>> wrote:
>>>
 Thanks Valentyn,

 aggregation_transform.py doesn't have any transformation method that
 extends beam.DoFn. We are using plain Python methods, which we pass to
 beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
 please let me know the process?

 I also heard that some people ran into this issue on Python 3.7.1 but
 the same issue is not present on Python 3.7.3. Can you confirm this?



 On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
 valen...@google.com> wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
> to this issue, although we saw instances of this bug in exactly the
> opposite scenario - when the pipeline was defined *in one file*, but not
> in multiple files.
>
> Could you try replacing instances of super() in
> aggregation_transform.py  as done in
> https://github.com/apache/beam/pull/9513 and see if this issue is
> still reproducible?
>
> If that doesn't work, I would try to get the dump of serialized_fn,
> and try to reproduce the issue in an isolated environment, such as:
>
> 

Re: Pipeline AttributeError on Python3

2019-11-21 Thread Valentyn Tymofieiev
To close the loop here: To my knowledge this issue affects all Python 3
users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
including users on Python 3.7.3 and newer versions.

The issue is addressed on Beam master, and we have a cherry-pick out for
Beam 2.17.0.

Workaround options for users on 2.16.0 and earlier SDKs:

- Patch the SDK you are using with https://github.com/apache/beam/pull/10167.

- Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
on Python 2, so it may be rare or non-existent on Python 2.
- Pass --experiments worker_threads=1. This option may work only for some,
but not all pipelines (see the sketch below).
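
For reference, a minimal sketch of passing that experiment programmatically
(the tiny pipeline below is a placeholder; the flag can equally be given on
the command line that launches your pipeline):

# Sketch: enabling --experiments worker_threads=1 via PipelineOptions.
# The Create/Map pipeline is only a stand-in for a real job.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--experiments', 'worker_threads=1'])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create([1, 2, 3])
     | beam.Map(lambda x: x * x)
     | beam.Map(print))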

See BEAM-8651  for details
on the issue.

On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev 
wrote:

> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
> this issue and any recommendations for users that come out of it.
>
> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev 
> wrote:
>
>>  I think we have heard of this issue from the same source:
>>
>> This looks exactly like a race condition that we've encountered on Python
>>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>> module before it has been fully imported. See
>>> https://bugs.python.org/issue34572 for more information.
>>>
>>> The traceback shows a Python 3.6 venv so this could be a different issue
>>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>>> workaround is to ensure that all of the modules get imported during the
>>> initialization of the sdk_worker, as this bug only affects imports done by
>>> the unpickler.
>>
>>
>> The symptoms do sound similar, so I would try to reproduce your issue on
>> 3.7.3 and see if it is gone, or try to reproduce
>> https://bugs.python.org/issue34572 in the version of interpreter you
>> use. If this doesn't help, you can try to reproduce the race using your
>> input.
>>
>> To get the output of serialized do fn, you could do the following:
>> 1. Patch https://github.com/apache/beam/pull/10036.
>> 2. Set logging level to DEBUG, see:
>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>> .
>> 3. Check for log output for payload of your transform, it may look like:
>>
>> transforms {
>>   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>   value {
>> spec {
>>   urn: "beam:transform:pardo:v1"
>>   payload: "\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>> 
>>
>> Then you can extract the output of pickled fn:
>>
>> from apache_beam.utils import proto_utils
>> from apache_beam.portability.api import beam_runner_api_pb2
>> from apache_beam.internal import pickler
>>
>> payload = b'\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>> pardo_payload = proto_utils.parse_Bytes(payload,
>> beam_runner_api_pb2.ParDoPayload)
>> pickled_fn = pardo_payload.do_fn.spec.payload
>>
>> pickler.loads(pickled_fn) # Presumably the race happens here when
>> unpickling one of your transforms
>> (pricingrealtime.aggregation.aggregation_transform).
>>
>>
>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar 
>> wrote:
>>
>>> Thanks Valentyn,
>>>
>>> aggregation_transform.py doesn't have any transformation method that
>>> extends beam.DoFn. We are using plain Python methods, which we pass to
>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>> please let me know the process?
>>>
>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>
>>>
>>>
>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev 
>>> wrote:
>>>
 +user@, bcc: dev@
 https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
 this issue, although we saw instances of this bug in exactly the
 opposite scenario - when the pipeline was defined *in one file*, but not
 in multiple files.

 Could you try replacing instances of super() in
 aggregation_transform.py  as done in
 https://github.com/apache/beam/pull/9513 and see if this issue is
 still reproducible?

 If that doesn't work, I would try to get the dump of serialized_fn, and
 try to reproduce the issue in an isolated environment, such as:

 from apache_beam.internal import pickler
 serialized_fn = "..content.."
 pickler.loads(serialized_fn)

 then I would try to trim the doFn down to a minimally reproducible
 example. It could be another issue with the dill dependency.


 On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar 
 wrote:

> Hi All,
>
> 

Re: Pipeline AttributeError on Python3

2019-11-13 Thread Valentyn Tymofieiev
I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track this
issue and any recommendations for users that come out of it.

On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev 
wrote:

>  I think we have heard of this issue from the same source:
>
> This looks exactly like a race condition that we've encountered on Python
>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>> module before it has been fully imported. See
>> https://bugs.python.org/issue34572 for more information.
>>
>> The traceback shows a Python 3.6 venv so this could be a different issue
>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>> workaround is to ensure that all of the modules get imported during the
>> initialization of the sdk_worker, as this bug only affects imports done by
>> the unpickler.
>
>
> The symptoms do sound similar, so I would try to reproduce your issue on
> 3.7.3 and see if it is gone, or try to reproduce
> https://bugs.python.org/issue34572 in the version of interpreter you use.
> If this doesn't help, you can try to reproduce the race using your input.
>
> To get the output of serialized do fn, you could do the following:
> 1. Patch https://github.com/apache/beam/pull/10036.
> 2. Set logging level to DEBUG, see:
> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
> .
> 3. Check for log output for payload of your transform, it may look like:
>
> transforms {
>   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>   value {
> spec {
>   urn: "beam:transform:pardo:v1"
>   payload: "\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
> 
>
> Then you can extract the output of pickled fn:
>
> from apache_beam.utils import proto_utils
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.internal import pickler
>
> payload = b'\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
> pardo_payload = proto_utils.parse_Bytes(payload,
> beam_runner_api_pb2.ParDoPayload)
> pickled_fn = pardo_payload.do_fn.spec.payload
>
> pickler.loads(pickled_fn) # Presumably the race happens here when
> unpickling one of your transforms
> (pricingrealtime.aggregation.aggregation_transform).
>
>
> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar  wrote:
>
>> Thanks Valentyn,
>>
>> aggregation_transform.py doesn't have any transformation method that
>> extends beam.DoFn. We are using plain Python methods, which we pass to
>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>> please let me know the process?
>>
>> I also heard that some people ran into this issue on Python 3.7.1 but the
>> same issue is not present on Python 3.7.3. Can you confirm this?
>>
>>
>>
>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev 
>> wrote:
>>
>>> +user@, bcc: dev@
>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>> this issue, although we saw instances of this bug in exactly the
>>> opposite scenario - when the pipeline was defined *in one file*, but not
>>> in multiple files.
>>>
>>> Could you try replacing instances of super() in
>>> aggregation_transform.py  as done in
>>> https://github.com/apache/beam/pull/9513 and see if this issue is still
>>> reproducible?
>>>
>>> If that doesn't work, I would try to get the dump of serialized_fn, and
>>> try to reproduce the issue in an isolated environment, such as:
>>>
>>> from apache_beam.internal import pickler
>>> serialized_fn = "..content.."
>>> pickler.loads(serialized_fn)
>>>
>>> then I would try to trim the doFn down to a minimally reproducible
>>> example. It could be another issue with the dill dependency.
>>>
>>>
>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar 
>>> wrote:
>>>
 Hi All,

 We have noticed a weird intermittent issue on Python 3 that we don't run
 into on Python 2. Sometimes when we try to submit the pipeline, we get an
 AttributeError (check the stack trace below). We have double-checked that
 the attributes/methods are present in the right module and in the right
 place, but the pipeline still complains about them. In some cases we
 reference methods before their definition; we tried reordering the method
 definitions, but that didn't help at all.

 We don't see the same issue when the entire pipeline is defined in one
 file. Also, note that this doesn't happen every time we submit the
 pipeline, so I suspect it is some kind of race condition. When we enable
 the worker-recycle logic, it happens most of the time when the SDK worker
 is recycled.

 Some more information about the environment:
 Python version: 

Re: Pipeline AttributeError on Python3

2019-11-07 Thread Valentyn Tymofieiev
 I think we have heard of this issue from the same source:

This looks exactly like a race condition that we've encountered on Python
> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
> thread-safety of the unpickler, as concurrent unpickle threads can access a
> module before it has been fully imported. See
> https://bugs.python.org/issue34572 for more information.
>
> The traceback shows a Python 3.6 venv so this could be a different issue
> (the unpickle bug was introduced in version 3.7). If it's the same bug then
> upgrading to Python 3.7.3 or higher should fix that issue. One potential
> workaround is to ensure that all of the modules get imported during the
> initialization of the sdk_worker, as this bug only affects imports done by
> the unpickler.


The symptoms do sound similar, so I would try to reproduce your issue on
3.7.3 and see if it is gone, or try to reproduce
https://bugs.python.org/issue34572 in the version of interpreter you use.
If this doesn't help, you can try to reproduce the race using your input.
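
Regarding the pre-import workaround quoted above, a minimal sketch of the
idea (the module list is hypothetical; the point is to import everything the
unpickler will need before worker threads start deserializing):

# Sketch of the pre-import workaround for https://bugs.python.org/issue34572:
# import the pipeline's modules during worker startup, so that no concurrent
# unpickling can observe a half-imported module. Module names are placeholders.
import importlib

MODULES_TO_PRELOAD = [
    'pricingrealtime.aggregation.aggregation_transform',
]

def preload_modules():
    # Call this once during sdk_worker initialization, before bundles run.
    for name in MODULES_TO_PRELOAD:
        importlib.import_module(name)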

To get the output of serialized do fn, you could do the following:
1. Patch https://github.com/apache/beam/pull/10036.
2. Set logging level to DEBUG, see:
https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
.
3. Check for log output for payload of your transform, it may look like:

transforms {
  key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
  value {
spec {
  urn: "beam:transform:pardo:v1"
  payload: "\n\347\006\n\275\006\n
beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT


Then you can extract the output of pickled fn:

from apache_beam.utils import proto_utils
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.internal import pickler

payload = b'\n\347\006\n\275\006\n
beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
pardo_payload = proto_utils.parse_Bytes(payload, beam_runner_api_pb2.ParDoPayload)
pickled_fn = pardo_payload.do_fn.spec.payload

pickler.loads(pickled_fn) # Presumably the race happens here when unpickling
one of your transforms (pricingrealtime.aggregation.aggregation_transform).


On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar  wrote:

> Thanks Valentyn,
>
> aggregation_transform.py doesn't have any transformation method that
> extends beam.DoFn. We are using plain Python methods, which we pass to
> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
> please let me know the process?
>
> I also heard that some people ran into this issue on Python 3.7.1 but the
> same issue is not present on Python 3.7.3. Can you confirm this?
>
>
>
> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev 
> wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>> this issue, although we saw instances of this bug in exactly the
>> opposite scenario - when the pipeline was defined *in one file*, but not
>> in multiple files.
>>
>> Could you try replacing instances of super() in aggregation_transform.py
>> as done in https://github.com/apache/beam/pull/9513 and see if this
>> issue is still reproducible?
>>
>> If that doesn't work, I would try to get the dump of serialized_fn, and
>> try to reproduce the issue in an isolated environment, such as:
>>
>> from apache_beam.internal import pickler
>> serialized_fn = "..content.."
>> pickler.loads(serialized_fn)
>>
>> then I would try to trim the doFn down to a minimally reproducible
>> example. It could be another issue with the dill dependency.
>>
>>
>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar 
>> wrote:
>>
>>> Hi All,
>>>
>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>>> AttributeError (check the stack trace below). We have double-checked that
>>> the attributes/methods are present in the right module and in the right
>>> place, but the pipeline still complains about them. In some cases we
>>> reference methods before their definition; we tried reordering the method
>>> definitions, but that didn't help at all.
>>>
>>> We don't see the same issue when the entire pipeline is defined in one
>>> file. Also, note that this doesn't happen every time we submit the
>>> pipeline, so I suspect it is some kind of race condition. When we enable
>>> the worker-recycle logic, it happens most of the time when the SDK worker
>>> is recycled.
>>>
>>> Some more information about the environment:
>>> Python version: 3
>>> Beam version: 2.16
>>> Flink version: 1.8
>>>
>>> *Stack trace: *
>>>
>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>> bundle}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>> at
>>> 

Re: Pipeline AttributeError on Python3

2019-11-06 Thread Rakesh Kumar
Thanks Valentyn,

aggregation_transform.py doesn't have any transformation method that
extends beam.DoFn. We are using plain Python methods, which we pass to
beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
please let me know the process?
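
For context, a sketch of the pattern described above (all names are
hypothetical stand-ins for aggregation_transform.py):

# Plain module-level functions (no beam.DoFn subclass) applied with beam.Map.
# Such functions are pickled with a reference to their defining module, which
# is why the unpickler must be able to import that module on the worker.
import apache_beam as beam

def to_keyed_value(record):
    # Hypothetical element-wise transformation.
    return (record['key'], record['value'])

def apply_aggregation(pcoll):
    return pcoll | 'ToKeyedValue' >> beam.Map(to_keyed_value)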

I also heard that some people ran into this issue on Python 3.7.1 but the
same issue is not present on Python 3.7.3. Can you confirm this?



On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev 
wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we saw instances of this bug in exactly the
> opposite scenario - when the pipeline was defined *in one file*, but not
> in multiple files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
>
> If that doesn't work, I would try to get the dump of serialized_fn, and
> try to reproduce the issue in an isolated environment, such as:
>
> from apache_beam.internal import pickler
> serialized_fn = "..content.."
> pickler.loads(serialized_fn)
>
> then I would try to trim the doFn down to a minimally reproducible
> example. It could be another issue with the dill dependency.
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar  wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python 3 that we don't run
>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>> AttributeError (check the stack trace below). We have double-checked that
>> the attributes/methods are present in the right module and in the right
>> place, but the pipeline still complains about them. In some cases we
>> reference methods before their definition; we tried reordering the method
>> definitions, but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen every time we submit the
>> pipeline, so I suspect it is some kind of race condition. When we enable
>> the worker-recycle logic, it happens most of the time when the SDK worker
>> is recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace: *
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>> at
>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>> at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>> at
>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>> ... 7 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Error received from SDK harness for instruction
>> 6: Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 307, in get
>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most 

Re: Pipeline AttributeError on Python3

2019-10-28 Thread Valentyn Tymofieiev
+user@, bcc: dev@
https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to this
issue, although we saw instances of this bug in exactly the opposite
scenario - when the pipeline was defined *in one file*, but not in multiple
files.

Could you try replacing instances of super() in aggregation_transform.py
as done in https://github.com/apache/beam/pull/9513 and see if this issue
is still reproducible?
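
A sketch of the kind of change made in that PR (the class is hypothetical):
replace the zero-argument form of super() with the explicit two-argument
form, which sidesteps the pickling problem described in BEAM-6158:

import apache_beam as beam

class AggregationTransform(beam.PTransform):
    def __init__(self):
        # Instead of the zero-argument form: super().__init__()
        super(AggregationTransform, self).__init__()

    def expand(self, pcoll):
        return pcoll  # placeholder body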

If that doesn't work, I would try to get the dump of serialized_fn, and try
to reproduce the issue in an isolated environment, such as:

from apache_beam.internal import pickler
serialized_fn = "..content.."
pickler.loads(serialized_fn)

then I would try to trim the doFn down to a minimally reproducible
example. It could be another issue with the dill dependency.
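
While trimming, a self-contained round trip through Beam's pickler (the
function is a hypothetical stand-in) can serve as a sanity check that a
candidate fn survives serialization:

from apache_beam.internal import pickler

def candidate_fn(x):
    # Stand-in for a function from your aggregation_transform module.
    return x + 1

serialized_fn = pickler.dumps(candidate_fn)
assert pickler.loads(serialized_fn)(1) == 2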


On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar  wrote:

> Hi All,
>
> We have noticed a weird intermittent issue on Python 3 that we don't run
> into on Python 2. Sometimes when we try to submit the pipeline, we get an
> AttributeError (check the stack trace below). We have double-checked that
> the attributes/methods are present in the right module and in the right
> place, but the pipeline still complains about them. In some cases we
> reference methods before their definition; we tried reordering the method
> definitions, but that didn't help at all.
>
> We don't see the same issue when the entire pipeline is defined in one
> file. Also, note that this doesn't happen every time we submit the
> pipeline, so I suspect it is some kind of race condition. When we enable
> the worker-recycle logic, it happens most of the time when the SDK worker
> is recycled.
>
> Some more information about the environment:
> Python version: 3
> Beam version: 2.16
> Flink version: 1.8
>
> *Stack trace: *
>
> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
> at
> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
> at
> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
> ... 7 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 6: Traceback (most recent call last):
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 307, in get
> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
> line 261, in loads
> return dill.loads(s)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 317, in loads
> return load(file, ignore)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 305, in load
> obj = pik.load()
>   File
> 

Pipeline AttributeError on Python3

2019-10-28 Thread Rakesh Kumar
Hi All,

We have noticed a weird intermittent issue on Python 3 that we don't run
into on Python 2. Sometimes when we try to submit the pipeline, we get an
AttributeError (check the stack trace below). We have double-checked that
the attributes/methods are present in the right module and in the right
place, but the pipeline still complains about them. In some cases we
reference methods before their definition; we tried reordering the method
definitions, but that didn't help at all.

We don't see the same issue when the entire pipeline is defined in one
file. Also, note that this doesn't happen every time we submit the
pipeline, so I suspect it is some kind of race condition. When we enable
the worker-recycle logic, it happens most of the time when the SDK worker
is recycled.

Some more information about the environment:
Python version: 3
Beam version: 2.16
Flink version: 1.8

*Stack trace: *

TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
at
org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
at
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
... 7 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for instruction
6: Traceback (most recent call last):
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 307, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
line 261, in loads
return dill.loads(s)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 317, in loads
return load(file, ignore)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 305, in load
obj = pik.load()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
line 474, in find_class
return StockUnpickler.find_class(self, module, name)
*AttributeError: Can't get attribute '_timestamp_keyed_result' on *

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 165, in _execute
response = task()
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 198, in 
self._execute(lambda: worker.do_instruction(work), work)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 351, in do_instruction
request.instruction_id)
  File
"/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 371, in process_bundle