Hi Federico,

I'm guessing the job is still working without asynchronous snapshots? I'm very eager to figure out what is actually going wrong with asynchronous checkpoints.

Best,
Aljoscha

> On 2. Oct 2017, at 11:57, Federico D'Ambrosio
> <federico.dambro...@smartlab.ws> wrote:
>
> As a followup:
>
> the Flink job currently has an uptime of almost 24 hours, with no failed
> checkpoints or restarts, whereas with async snapshots it would already have
> crashed 50 or so times.
>
> Regards,
> Federico
>
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio
> <federico.dambro...@smartlab.ws>:
> Thank you very much, Gordon.
>
> I'll try to run the job without the asynchronous snapshots first thing.
>
> As for the Event data type: it's a case class with 2 fields: a String ID and
> a composite case class (let's call it RealEvent) containing 3 fields of the
> following types: Information, which is a case class with String fields;
> Coordinates, a nested case class with 2 Doubles; and InstantValues, with 3
> Integers and a DateTime. This DateTime field in InstantValues is the one being
> evaluated in the maxBy (via the InstantValues and RealEvent compareTo
> implementations, because dot notation is not working in Scala as of 1.3.2,
> FLINK-7629 <https://issues.apache.org/jira/browse/FLINK-7629>), and that was
> the reason I had to register the JodaDateTimeSerializer with Kryo in the
> first place.
>
> Regards,
> Federico
>
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
> Hi,
>
> Thanks for the extra info, it was helpful (I’m not sure why your first logs
> didn’t have the full trace, though).
>
> I spent some time digging through the error trace, and currently have some
> observations I would like to go through first:
>
> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
> trying to access the state and making a copy (via serialization) in the
> CopyOnWriteStateTable.
> 2. The state that caused the exception seems to be the state of the reducing
> window function (i.e. the maxBy).
> The state type should be the same as the
> records in your `events` DataStream, which seems to be a Scala case class
> with some nested field that requires Kryo for serialization.
> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to
> copy that field.
>
> My current guess is that the serializer used internally may have been
> incorrectly shared, which would explain why this exception happens
> randomly for you.
> I recall that similar issues occurred before because some KryoSerializers
> aren't thread-safe and were incorrectly shared in Flink.
>
> I may need some help from you to be able to look at this a bit more:
> - Is it possible that you disable asynchronous snapshots and try running this
> job a bit more to see if the problem still occurs? This is mainly to
> confirm or rule out my guess that there is some incorrect serializer usage
> in the CopyOnWriteStateTable.
> - Could you let us know what the case class of your `events` DataStream
> records looks like?
>
> Also looping in Aljoscha and Stefan here, as they would probably have more
> insight into this.
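
For reference, a minimal sketch of what disabling asynchronous snapshots might look like on Flink 1.3.x. It assumes the job runs on one of the heap-based state backends (the CopyOnWriteStateTable in the trace belongs to those). The thread does not say which backend or checkpoint interval is actually configured, so the choice of `MemoryStateBackend`, the 60-second interval, and the object name `SyncSnapshotsSketch` are assumptions made only for illustration.

```scala
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical entry point; not taken from the job discussed in this thread.
object SyncSnapshotsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: checkpoint every 60 seconds (the real interval is not given).
    env.enableCheckpointing(60000L)

    // The boolean constructor argument is the asynchronousSnapshots flag;
    // passing false makes the heap backend snapshot its state synchronously.
    env.setStateBackend(new MemoryStateBackend(false))

    // ... build the job graph here, then call env.execute(...)
  }
}
```

If the job uses `FsStateBackend` instead, its two-argument constructor (checkpoint URI followed by the same boolean flag) should serve the same purpose.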
>
> Cheers,
> Gordon
>
> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio
> (federico.dambro...@smartlab.ws) wrote:
>
>> Hi Gordon,
>>
>> I remembered that I had already seen this kind of exception once during the
>> testing of the current job and fortunately I had the complete stacktrace
>> still saved on my pc:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>     at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>     at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> I don't know why now the stacktrace is getting output only for the first
>> parts (handleWatermark and HeapReducingState).
>>
>> So, it looks like something that has to do with the KryoSerializer. As a
>> KryoSerializer I'm using JodaDateTimeSerializer, registered as follows:
>>
>> env.getConfig.addDefaultKryoSerializer(classOf[DateTime],
>>   classOf[JodaDateTimeSerializer])
>>
>> I hope this could help.
>>
>> Regards,
>> Federico
>>
>> 2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio
>> <federico.dambro...@smartlab.ws>:
>> Hi Gordon,
>>
>> I'm currently using Flink 1.3.2 in local mode.
>>
>> If it's any help I realized from the log that the complete task which is
>> failing is:
>>
>> 2017-09-29 14:17:20,354 INFO org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>>
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("time").name("latest_time").uid("latest_time")
>>
>> val activeStream = events
>>   //Serialization to JsValue
>>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>>   //Global windowing, the cause of exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>>
>> val historyStream = events
>>   //Serialization to JsValue
>>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>>   //Global windowing, the cause of exception should be above
>>   .timeWindowAll(Time.seconds(10))
>>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>>
>> Regards,
>> Federico
>>
>> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>> Hi,
>>
>> I’m looking into this. Could you let us know the Flink version in which the
>> exceptions occurred?
>>
>> Cheers,
>> Gordon
>>
>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio
>> (federico.dambro...@smartlab.ws) wrote:
>>
>>> Hi, I'm coming across these Exceptions while running a pretty simple Flink
>>> job.
>>> First one:
>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>> The second one:
>>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>>     at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>>
>>> Since it looks like something is wrong in Watermark processing: in my case,
>>> Watermarks are generated in my KafkaSource:
>>>
>>> val stream = env.addSource(
>>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>>     .setStartFromLatest()
>>>     .assignTimestampsAndWatermarks(
>>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>>         def extractTimestamp(element: Event): Long =
>>>           element.instantValues.time.getMillis
>>>       })
>>> )
>>>
>>> These exceptions aren't really that informative per se and, from what I
>>> see, the task triggering these exceptions is the following operator:
>>>
>>> val events = keyedStreamByID
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>>
>>> What could be the problem here in your opinion? Is it not emitting
>>> watermarks correctly? I'm not even sure how I could reproduce these
>>> exceptions, since it looks like they happen pretty much randomly.
>>>
>>> Thank you all,
>>> Federico D'Ambrosio
>>
>> --
>> Federico D'Ambrosio
>>
>> --
>> Federico D'Ambrosio
>
> --
> Federico D'Ambrosio
>
> --
> Federico D'Ambrosio