Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Robert Bradshaw
And, as Max says, this is an SDF that wraps a BoundedSource or an
UnboundedSource respectively. The other way around is not possible, as
SDF is strictly more powerful.


Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Robert Bradshaw
On Thu, Jan 31, 2019 at 3:17 PM Ismaël Mejía  wrote:
>
> > Not necessarily. This would be one way. Another way is to build an SDF wrapper
> > for UnboundedSource. Probably the easier path for migration.
>
> That would be fantastic. I have heard about such a wrapper multiple
> times, but so far there hasn't been any realistic proposal. I have a
> hard time imagining how we can map RestrictionTrackers onto the
> existing Bounded/UnboundedSource in a generic way, so I would love to
> hear more about the details.

For BoundedSource, the restriction is the BoundedSource object itself
(which splits into multiple distinct bounded sources), with a tracker
that forwards dynamic split calls to the reader; the body of process
reads this reader to completion, never explicitly claiming positions
from it.
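In rough Python-flavored pseudocode (the names are illustrative
assumptions, not the actual Beam API), the bounded case would look
something like:

  # Sketch only: hypothetical names, not the real Beam SDF surface.

  def initial_restriction(bounded_source):
      # The restriction is the BoundedSource itself.
      return bounded_source

  def split_restriction(bounded_source, desired_bundle_size):
      # Forward initial splitting to the source; each sub-source
      # becomes an independent restriction.
      return bounded_source.split(desired_bundle_size)

  def process(bounded_source):
      # Read the wrapped reader to completion; dynamic split calls are
      # forwarded to the reader, so process never claims positions.
      reader = bounded_source.create_reader()
      has_element = reader.start()
      while has_element:
          yield reader.current()
          has_element = reader.advance()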

For unbounded sources, the restriction tracker always returns true on
try_claim and false on try_split (except for a try_split at 0, i.e. a
checkpoint). The process method keeps returning the current elements
until advance or try_claim returns false, or until try_split was called
at 0, and the reader's checkpoint mark is returned as the residual.

Initial splitting and sizing just forward the corresponding calls to
the wrapped source.
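Again as a sketch (hypothetical names such as create_reader and
get_checkpoint_mark are assumptions, not the actual API), the unbounded
case would behave roughly like this:

  # Sketch only: illustrates the claim/split/checkpoint control flow.

  class UnboundedSourceRestrictionTracker(object):
      # The restriction is the (source, checkpoint mark) pair itself.

      def __init__(self, source, checkpoint_mark=None):
          self.source = source
          self.checkpoint_mark = checkpoint_mark
          self._checkpoint_requested = False

      def try_claim(self, position):
          # Always grant progress, unless a checkpoint was requested.
          return not self._checkpoint_requested

      def try_split(self, fraction):
          # Refuse all splits except a split at 0 (a checkpoint): stop
          # claiming, and hand back the source plus the reader's current
          # checkpoint mark as the residual restriction.
          if fraction == 0:
              self._checkpoint_requested = True
              return None, (self.source, self.checkpoint_mark)
          return None

  def process(tracker):
      # Body of the wrapping SDF's process method.
      reader = tracker.source.create_reader(tracker.checkpoint_mark)
      has_element = reader.start()
      while tracker.try_claim(None):
          if has_element:
              yield reader.current()
          tracker.checkpoint_mark = reader.get_checkpoint_mark()
          has_element = reader.advance()
          if not has_element:
              break  # no input available right now; allow a checkpoint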


Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Maximilian Michels

I have a hard time imagining how we can map RestrictionTrackers onto the
existing Bounded/UnboundedSource in a generic way, so I would love to hear
more about the details.


Isn't it the other way around? SDF is a generalization of UnboundedSource,
so we would wrap UnboundedSource using SDF. I'm not saying it is trivial, but
SDF offers all the functionality that UnboundedSource needs.


For example, the @GetInitialRestriction method would call split on the
UnboundedSource, and the restriction trackers would then be used to process
the resulting splits.
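As a sketch of that shape (Python-style pseudocode with assumed names,
and ignoring the watermark and checkpoint plumbing a real wrapper would
also need):

  def get_initial_restriction(unbounded_source):
      # The unsplit source is the initial restriction.
      return unbounded_source

  def split_restriction(unbounded_source, desired_num_splits, options):
      # Reuse the existing UnboundedSource splitting logic; each split
      # source becomes one restriction.
      return unbounded_source.split(desired_num_splits, options)

  def create_tracker(split_source):
      # Each split source is then processed under its own tracker,
      # e.g. the UnboundedSourceRestrictionTracker sketched in Robert's
      # reply above.
      return UnboundedSourceRestrictionTracker(split_source)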


On 31.01.19 15:16, Ismaël Mejía wrote:

Not necessarily. This would be one way. Another way is to build an SDF wrapper
for UnboundedSource. Probably the easier path for migration.


That would be fantastic. I have heard about such a wrapper multiple
times, but so far there hasn't been any realistic proposal. I have a hard
time imagining how we can map RestrictionTrackers onto the existing
Bounded/UnboundedSource in a generic way, so I would love to hear
more about the details.


Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Maximilian Michels

> In addition to having support in the runners, this will require a
> rewrite of PubsubIO to use the new SDF API.

Not necessarily. This would be one way. Another way is to build an SDF wrapper
for UnboundedSource. Probably the easier path for migration.


Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Ismaël Mejía
> Fortunately, there is already a pending PR for cross-language pipelines which
> will allow us to use Java IO like PubSub in Python jobs.

In addition to having support in the runners, this will require a
rewrite of PubsubIO to use the new SDF API.



Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Maximilian Michels

Hi Matthias,

This is already reflected in the compatibility matrix if you look under SDF.
There is no UnboundedSource interface for portable pipelines; that's a legacy
abstraction that will be replaced with SDF.


Fortunately, there is already a pending PR for cross-language pipelines which 
will allow us to use Java IO like PubSub in Python jobs.


Thanks,
Max


Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Matthias Baetens
Hey Ankur,

Thanks for the swift reply. Should I change this in the capability matrix
then?

Many thanks.
Best,
Matthias


Re: Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Ankur Goenka
Hi Matthias,

Unfortunately, unbounded reads including pubsub are not yet supported for
portable runners.

Thanks,
Ankur


Beam Python streaming pipeline on Flink Runner

2019-01-31 Thread Matthias Baetens
Hi everyone,

For the last few days I have been trying to run a streaming pipeline
(code on GitHub) on a Flink Runner.

I am running a Flink cluster locally (v1.5.6).
I have built the SDK Harness Container: *./gradlew
:beam-sdks-python-container:docker*
and started the JobServer: *./gradlew
:beam-runners-flink_2.11-job-server:runShadow
-PflinkMasterUrl=localhost:8081*.

I run my pipeline with: *env/bin/python streaming_pipeline.py
--runner=PortableRunner --job_endpoint=localhost:8099 --output xxx
--input_subscription xxx --output_subscription xxx*

All this is running inside an Ubuntu (Bionic) VM in VirtualBox.

The job submits fine, but unfortunately it fails after a few seconds with the
error attached.

Anything I am missing or doing wrong?

Many thanks.
Best,
Matthias
TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to finish remote bundle
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:624)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:679)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:673)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:378)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
    ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 5: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in 
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 489, in process_bundle
    ].process_encoded(data.data)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 126, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 182, in apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 678, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process