Hi,

Thank you for your reply! I will post it.
> On Aug 12, 2017, at 4:20 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Daiqing,
> 
> I think Stefan is right and this will be fixed in the upcoming release.
> Could you open a JIRA for it with the Exception that you posted here?
> 
> Thanks,
> Kostas
> 
>> On Aug 12, 2017, at 10:05 AM, Stefan Richter <s.rich...@data-artisans.com 
>> <mailto:s.rich...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> from a quick look, I would say this is likely a problem with the 
>> NFASerializer: this class seems to be stateful, but its 'duplicate()‘ method 
>> is simply returning ‚this‘. This means that code which relies on duplication 
>> of serializers to shield against concurrent accesses can break, because 
>> multiple threads can work on the same internal serializer state and corrupt 
>> it. Will take a deeper look an monday.
>> 
>> Best,
>> Stefan
>> 
>>> Am 11.08.2017 um 20:55 schrieb Daiqing Li <lidaiqing1...@gmail.com 
>>> <mailto:lidaiqing1...@gmail.com>>:
>>> 
>>> Hi,
>>> 
>>> I am running fling 1.3.1 on EMR. But I am getting this exception after 
>>> running for a while.
>>> 
>>> 
>>> java.lang.RuntimeException: Exception occurred while processing valve 
>>> output watermark: 
>>>     at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>     at 
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>     at 
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>     at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>     at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Could not copy NFA.
>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>>>     at 
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>>     at 
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>>     at 
>>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>>>     at 
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>>>     at 
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>>>     at 
>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>     at 
>>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>     at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>     at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>     ... 7 more
>>> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>>>     at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>>>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>>>     at 
>>> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>>>     at 
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>>     at 
>>> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>>>     at 
>>> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>>>     ... 17 more
>> 
> 

Reply via email to