I think its this one https://issues.apache.org/jira/browse/KAFKA-824
On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> wrote: > I know this has been fixed already but, out of curiosity, could you > point me to the Kafka JIRA issue for this > bug? From the Flink issue it looks like this is a Zookeeper version > mismatch. > > On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org> > wrote: > > Hi Gyula, > > > > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the > > "release-0.10" branch to Apache's maven snapshot repository. > > > > > > I don't think Mihail's code will run when he's compiling it against > > 1.0-SNAPSHOT. > > > > > > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote: > >> > >> Hi, > >> > >> I think Robert meant to write setting the connector dependency to > >> 1.0-SNAPSHOT. > >> > >> Cheers, > >> Gyula > >> > >> Robert Metzger <rmetz...@apache.org> ezt írta (időpont: 2015. dec. 1., > K, > >> 17:10): > >>> > >>> Hi Mihail, > >>> > >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this > as > >>> well: https://issues.apache.org/jira/browse/FLINK-3067 > >>> > >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain > a > >>> fix. > >>> > >>> Since the kafka connector is not contained in the flink binary, you can > >>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven > will > >>> then download the code planned for the 0.10-SNAPSHOT release. > >>> > >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de > > > >>> wrote: > >>>> > >>>> Hi, > >>>> > >>>> we get the following NullPointerException after ~50 minutes when > running > >>>> a streaming job with windowing and state that reads data from Kafka > and > >>>> writes the result to local FS. > >>>> There are around 170 million messages to be processed, Flink 0.10.1 > >>>> stops at ~8 million. > >>>> Flink runs locally, started with the "start-cluster-streaming.sh" > >>>> script. > >>>> > >>>> 12/01/2015 15:06:24 Job execution switched to status RUNNING. > >>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) > switched > >>>> to SCHEDULED > >>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) > switched > >>>> to DEPLOYING > >>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at > >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to > >>>> SCHEDULED > >>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at > >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to > >>>> DEPLOYING > >>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) > switched > >>>> to RUNNING > >>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at > >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to > >>>> RUNNING > >>>> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at > >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to > >>>> CANCELED > >>>> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) > switched > >>>> to FAILED > >>>> java.lang.Exception > >>>> at > >>>> > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242) > >>>> at > >>>> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397) > >>>> at > >>>> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > >>>> at > >>>> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) > >>>> at > >>>> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) > >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > >>>> at java.lang.Thread.run(Thread.java:745) > >>>> Caused by: java.lang.NullPointerException > >>>> at > >>>> > org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115) > >>>> at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817) > >>>> at > >>>> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) > >>>> at > >>>> org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813) > >>>> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808) > >>>> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777) > >>>> at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332) > >>>> at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala) > >>>> at > >>>> > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112) > >>>> at > >>>> > org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80) > >>>> at > >>>> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632) > >>>> > >>>> > >>>> Any ideas on what could cause this behaviour? > >>>> > >>>> Best, > >>>> Mihail > >>> > >>> > > >