Hi,

I think this looks like the same problem as in this issue:
https://issues.apache.org/jira/browse/FLINK-11420

Best,
Stefan


> On 13. Mar 2019, at 09:41, Konstantin Knauf <konstan...@ververica.com> wrote:
> 
> Hi Andrew, 
> 
> generally, this looks like a concurrency problem. 
> 
> Are you using asynchronous checkpointing? If so, could you check whether the 
> issue also occurs with synchronous checkpointing? There have been reports 
> recently that there might be a problem with some Kryo types.
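> 
> For reference, switching to synchronous snapshots is a one-line change on the 
> state backend. A minimal sketch, assuming a filesystem state backend and a 
> placeholder checkpoint path:
> 
>     import org.apache.flink.runtime.state.filesystem.FsStateBackend
>     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // The second constructor argument toggles asynchronous snapshots;
>     // passing false forces synchronous checkpointing for this test.
>     // The checkpoint URI is a placeholder - substitute your own.
>     env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false))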
> 
> Can you set the logging level to DEBUG? In that case, we enable some checks 
> in the Kryo serializer to verify whether the KryoSerializer is really 
> accessed concurrently.
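> 
> In the log4j setup that ships with Flink, that would mean raising the level 
> for the Kryo serializer package, e.g. in conf/log4j.properties (a sketch, 
> assuming the stock log4j 1.x configuration):
> 
>     log4j.logger.org.apache.flink.api.java.typeutils.runtime.kryo=DEBUG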
> 
> Are you using any Scala types, in particular collections or "Try"?
> 
> Cheers, 
> 
> Konstantin
> 
> On Sat, Mar 9, 2019 at 6:22 AM Andrew Roberts <arobe...@fuze.com> wrote:
> This is with Flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more 
> circumstances.
> 
> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf <konstan...@ververica.com> wrote:
> 
>> Hi Andrew, 
>> 
>> Which Flink version are you using? This sounds a bit like
>> https://issues.apache.org/jira/browse/FLINK-8836.
>> 
>> Cheers, 
>> 
>> Konstantin
>> 
>> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts <arobe...@fuze.com> wrote:
>> Hello,
>> 
>> I’m trying to convert some of our larger stateful computations into 
>> something that aligns more closely with the Flink windowing framework and, 
>> in particular, to start using “event time” instead of “ingest time” as the 
>> time characteristic.
>> 
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka 
>> source), and while my data is generally time-ordered, there are some 
>> upstream races, so I’m attempting to assign timestamps and watermarks using 
>> a BoundedOutOfOrdernessTimestampExtractor with a lateness of 30 seconds. 
>> When I assign timestamps directly in the Kafka sources (I’m also connecting 
>> two Kafka streams here) using 
>> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work OK, but my 
>> extractor has to do a bunch of “faking”, because not every record that is 
>> produced will have a valid timestamp - for example, a record that can’t be 
>> parsed won’t have one.
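>> 
>> For concreteness, the extractor looks roughly like this (a sketch - the 
>> Event case class and its timestampMillis field are stand-ins for our real 
>> types):
>> 
>>     import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>     import org.apache.flink.streaming.api.windowing.time.Time
>> 
>>     case class Event(timestampMillis: Long, payload: String)
>> 
>>     // Watermarks trail the highest timestamp seen so far by 30 seconds.
>>     class EventTimestampExtractor
>>         extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(30)) {
>>       override def extractTimestamp(e: Event): Long = e.timestampMillis
>>     }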
>> 
>> When I assign timestamps downstream, after filtering the stream down to just 
>> records that are going to be windowed, I see errors in my Flink job:
>> 
>> java.io.IOException: Exception while applying AggregateFunction in 
>> aggregating state
>>         at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>>         at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>         at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>         at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>>         at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>         at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>>         at scala.collection.immutable.List.foreach(List.scala:392)
>>         at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>>         at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>>         at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>         at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>         at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>>         at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>>         at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>>         ... 6 more
>> 
>> I am calling aggregate() on my windows, but otherwise I see very little 
>> information that I can use to dig into this issue. Can anyone give me any 
>> insight into what is going wrong here? I’d much prefer assigning timestamps 
>> after filtering, rather than in the Kafka source, because I can filter down 
>> to only records that I know will have timestamps.
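>> 
>> In other words, I’d like to do something like this (a sketch - the names 
>> here are illustrative, reusing the Event extractor from above):
>> 
>>     // Assign timestamps only after filtering, so the extractor never
>>     // has to fake a timestamp for unparseable records.
>>     val withTimestamps = parsedEvents            // DataStream[Event]
>>       .filter(_.timestampMillis > 0)
>>       .assignTimestampsAndWatermarks(new EventTimestampExtractor)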
>> 
>> When experimenting with the lateness in my timestamp/watermark assigner, I 
>> also saw a similarly opaque exception:
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark: 
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>>         at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>         at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>>         at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 183
>>         at com.esotericsoftware.kryo.util.IntMap.get(IntMap.java:302)
>>         at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.getRegistration(DefaultClassResolver.java:70)
>>         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
>>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
>>         at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
>>         at 
>> org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
>>         …
>> 
>> 
>> Any tips?
>> 
>> 
>> Thanks,
>> 
>> Andrew
> 
> 
> 
> -- 
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
> https://www.ververica.com/
> Follow us @VervericaData
> --
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen    
