Hi Gordon,

I remembered that I had already seen this kind of exception once while
testing the current job, and fortunately I still had the complete stack
trace saved on my PC:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
        at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

I don't know why the stack trace is now being printed only for the first
parts (handleWatermark and HeapReducingState).

So it looks like something related to the KryoSerializer. As a Kryo
serializer I'm using JodaDateTimeSerializer, registered as follows:

env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])
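
For completeness, this is roughly how that registration looks in context (a
minimal, self-contained sketch; I'm assuming the JodaDateTimeSerializer from
the de.javakaffee kryo-serializers library):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.joda.time.DateTime
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer

val env = StreamExecutionEnvironment.getExecutionEnvironment
// DateTime has no built-in Flink serializer, so it falls back to Kryo;
// this makes Kryo use the Joda serializer for every DateTime field
env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])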

I hope this helps.

Regards,
Federico

2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:

> Hi Gordon,
>
> I'm currently using Flink 1.3.2 in local mode.
>
> If it's any help I realized from the log that the complete task which is
> failing is:
>
> 2017-09-29 14:17:20,354 INFO  org.apache.flink.runtime.taskmanager.Task                     - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("time").name("latest_time").uid("latest_time")
>
> val activeStream = events
>   // Serialization to JsValue
>   .map(event => event.toMongoActiveJsValue)
>   .name("map_active_stream").uid("map_active_stream")
>   // Global windowing; the cause of the exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE))
>   .name("active_stream_window").uid("active_stream_window")
>
> val historyStream = airtrafficEvents
>   // Serialization to JsValue
>   .map(event => event.toMongoHistoryJsValue)
>   .name("map_history_stream").uid("map_history_stream")
>   // Global windowing; the cause of the exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE))
>   .name("history_stream_window").uid("history_stream_window")
>
>
>
> Regards,
> Federico
>
> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>
>> Hi,
>>
>> I’m looking into this. Could you let us know the Flink version in which
>> the exceptions occurred?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (federico.dambro...@smartlab.ws) wrote:
>>
>> Hi, I'm coming across these exceptions while running a pretty simple Flink job.
>>
>> First one:
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>> The second one:
>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>         at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>>
>> It looks like something is wrong with watermark processing. In my case,
>> watermarks are generated in my Kafka source:
>>
>> val stream = env.addSource(
>>   new FlinkKafkaConsumer010[AirTrafficEvent](topic, new JSONDeserializationSchema(), consumerConfig)
>>     .setStartFromLatest()
>>     .assignTimestampsAndWatermarks(
>>       // events may arrive up to 10 seconds out of order
>>       new BoundedOutOfOrdernessTimestampExtractor[AirTrafficEvent](Time.seconds(10)) {
>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>           element.instantValues.time.getMillis
>>       })
>> )
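>>
>> (For reference, BoundedOutOfOrdernessTimestampExtractor is essentially a
>> periodic assigner whose watermark trails the highest timestamp seen by the
>> configured bound. A rough sketch of the equivalent, assuming the same
>> AirTrafficEvent type as above:)
>>
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>> import org.apache.flink.streaming.api.watermark.Watermark
>>
>> class BoundedAssigner extends AssignerWithPeriodicWatermarks[AirTrafficEvent] {
>>   private val maxOutOfOrderness = 10000L // 10 seconds
>>   private var currentMax = Long.MinValue + maxOutOfOrderness
>>
>>   override def extractTimestamp(element: AirTrafficEvent, previousTimestamp: Long): Long = {
>>     val ts = element.instantValues.time.getMillis
>>     currentMax = math.max(currentMax, ts)
>>     ts
>>   }
>>
>>   // called periodically by the runtime; the watermark trails the max timestamp by 10s
>>   override def getCurrentWatermark: Watermark = new Watermark(currentMax - maxOutOfOrderness)
>> }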
>>
>> These exceptions aren't really that informative per se and, from what I can
>> see, the task triggering them is the following operator:
>>
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
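>>
>> (Worth noting: maxBy on a keyed window is executed as a ReduceFunction over
>> heap reducing state, which matches the second stack trace. A rough sketch of
>> the equivalent reduce, assuming AirTrafficEvent has a numeric timestamp field:)
>>
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   // keep the element with the latest timestamp, like maxBy("timestamp")
>>   .reduce((a, b) => if (a.timestamp >= b.timestamp) a else b)
>>   .name("latest_time").uid("latest_time")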
>>
>> What could be the problem here, in your opinion? Is it not emitting
>> watermarks correctly? I'm not even sure how I could reproduce these
>> exceptions, since they seem to happen pretty much randomly.
>>
>> Thank you all,
>> Federico D'Ambrosio
>>
>>
>
>
> --
> Federico D'Ambrosio
>



-- 
Federico D'Ambrosio
