Re: java.io.NotSerializableException: org.apache.kafka.common.TopicPartition
Thanks Dan.. today i was able to identify what the issue was. Kafka TopicPartition is marked Serializable in kafka-clients-0.9.0.1.jar. Somehow i was pulling down kafka-clients-0.9.0.0.jar. Regards Sumit Chawla On Sun, Aug 21, 2016 at 10:20 AM, Dan Halperinwrote: > Explicit +Raghu > > On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumit > wrote: > > > Hi All > > > > I am trying to use KafkaIO as unbounded source, but the translation is > > failing. I am using FlinkRunner for the pipe. It complains about > > the org.apache.kafka.common.TopicPartition being not-serializable. > > > > pipeline.apply( > > KafkaIO.read() > > .withTopics(ImmutableList.of("test-topic)) > > .withBootstrapServers("localhost:9200") > > > > Is it a known issue? Here is the full exception details. > > > > Caused by: org.apache.flink.api.common.InvalidProgramException: Object > > org.apache.beam.runners.flink.translation.wrappers.streaming.io. > > UnboundedSourceWrapper@2a855331 > > not serializable > > at > > org.apache.flink.api.java.ClosureCleaner.ensureSerializable( > > ClosureCleaner.java:99) > > ~[flink-java-1.0.3.jar:1.0.3] > > at org.apache.flink.api.java.ClosureCleaner.clean( > ClosureCleaner.java:61) > > ~[flink-java-1.0.3.jar:1.0.3] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > clean(StreamExecutionEnvironment.java:1219) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > addSource(StreamExecutionEnvironment.java:1131) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > addSource(StreamExecutionEnvironment.java:1075) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > > addSource(StreamExecutionEnvironment.java:1057) > > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla > > tors$UnboundedReadSourceTranslator.translateNode( > > FlinkStreamingTransformTranslators.java:281) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla > > tors$UnboundedReadSourceTranslator.translateNode( > > FlinkStreamingTransformTranslators.java:244) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat > > or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat > > or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformTreeNode.visit( > > TransformTreeNode.java:225) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformTreeNode.visit( > > TransformTreeNode.java:220) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformTreeNode.visit( > > TransformTreeNode.java:220) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.sdk.runners.TransformHierarchy.visit( > > TransformHierarchy.java:104) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator. > > translate(FlinkPipelineTranslator.java:38) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironm > ent.translate( > > FlinkPipelineExecutionEnvironment.java:106) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.FlinkPipelineRunner.run( > > FlinkPipelineRunner.java:106) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at > > org.apache.beam.runners.flink.FlinkPipelineRunner.run( > > FlinkPipelineRunner.java:49) > > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) > > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > > at > > com.cisco.ndp.pipeline.common.runner.flink.PipelineMain. > > main(PipelineMain.java:70) > > ~[pipelinecommon-1.0.0.jar:?] > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > ~[?:1.8.0_92] > > at > > sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java: > > 62) > >
Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn
Hi JB, Yes, I'm assuming you're referring to the "magic" part on the transform expansion diagram. This is indeed runner-specific, and timers+state are likely the simplest way to do this for an SDF that does unbounded amount of work. On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofréwrote: > Anyway, from a runner perspective, we will have kind of API (part of the > Runner API) to "orchestrate" the SDF as we discussed during the call, > right ? > > Regards > JB > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote: > > Hi Aljoscha, > > This is an excellent question! And the answer is, we don't need any new > > concepts like "SDF executor" and can rely on the per-key state and timers > > machinery that already exists in all runners because it's necessary to > > implement windowing/triggering properly. > > > > Note that this is already somewhat addressed in the previously posted > State > > and Timers proposal https://s.apache.org/beam-state , under "per-key > > workflows". > > > > Think of it this way, using the Kafka example: we'll expand it into a > > transform: > > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for > > partition in topic.listPartitions() } > > (2) GroupByKey > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the > > proposal/slides } > > - R is the OffsetRange restriction which in this case will be always of > > the form [startOffset, inf). > > - there'll be just 1 value per key, but we use GBK to just get access > to > > the per-key state/timers machinery. This may be runner-specific; maybe > some > > runners don't need a GBK to do that. > > > > Now suppose the topic has two partitions, P1 and P2, and they get > assigned > > unique keys K1, K2. > > Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), > > (K2, topic, P2, [0, inf)). > > Suppose we have just 1 worker with just 1 thread. Now, how will this > thread > > be able to produce elements from both P1 and P2? here's how. > > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a > > certain time or after a certain number of elements are output (just like > > with the current UnboundedSource reading code) producing a residual > > restriction R1' (basically a new start timestamp), put R11 into the > per-key > > state and set a timer T1 to resume. > > Then it will process (K2, topic, P2, [0, inf)), do the same producing a > > residual restriction R2' and setting a timer T2 to resume. > > Then timer T1 will fire in the context of the key K1. The thread will > call > > processElement again, this time supplying R1' as the restriction; the > > process repeats and after a while it checkpoints and stores R1'' into > state > > of K1. > > Then timer T2 will fire in the context of K2, run processElement for a > > while, set a new timer and store R2'' into the state of K2. > > Etc. > > If partition 1 goes away, the processElement call will return "do not > > resume", so a timer will not be set and instead the state associated with > > K1 will be GC'd. > > > > So basically it's almost like cooperative thread scheduling: things run > for > > a while, until the runner tells them to checkpoint, then they set a timer > > to resume themselves, and the runner fires the timers, and the process > > repeats. And, again, this only requires things that runners can already > do > > - state and timers, but no new concept of SDF executor (and consequently > no > > necessity to choose/tune how many you need). > > > > Makes sense? > > > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek > > wrote: > > > >> Hi, > >> I have another question that I think wasn't addressed in the meeting. At > >> least it wasn't mentioned in the notes. > >> > >> In the context of replacing sources by a combination of to SDFs, how do > you > >> determine how many "SDF executor" instances you need downstream? For the > >> sake of argument assume that both SDFs are executed with parallelism 1 > (or > >> one per worker). Now, if you have a file source that reads from a static > >> set of files the first SDF would emit the filenames while the second SDF > >> would receive the filenames and emit their contents. This works well and > >> the downstream SDF can process one filename after the other. Now, think > of > >> something like a Kafka source. The first SDF would emit the partitions > (say > >> 4 partitions, in this example) and the second SDF would be responsible > for > >> reading from a topic and emitting elements. Reading from one topic never > >> finishes so you can't process the topics in series. I think you would > need > >> to have 4 downstream "SDF executor" instances. The question now is: how > do > >> you determine whether you are in the first or the second situation? > >> > >> Probably I'm just overlooking something and this is already dealt with > >> somewhere... :-) > >> > >> Cheers, > >> Aljoscha > >> > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía
Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn
Hi Eugene, thanks for the long description! With the interleaving of execution it completely makes sense. Best, Aljoscha On Sun, 21 Aug 2016 at 21:14 Jean-Baptiste Onofréwrote: > Anyway, from a runner perspective, we will have kind of API (part of the > Runner API) to "orchestrate" the SDF as we discussed during the call, > right ? > > Regards > JB > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote: > > Hi Aljoscha, > > This is an excellent question! And the answer is, we don't need any new > > concepts like "SDF executor" and can rely on the per-key state and timers > > machinery that already exists in all runners because it's necessary to > > implement windowing/triggering properly. > > > > Note that this is already somewhat addressed in the previously posted > State > > and Timers proposal https://s.apache.org/beam-state , under "per-key > > workflows". > > > > Think of it this way, using the Kafka example: we'll expand it into a > > transform: > > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for > > partition in topic.listPartitions() } > > (2) GroupByKey > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the > > proposal/slides } > > - R is the OffsetRange restriction which in this case will be always of > > the form [startOffset, inf). > > - there'll be just 1 value per key, but we use GBK to just get access > to > > the per-key state/timers machinery. This may be runner-specific; maybe > some > > runners don't need a GBK to do that. > > > > Now suppose the topic has two partitions, P1 and P2, and they get > assigned > > unique keys K1, K2. > > Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), > > (K2, topic, P2, [0, inf)). > > Suppose we have just 1 worker with just 1 thread. Now, how will this > thread > > be able to produce elements from both P1 and P2? here's how. > > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a > > certain time or after a certain number of elements are output (just like > > with the current UnboundedSource reading code) producing a residual > > restriction R1' (basically a new start timestamp), put R11 into the > per-key > > state and set a timer T1 to resume. > > Then it will process (K2, topic, P2, [0, inf)), do the same producing a > > residual restriction R2' and setting a timer T2 to resume. > > Then timer T1 will fire in the context of the key K1. The thread will > call > > processElement again, this time supplying R1' as the restriction; the > > process repeats and after a while it checkpoints and stores R1'' into > state > > of K1. > > Then timer T2 will fire in the context of K2, run processElement for a > > while, set a new timer and store R2'' into the state of K2. > > Etc. > > If partition 1 goes away, the processElement call will return "do not > > resume", so a timer will not be set and instead the state associated with > > K1 will be GC'd. > > > > So basically it's almost like cooperative thread scheduling: things run > for > > a while, until the runner tells them to checkpoint, then they set a timer > > to resume themselves, and the runner fires the timers, and the process > > repeats. And, again, this only requires things that runners can already > do > > - state and timers, but no new concept of SDF executor (and consequently > no > > necessity to choose/tune how many you need). > > > > Makes sense? > > > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek > > wrote: > > > >> Hi, > >> I have another question that I think wasn't addressed in the meeting. At > >> least it wasn't mentioned in the notes. > >> > >> In the context of replacing sources by a combination of to SDFs, how do > you > >> determine how many "SDF executor" instances you need downstream? For the > >> sake of argument assume that both SDFs are executed with parallelism 1 > (or > >> one per worker). Now, if you have a file source that reads from a static > >> set of files the first SDF would emit the filenames while the second SDF > >> would receive the filenames and emit their contents. This works well and > >> the downstream SDF can process one filename after the other. Now, think > of > >> something like a Kafka source. The first SDF would emit the partitions > (say > >> 4 partitions, in this example) and the second SDF would be responsible > for > >> reading from a topic and emitting elements. Reading from one topic never > >> finishes so you can't process the topics in series. I think you would > need > >> to have 4 downstream "SDF executor" instances. The question now is: how > do > >> you determine whether you are in the first or the second situation? > >> > >> Probably I'm just overlooking something and this is already dealt with > >> somewhere... :-) > >> > >> Cheers, > >> Aljoscha > >> > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía wrote: > >> > >>> Hello, > >>> > >>> Thanks for the notes both Dan and Eugene, and for taking the
Re: [PROPOSAL] Splittable DoFn - Replacing the Source API with non-monolithic element processing in DoFn
Anyway, from a runner perspective, we will have kind of API (part of the Runner API) to "orchestrate" the SDF as we discussed during the call, right ? Regards JB On 08/21/2016 07:24 PM, Eugene Kirpichov wrote: Hi Aljoscha, This is an excellent question! And the answer is, we don't need any new concepts like "SDF executor" and can rely on the per-key state and timers machinery that already exists in all runners because it's necessary to implement windowing/triggering properly. Note that this is already somewhat addressed in the previously posted State and Timers proposal https://s.apache.org/beam-state , under "per-key workflows". Think of it this way, using the Kafka example: we'll expand it into a transform: (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for partition in topic.listPartitions() } (2) GroupByKey (3) ParDo { key, topic, partition, R -> Kafka reader code in the proposal/slides } - R is the OffsetRange restriction which in this case will be always of the form [startOffset, inf). - there'll be just 1 value per key, but we use GBK to just get access to the per-key state/timers machinery. This may be runner-specific; maybe some runners don't need a GBK to do that. Now suppose the topic has two partitions, P1 and P2, and they get assigned unique keys K1, K2. Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), (K2, topic, P2, [0, inf)). Suppose we have just 1 worker with just 1 thread. Now, how will this thread be able to produce elements from both P1 and P2? here's how. The thread will process (K1, topic, P1, [0, inf)), checkpoint after a certain time or after a certain number of elements are output (just like with the current UnboundedSource reading code) producing a residual restriction R1' (basically a new start timestamp), put R11 into the per-key state and set a timer T1 to resume. Then it will process (K2, topic, P2, [0, inf)), do the same producing a residual restriction R2' and setting a timer T2 to resume. Then timer T1 will fire in the context of the key K1. The thread will call processElement again, this time supplying R1' as the restriction; the process repeats and after a while it checkpoints and stores R1'' into state of K1. Then timer T2 will fire in the context of K2, run processElement for a while, set a new timer and store R2'' into the state of K2. Etc. If partition 1 goes away, the processElement call will return "do not resume", so a timer will not be set and instead the state associated with K1 will be GC'd. So basically it's almost like cooperative thread scheduling: things run for a while, until the runner tells them to checkpoint, then they set a timer to resume themselves, and the runner fires the timers, and the process repeats. And, again, this only requires things that runners can already do - state and timers, but no new concept of SDF executor (and consequently no necessity to choose/tune how many you need). Makes sense? On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettekwrote: Hi, I have another question that I think wasn't addressed in the meeting. At least it wasn't mentioned in the notes. In the context of replacing sources by a combination of to SDFs, how do you determine how many "SDF executor" instances you need downstream? For the sake of argument assume that both SDFs are executed with parallelism 1 (or one per worker). Now, if you have a file source that reads from a static set of files the first SDF would emit the filenames while the second SDF would receive the filenames and emit their contents. This works well and the downstream SDF can process one filename after the other. Now, think of something like a Kafka source. The first SDF would emit the partitions (say 4 partitions, in this example) and the second SDF would be responsible for reading from a topic and emitting elements. Reading from one topic never finishes so you can't process the topics in series. I think you would need to have 4 downstream "SDF executor" instances. The question now is: how do you determine whether you are in the first or the second situation? Probably I'm just overlooking something and this is already dealt with somewhere... :-) Cheers, Aljoscha On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía wrote: Hello, Thanks for the notes both Dan and Eugene, and for taking the time to do the presentation and answer our questions. I mentioned the ongoing work on dynamic scaling on Flink because I suppose that it will address dynamic rebalancing eventually (there are multiple changes going on for dynamic scaling). https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4 https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8 Anyway I am far from an expert on flink, but probably the flink guys can give their opinion about this and refer to a more precise document that the ones I mentioned.. Thanks again,
Re: java.io.NotSerializableException: org.apache.kafka.common.TopicPartition
Explicit +Raghu On Fri, Aug 19, 2016 at 4:24 PM, Chawla,Sumitwrote: > Hi All > > I am trying to use KafkaIO as unbounded source, but the translation is > failing. I am using FlinkRunner for the pipe. It complains about > the org.apache.kafka.common.TopicPartition being not-serializable. > > pipeline.apply( > KafkaIO.read() > .withTopics(ImmutableList.of("test-topic)) > .withBootstrapServers("localhost:9200") > > Is it a known issue? Here is the full exception details. > > Caused by: org.apache.flink.api.common.InvalidProgramException: Object > org.apache.beam.runners.flink.translation.wrappers.streaming.io. > UnboundedSourceWrapper@2a855331 > not serializable > at > org.apache.flink.api.java.ClosureCleaner.ensureSerializable( > ClosureCleaner.java:99) > ~[flink-java-1.0.3.jar:1.0.3] > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61) > ~[flink-java-1.0.3.jar:1.0.3] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > clean(StreamExecutionEnvironment.java:1219) > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > addSource(StreamExecutionEnvironment.java:1131) > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > addSource(StreamExecutionEnvironment.java:1075) > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment. > addSource(StreamExecutionEnvironment.java:1057) > ~[flink-streaming-java_2.10-1.0.0.jar:1.0.0] > at > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla > tors$UnboundedReadSourceTranslator.translateNode( > FlinkStreamingTransformTranslators.java:281) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.translation.FlinkStreamingTransformTransla > tors$UnboundedReadSourceTranslator.translateNode( > FlinkStreamingTransformTranslators.java:244) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat > or.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:106) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslat > or.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:87) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.sdk.runners.TransformTreeNode.visit( > TransformTreeNode.java:225) > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.sdk.runners.TransformTreeNode.visit( > TransformTreeNode.java:220) > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.sdk.runners.TransformTreeNode.visit( > TransformTreeNode.java:220) > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.sdk.runners.TransformHierarchy.visit( > TransformHierarchy.java:104) > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292) > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator. > translate(FlinkPipelineTranslator.java:38) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate( > FlinkPipelineExecutionEnvironment.java:106) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.FlinkPipelineRunner.run( > FlinkPipelineRunner.java:106) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at > org.apache.beam.runners.flink.FlinkPipelineRunner.run( > FlinkPipelineRunner.java:49) > ~[beam-runners-flink_2.10-0.1.0-incubating.jar:0.1.0-incubating] > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) > ~[beam-sdks-java-core-0.1.0-incubating.jar:0.1.0-incubating] > at > com.cisco.ndp.pipeline.common.runner.flink.PipelineMain. > main(PipelineMain.java:70) > ~[pipelinecommon-1.0.0.jar:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_92] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java: > 62) > ~[?:1.8.0_92] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_92] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_92] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:505) > ~[flink-clients_2.10-1.0.3.jar:1.0.3] > ... 9 more > Caused by: java.io.NotSerializableException: > org.apache.kafka.common.TopicPartition > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > ~[?:1.8.0_92] > at