Re: java.io.NotSerializableException: org.apache.kafka.common.TopicPartition

2016-08-21 Thread Chawla,Sumit
Thanks Dan. Today I was able to identify what the issue was. Kafka's
TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar, but
somehow my build was pulling down kafka-clients-0.9.0.0.jar.
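
For anyone hitting the same exception, a small reflection check can show
whether the class that is actually loaded is Serializable and which jar it
came from. This is only a sketch: the class name SerializableCheck and its
two helper methods are made up for illustration, and it assumes it is run
with the same classpath as the pipeline.

```java
import java.io.Serializable;
import java.security.CodeSource;

/** Hypothetical helper: reports whether a class is Serializable and where it was loaded from. */
final class SerializableCheck {

    /** Returns true if the given class implements java.io.Serializable. */
    static boolean isSerializable(Class<?> cls) {
        return Serializable.class.isAssignableFrom(cls);
    }

    /** Returns the jar or directory the class was loaded from, or a placeholder for JDK classes. */
    static String loadedFrom(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        // Defaults to the class from this thread; pass another class name as the first argument.
        String name = args.length > 0 ? args[0] : "org.apache.kafka.common.TopicPartition";
        try {
            Class<?> cls = Class.forName(name);
            System.out.println(name + " serializable: " + isSerializable(cls));
            System.out.println("loaded from: " + loadedFrom(cls));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

If the reported location is an unexpected kafka-clients jar, the dependency
tree of the build is the next place to look.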

Regards
Sumit Chawla


On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperin 
wrote:

> Explicit +Raghu
>
> On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit 
> wrote:
>
> > Hi All
> >
> > I am trying to use KafkaIO as unbounded source, but the translation is
> > failing.  I am using FlinkRunner for the pipe.  It complains about
> > the org.apache.kafka.common.TopicPartition being not-serializable.
> >
> > pipeline.apply(
> > KafkaIO.read()
> > .withTopics(ImmutableList.of("test-topic"))
> > .withBootstrapServers("localhost:9200")
> >
> > Is it a known issue?  Here is the full exception details.
> >
> > Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> > org.apache.beam.runners.flink.translation.wrappers.streaming.io.
> > UnboundedSourceWrapper@2a855331
> > not serializable
> > at
> > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(
> > ClosureCleaner.java:99)
> > ~[flink-java-1.0.3.jar:1.0.3]
> > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> > ~[flink-java-1.0.3.jar:1.0.3]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > clean(StreamExecutionEnvironment.java:1219)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1131)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1075)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> > addSource(StreamExecutionEnvironment.java:1057)
> > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> > tors$UnboundedReadSourceTranslator.translateNode(
> > FlinkStreamingTransformTranslators.java:281)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> > tors$UnboundedReadSourceTranslator.translateNode(
> > FlinkStreamingTransformTranslators.java:244)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> > or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> > or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:225)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:220)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformTreeNode.visit(
> > TransformTreeNode.java:220)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.sdk.runners.TransformHierarchy.visit(
> > TransformHierarchy.java:104)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.
> > translate(FlinkPipelineTranslator.java:38)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(
> > FlinkPipelineExecutionEnvironment.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> > FlinkPipelineRunner.java:106)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> > FlinkPipelineRunner.java:49)
> > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> > at
> > com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.
> > main(PipelineMain.java:70)
> > ~[pipelinecommon-1.0.0.jar:?]
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > ~[?:1.8.0_92]
> > at
> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> > 62)
> > 

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-21 Thread Eugene Kirpichov
Hi JB,

Yes, I'm assuming you're referring to the "magic" part on the transform
expansion diagram. This is indeed runner-specific, and timers+state are
likely the simplest way to do this for an SDF that does an unbounded amount
of work.

On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré 
wrote:

> Anyway, from a runner perspective, we will have some kind of API (part of
> the Runner API) to "orchestrate" the SDF as we discussed during the call,
> right?
>
> Regards
> JB
>
> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > Hi Aljoscha,
> > This is an excellent question! And the answer is, we don't need any new
> > concepts like "SDF executor" and can rely on the per-key state and timers
> > machinery that already exists in all runners because it's necessary to
> > implement windowing/triggering properly.
> >
> > Note that this is already somewhat addressed in the previously posted State
> > and Timers proposal https://s.apache.org/beam-state , under "per-key
> > workflows".
> >
> > Think of it this way, using the Kafka example: we'll expand it into a
> > transform:
> >
> > (1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
> > partition in topic.listPartitions() }
> > (2) GroupByKey
> > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > proposal/slides }
> >   - R is the OffsetRange restriction which in this case will be always of
> > the form [startOffset, inf).
> >   - there'll be just 1 value per key, but we use GBK to just get access to
> > the per-key state/timers machinery. This may be runner-specific; maybe some
> > runners don't need a GBK to do that.
> >
> > Now suppose the topic has two partitions, P1 and P2, and they get assigned
> > unique keys K1, K2.
> > Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
> > (K2, topic, P2, [0, inf)).
> > Suppose we have just 1 worker with just 1 thread. Now, how will this thread
> > be able to produce elements from both P1 and P2? here's how.
> >
> > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
> > certain time or after a certain number of elements are output (just like
> > with the current UnboundedSource reading code) producing a residual
> > restriction R1' (basically a new start offset), put R1' into the per-key
> > state and set a timer T1 to resume.
> > Then it will process (K2, topic, P2, [0, inf)), do the same producing a
> > residual restriction R2' and setting a timer T2 to resume.
> > Then timer T1 will fire in the context of the key K1. The thread will call
> > processElement again, this time supplying R1' as the restriction; the
> > process repeats and after a while it checkpoints and stores R1'' into state
> > of K1.
> > Then timer T2 will fire in the context of K2, run processElement for a
> > while, set a new timer and store R2'' into the state of K2.
> > Etc.
> > If partition 1 goes away, the processElement call will return "do not
> > resume", so a timer will not be set and instead the state associated with
> > K1 will be GC'd.
> >
> > So basically it's almost like cooperative thread scheduling: things run for
> > a while, until the runner tells them to checkpoint, then they set a timer
> > to resume themselves, and the runner fires the timers, and the process
> > repeats. And, again, this only requires things that runners can already do
> > - state and timers, but no new concept of SDF executor (and consequently no
> > necessity to choose/tune how many you need).
> >
> > Makes sense?
> >
> > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek 
> > wrote:
> >
> >> Hi,
> >> I have another question that I think wasn't addressed in the meeting. At
> >> least it wasn't mentioned in the notes.
> >>
> >> In the context of replacing sources by a combination of two SDFs, how do you
> >> determine how many "SDF executor" instances you need downstream? For the
> >> sake of argument assume that both SDFs are executed with parallelism 1 (or
> >> one per worker). Now, if you have a file source that reads from a static
> >> set of files the first SDF would emit the filenames while the second SDF
> >> would receive the filenames and emit their contents. This works well and
> >> the downstream SDF can process one filename after the other. Now, think of
> >> something like a Kafka source. The first SDF would emit the partitions (say
> >> 4 partitions, in this example) and the second SDF would be responsible for
> >> reading from a topic and emitting elements. Reading from one topic never
> >> finishes so you can't process the topics in series. I think you would need
> >> to have 4 downstream "SDF executor" instances. The question now is: how do
> >> you determine whether you are in the first or the second situation?
> >>
> >> Probably I'm just overlooking something and this is already dealt with
> >> somewhere... :-)
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía 

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-21 Thread Aljoscha Krettek
Hi Eugene,
thanks for the long description! With the interleaving of execution it
completely makes sense.

Best,
Aljoscha

On Sun, 21 Aug 2016 at 21:14 Jean-Baptiste Onofré  wrote:

> Anyway, from a runner perspective, we will have some kind of API (part of
> the Runner API) to "orchestrate" the SDF as we discussed during the call,
> right?
>
> Regards
> JB
>
> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > Hi Aljoscha,
> > This is an excellent question! And the answer is, we don't need any new
> > concepts like "SDF executor" and can rely on the per-key state and timers
> > machinery that already exists in all runners because it's necessary to
> > implement windowing/triggering properly.
> >
> > Note that this is already somewhat addressed in the previously posted State
> > and Timers proposal https://s.apache.org/beam-state , under "per-key
> > workflows".
> >
> > Think of it this way, using the Kafka example: we'll expand it into a
> > transform:
> >
> > (1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
> > partition in topic.listPartitions() }
> > (2) GroupByKey
> > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > proposal/slides }
> >   - R is the OffsetRange restriction which in this case will be always of
> > the form [startOffset, inf).
> >   - there'll be just 1 value per key, but we use GBK to just get access to
> > the per-key state/timers machinery. This may be runner-specific; maybe some
> > runners don't need a GBK to do that.
> >
> > Now suppose the topic has two partitions, P1 and P2, and they get assigned
> > unique keys K1, K2.
> > Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
> > (K2, topic, P2, [0, inf)).
> > Suppose we have just 1 worker with just 1 thread. Now, how will this thread
> > be able to produce elements from both P1 and P2? here's how.
> >
> > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
> > certain time or after a certain number of elements are output (just like
> > with the current UnboundedSource reading code) producing a residual
> > restriction R1' (basically a new start offset), put R1' into the per-key
> > state and set a timer T1 to resume.
> > Then it will process (K2, topic, P2, [0, inf)), do the same producing a
> > residual restriction R2' and setting a timer T2 to resume.
> > Then timer T1 will fire in the context of the key K1. The thread will call
> > processElement again, this time supplying R1' as the restriction; the
> > process repeats and after a while it checkpoints and stores R1'' into state
> > of K1.
> > Then timer T2 will fire in the context of K2, run processElement for a
> > while, set a new timer and store R2'' into the state of K2.
> > Etc.
> > If partition 1 goes away, the processElement call will return "do not
> > resume", so a timer will not be set and instead the state associated with
> > K1 will be GC'd.
> >
> > So basically it's almost like cooperative thread scheduling: things run for
> > a while, until the runner tells them to checkpoint, then they set a timer
> > to resume themselves, and the runner fires the timers, and the process
> > repeats. And, again, this only requires things that runners can already do
> > - state and timers, but no new concept of SDF executor (and consequently no
> > necessity to choose/tune how many you need).
> >
> > Makes sense?
> >
> > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek 
> > wrote:
> >
> >> Hi,
> >> I have another question that I think wasn't addressed in the meeting. At
> >> least it wasn't mentioned in the notes.
> >>
> >> In the context of replacing sources by a combination of two SDFs, how do you
> >> determine how many "SDF executor" instances you need downstream? For the
> >> sake of argument assume that both SDFs are executed with parallelism 1 (or
> >> one per worker). Now, if you have a file source that reads from a static
> >> set of files the first SDF would emit the filenames while the second SDF
> >> would receive the filenames and emit their contents. This works well and
> >> the downstream SDF can process one filename after the other. Now, think of
> >> something like a Kafka source. The first SDF would emit the partitions (say
> >> 4 partitions, in this example) and the second SDF would be responsible for
> >> reading from a topic and emitting elements. Reading from one topic never
> >> finishes so you can't process the topics in series. I think you would need
> >> to have 4 downstream "SDF executor" instances. The question now is: how do
> >> you determine whether you are in the first or the second situation?
> >>
> >> Probably I'm just overlooking something and this is already dealt with
> >> somewhere... :-)
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía  wrote:
> >>
> >>> Hello,
> >>>
> >>> Thanks for the notes both Dan and Eugene, and for taking the 

Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn

2016-08-21 Thread Jean-Baptiste Onofré
Anyway, from a runner perspective, we will have some kind of API (part of
the Runner API) to "orchestrate" the SDF as we discussed during the call,
right?


Regards
JB

On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:

Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners because it's necessary to
implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".

Think of it this way, using the Kafka example: we'll expand it into a
transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
proposal/slides }
  - R is the OffsetRange restriction which in this case will be always of
the form [startOffset, inf).
  - there'll be just 1 value per key, but we use GBK to just get access to
the per-key state/timers machinery. This may be runner-specific; maybe some
runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? here's how.

The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code) producing a residual
restriction R1' (basically a new start offset), put R1' into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats and after a while it checkpoints and stores R1'' into state
of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.
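
The walk-through above can be simulated on a single thread with nothing more
than a per-key state map and a queue of pending timers. The following is a
rough sketch, not Beam code: SdfSchedulingSketch, processElement's signature
and the "-1 means do not resume" convention are all invented for
illustration, and each partition is given a finite end offset (standing in
for "the partition goes away") so that the simulation terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Single-threaded simulation of checkpoint-and-resume using per-key state and timers. */
final class SdfSchedulingSketch {

    /**
     * Hypothetical stand-in for processElement: emits up to maxRecords records starting at
     * startOffset, then "checkpoints". Returns the residual start offset, or -1 for
     * "do not resume" once the (assumed finite) partition end is reached.
     */
    static long processElement(String key, long startOffset, int maxRecords,
                               long partitionEnd, List<String> output) {
        long offset = startOffset;
        for (int i = 0; i < maxRecords && offset < partitionEnd; i++, offset++) {
            output.add(key + "@" + offset);
        }
        return offset < partitionEnd ? offset : -1L;
    }

    /** Runs all keys to completion on one thread and returns the emitted records in order. */
    static List<String> run(Map<String, Long> partitionEnds, int maxRecordsPerCall) {
        Map<String, Long> state = new HashMap<>();  // per-key state: residual start offset
        Queue<String> timers = new ArrayDeque<>();  // pending timers, fired in FIFO order
        List<String> output = new ArrayList<>();

        // Every key starts with the restriction [0, inf) and an initial "timer".
        for (String key : partitionEnds.keySet()) {
            state.put(key, 0L);
            timers.add(key);
        }

        while (!timers.isEmpty()) {
            String key = timers.remove();           // a timer fires in the context of its key
            long residual = processElement(
                    key, state.get(key), maxRecordsPerCall, partitionEnds.get(key), output);
            if (residual >= 0) {
                state.put(key, residual);           // store the residual restriction
                timers.add(key);                    // set a timer to resume later
            } else {
                state.remove(key);                  // "do not resume": state is dropped
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Long> ends = new LinkedHashMap<>();
        ends.put("P1", 3L);
        ends.put("P2", 2L);
        // prints [P1@0, P1@1, P2@0, P2@1, P1@2]
        System.out.println(run(ends, 2));
    }
}
```

The output interleaves P1 and P2 even though only one thread exists, which is
the point of the timer-based resumption described above.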

So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, and the runner fires the timers, and the process
repeats. And, again, this only requires things that runners can already do
- state and timers, but no new concept of SDF executor (and consequently no
necessity to choose/tune how many you need).

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek 
wrote:


Hi,
I have another question that I think wasn't addressed in the meeting. At
least it wasn't mentioned in the notes.

In the context of replacing sources by a combination of two SDFs, how do you
determine how many "SDF executor" instances you need downstream? For the
sake of argument assume that both SDFs are executed with parallelism 1 (or
one per worker). Now, if you have a file source that reads from a static
set of files the first SDF would emit the filenames while the second SDF
would receive the filenames and emit their contents. This works well and
the downstream SDF can process one filename after the other. Now, think of
something like a Kafka source. The first SDF would emit the partitions (say
4 partitions, in this example) and the second SDF would be responsible for
reading from a topic and emitting elements. Reading from one topic never
finishes so you can't process the topics in series. I think you would need
to have 4 downstream "SDF executor" instances. The question now is: how do
you determine whether you are in the first or the second situation?

Probably I'm just overlooking something and this is already dealt with
somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía  wrote:


Hello,

Thanks for the notes both Dan and Eugene, and for taking the time to do the
presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4

https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8

Anyway I am far from an expert on flink, but probably the flink guys can
give their opinion about this and refer to a more precise document than the
ones I mentioned.

Thanks again,

Re: java.io.NotSerializableException: org.apache.kafka.common.TopicPartition

2016-08-21 Thread Dan Halperin
Explicit +Raghu

On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit 
wrote:

> Hi All
>
> I am trying to use KafkaIO as unbounded source, but the translation is
> failing.  I am using FlinkRunner for the pipe.  It complains about
> the org.apache.kafka.common.TopicPartition being not-serializable.
>
> pipeline.apply(
> KafkaIO.read()
> .withTopics(ImmutableList.of("test-topic"))
> .withBootstrapServers("localhost:9200")
>
> Is it a known issue?  Here is the full exception details.
>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.
> UnboundedSourceWrapper@2a855331
> not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(
> ClosureCleaner.java:99)
> ~[flink-java-1.0.3.jar:1.0.3]
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> ~[flink-java-1.0.3.jar:1.0.3]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> clean(StreamExecutionEnvironment.java:1219)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> addSource(StreamExecutionEnvironment.java:1131)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> addSource(StreamExecutionEnvironment.java:1075)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
> addSource(StreamExecutionEnvironment.java:1057)
> ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> tors$UnboundedReadSourceTranslator.translateNode(
> FlinkStreamingTransformTranslators.java:281)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla
> tors$UnboundedReadSourceTranslator.translateNode(
> FlinkStreamingTransformTranslators.java:244)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat
> or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(
> TransformTreeNode.java:225)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(
> TransformTreeNode.java:220)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(
> TransformTreeNode.java:220)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(
> TransformHierarchy.java:104)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.
> translate(FlinkPipelineTranslator.java:38)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(
> FlinkPipelineExecutionEnvironment.java:106)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> FlinkPipelineRunner.java:106)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(
> FlinkPipelineRunner.java:49)
> ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating]
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating]
> at
> com.cisco.ndp.pipeline.common.runner.flink.PipelineMain.
> main(PipelineMain.java:70)
> ~[pipelinecommon-1.0.0.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_92]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> ~[?:1.8.0_92]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_92]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:505)
> ~[flink-clients_2.10-1.0.3.jar:1.0.3]
> ... 9 more
> Caused by: java.io.NotSerializableException:
> org.apache.kafka.common.TopicPartition
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_92]
> at