Thank you, Robert! The issue with Kafka is now solved with the
0.10-SNAPSHOT dependency.
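
For the archives: the fix was pinning the Kafka connector version in the
pom, roughly like the fragment below. The artifactId shown is an
assumption from our setup; check it against your own build, and note that
snapshot artifacts come from Apache's snapshot repository.

```xml
<!-- Kafka connector pinned to the snapshot line that contains the fix.
     artifactId is an assumption; verify against your project. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10-SNAPSHOT</version>
</dependency>

<!-- Needed so Maven can resolve 0.10-SNAPSHOT artifacts at all. -->
<repository>
  <id>apache.snapshots</id>
  <url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
```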

We have now run into an OutOfMemoryError, however, which appears to be
related to the operator state. As my colleague Javier Lopez mentioned in a
previous thread, state handling is crucial for our use case. And as the
jobs are intended to run for months, stability plays an important role in
choosing a stream processing framework.

12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:662)
    at java.util.HashMap.put(HashMap.java:611)
    at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
    at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
    at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
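
For context, the reduce that fails keeps a running average per key. A
minimal plain-Java sketch of that aggregation logic follows; the names
are hypothetical and this is not the actual ItemPriceAvgPerOrder code,
just the (sum, count) pattern a windowed reduce would maintain per key.

```java
// Hypothetical sketch of an incremental average kept as (sum, count),
// the kind of per-key accumulator a windowed reduce maintains in state.
public class AvgSketch {

    // Accumulator for one key: running sum and element count.
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        double average() {
            return sum / count;
        }
    }

    // The reduce step: combine two partial aggregates into one.
    // The state is constant-size per key, regardless of element count.
    static SumCount reduce(SumCount a, SumCount b) {
        return new SumCount(a.sum + b.sum, a.count + b.count);
    }

    public static void main(String[] args) {
        SumCount acc = new SumCount(10.0, 1);
        acc = reduce(acc, new SumCount(20.0, 1));
        acc = reduce(acc, new SumCount(30.0, 1));
        System.out.println(acc.average()); // 20.0
    }
}
```

Note that while each accumulator is small, the heap-backed state keeps
one entry per distinct key, so an unbounded key space (e.g. order IDs)
grows the backing HashMap without limit, which would match the
HashMap.resize() frame at the top of the trace above.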




2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:

> Thanks! I've linked the issue in JIRA.
>
> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
> > I think its this one https://issues.apache.org/jira/browse/KAFKA-824
> >
> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org>
> wrote:
> >>
> >> I know this has been fixed already but, out of curiosity, could you
> >> point me to the Kafka JIRA issue for this
> >> bug? From the Flink issue it looks like this is a Zookeeper version
> >> mismatch.
> >>
> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org>
> >> wrote:
> >> > Hi Gyula,
> >> >
> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> >> > "release-0.10" branch to Apache's maven snapshot repository.
> >> >
> >> >
> >> > I don't think Mihail's code will run when he's compiling it against
> >> > 1.0-SNAPSHOT.
> >> >
> >> >
> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> I think Robert meant to write setting the connector dependency to
> >> >> 1.0-SNAPSHOT.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> Robert Metzger <rmetz...@apache.org> wrote (on Tue, Dec 1, 2015 at
> >> >> 17:10):
> >> >>>
> >> >>> Hi Mihail,
> >> >>>
> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for
> this
> >> >>> as
> >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
> >> >>>
> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
> contain
> >> >>> a
> >> >>> fix.
> >> >>>
> >> >>> Since the kafka connector is not contained in the flink binary, you
> >> >>> can
> >> >>> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven
> >> >>> will
> >> >>> then download the code planned for the 0.10-SNAPSHOT release.
> >> >>>
> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
> >> >>> <mihail.vi...@zalando.de>
> >> >>> wrote:
> >> >>>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> we get the following NullPointerException after ~50 minutes when
> >> >>>> running
> >> >>>> a streaming job with windowing and state that reads data from Kafka
> >> >>>> and
> >> >>>> writes the result to local FS.
> >> >>>> There are around 170 million messages to be processed, Flink 0.10.1
> >> >>>> stops at ~8 million.
> >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
> >> >>>> script.
> >> >>>>
> >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
> >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
> >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
> >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
> >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
> >> >>>> java.lang.Exception
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> >> >>>>     at java.lang.Thread.run(Thread.java:745)
> >> >>>> Caused by: java.lang.NullPointerException
> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
> >> >>>>
> >> >>>>
> >> >>>> Any ideas on what could cause this behaviour?
> >> >>>>
> >> >>>> Best,
> >> >>>> Mihail
> >> >>>
> >> >>>
> >> >
> >
> >
>
