[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
[ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202503#comment-16202503 ]

Aljoscha Krettek commented on FLINK-7435:
-----------------------------------------

In addition to the {{NFASerializer}} problem, there is a bug introduced in this change: https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of the child serializers, and the {{KryoSerializer}} is not thread-safe, meaning that this breaks when used in combination with asynchronous snapshots.

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --------------------------------------------------------------------------
>
>                 Key: FLINK-7435
>                 URL: https://issues.apache.org/jira/browse/FLINK-7435
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>         Environment: AWS EMR YARN, use CEP with pattern start -> next (oneOrMore.Optional.Consecutive) -> next(end). Store it with FsStateBackend with Incremental option open.
>            Reporter: daiqing
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.4.0
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>     ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>     ... 17 more

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
[ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202139#comment-16202139 ]

Aljoscha Krettek commented on FLINK-7435:
-----------------------------------------

[~lidaiqing1993] When you say incremental, did you mean asynchronous snapshots?
[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
[ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202007#comment-16202007 ]

Kostas Kloudas commented on FLINK-7435:
---------------------------------------

Hi [~lidaiqing].

I am looking into this issue. The problem occurs when FlinkCEP tries to deserialize the conditions of your pattern (`NFASerializer.deserializeCondition`).

Could you please share more details about your job, and ideally some code? For example, the code of your pattern and the state backend you are using.

Also, as I said in the previous comment, there is definitely a problem with the `duplicate()` method of the serializer. Could you please try this branch https://github.com/kl0u/flink/tree/cep-nfa-serializer-bug to see if it solves the problem? The main change is that the `NFASerializer.duplicate()` method now becomes:

```
@Override
public TypeSerializer<NFA<T>> duplicate() {
	return new NFASerializer<>(eventSerializer.duplicate());
}
```

So if you have Flink's source code, you can make this change yourself and try it out.
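To illustrate why the `duplicate()` change in the comment above matters, here is a minimal, self-contained sketch. It does not use Flink's classes: `SimpleSerializer`, `BufferedIntSerializer` and `WrapperSerializer` are hypothetical stand-ins, and the assumption is only that a wrapping serializer holds a stateful child serializer that must not be shared between threads. Under that assumption, the wrapper's `duplicate()` has to duplicate the child as well, otherwise two "independent" wrappers still write through the same scratch buffer.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;

public class DuplicateSketch {

    /** Hypothetical serializer interface; a stand-in, not Flink's TypeSerializer. */
    public interface SimpleSerializer<T> {
        byte[] serialize(T value);

        SimpleSerializer<T> duplicate();
    }

    /** Stateful serializer: it reuses a scratch buffer, so one instance must not be shared across threads. */
    public static final class BufferedIntSerializer implements SimpleSerializer<Integer> {
        private final byte[] scratch = new byte[4];

        @Override
        public byte[] serialize(Integer value) {
            int v = value;
            scratch[0] = (byte) (v >>> 24);
            scratch[1] = (byte) (v >>> 16);
            scratch[2] = (byte) (v >>> 8);
            scratch[3] = (byte) v;
            // A second thread writing into scratch between the stores above and
            // this copy would corrupt the returned bytes.
            return Arrays.copyOf(scratch, scratch.length);
        }

        @Override
        public BufferedIntSerializer duplicate() {
            // Fresh instance with its own scratch buffer.
            return new BufferedIntSerializer();
        }
    }

    /** Composite serializer that delegates each element to a child serializer. */
    public static final class WrapperSerializer<T> implements SimpleSerializer<List<T>> {
        private final SimpleSerializer<T> child;

        public WrapperSerializer(SimpleSerializer<T> child) {
            this.child = child;
        }

        public SimpleSerializer<T> child() {
            return child;
        }

        @Override
        public byte[] serialize(List<T> values) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (T value : values) {
                byte[] bytes = child.serialize(value);
                out.write(bytes, 0, bytes.length);
            }
            return out.toByteArray();
        }

        @Override
        public WrapperSerializer<T> duplicate() {
            // Deep duplicate: the copy gets its own child, mirroring the shape of
            // new NFASerializer<>(eventSerializer.duplicate()) from the comment.
            return new WrapperSerializer<>(child.duplicate());
        }
    }

    public static void main(String[] args) {
        WrapperSerializer<Integer> original = new WrapperSerializer<>(new BufferedIntSerializer());
        WrapperSerializer<Integer> copy = original.duplicate();
        System.out.println("children are distinct: " + (copy.child() != original.child()));
        System.out.println("same bytes: "
                + Arrays.equals(original.serialize(Arrays.asList(1, 2)), copy.serialize(Arrays.asList(1, 2))));
    }
}
```

A wrapper whose `duplicate()` returned `this`, or a new wrapper around the same child, would look independent but would still funnel both threads through one scratch buffer.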
[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
[ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124875#comment-16124875 ]

Kostas Kloudas commented on FLINK-7435:
---------------------------------------

The problem here seems to be in the {{copy()}} method of the {{NFA}}, which currently returns {{this}} rather than an actual copy.
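The effect of a `copy()` that returns `this` can be shown with a small sketch. This is not the NFA code: `MutableState`, `aliasingCopy()` and `realCopy()` are hypothetical names, and the sketch only assumes that some holder (for example a snapshot in progress) keeps the result of a copy while the original keeps being mutated.

```java
import java.util.ArrayList;
import java.util.List;

public class CopySketch {

    /** Hypothetical mutable state object; a stand-in, not Flink's NFA. */
    public static final class MutableState {
        private final List<String> events = new ArrayList<>();

        public void add(String event) {
            events.add(event);
        }

        /** Returns a defensive copy of the recorded events. */
        public List<String> events() {
            return new ArrayList<>(events);
        }

        /** Broken variant: hands back the same instance, so callers share all later mutations. */
        public MutableState aliasingCopy() {
            return this;
        }

        /** Real copy: a new instance with its own event list. */
        public MutableState realCopy() {
            MutableState copy = new MutableState();
            copy.events.addAll(events);
            return copy;
        }
    }

    public static void main(String[] args) {
        MutableState live = new MutableState();
        live.add("start");

        MutableState aliased = live.aliasingCopy();
        MutableState isolated = live.realCopy();

        // The live object keeps changing after the "copies" were taken.
        live.add("end");

        System.out.println(aliased.events());  // prints [start, end]
        System.out.println(isolated.events()); // prints [start]
    }
}
```

Only the real copy preserves the state as it was when the copy was taken; the aliased one silently follows every later mutation, which is unsafe if another thread is reading or serializing it at the same time.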