[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

[https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202503#comment-16202503]

Aljoscha Krettek commented on FLINK-7435:
-

In addition to the {{NFASerializer}} problem, there is a bug introduced by this
change: https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of its child
serializers, and since the {{KryoSerializer}} is not thread-safe, this breaks
when used in combination with asynchronous snapshots.
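To illustrate why a shallow duplicate breaks once a second thread (such as an asynchronous snapshot) uses the "copy", here is a minimal, self-contained Java sketch. It does not use Flink's real {{TypeSerializer}}: `Serializer`, `StatefulStringSerializer` and `WrapperSerializer` are hypothetical stand-ins, with the stateful leaf playing the role of a non-thread-safe serializer like Kryo and the wrapper playing the role of a composite such as {{CaseClassSerializer}}. The `interleave` method simulates the processing side and the snapshot side interleaving in a fixed, deterministic order.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SharedSerializerDemo {

    /** Hypothetical minimal serializer contract (not Flink's TypeSerializer). */
    interface Serializer<T> {
        void begin(T value);        // write a value into internal scratch state
        byte[] finish();            // return the bytes for the value begun last
        Serializer<T> duplicate();  // return an instance safe to hand to another thread
    }

    /** Stateful leaf, standing in for a non-thread-safe serializer such as Kryo. */
    static final class StatefulStringSerializer implements Serializer<String> {
        private byte[] scratch = new byte[0];

        public void begin(String value) { scratch = value.getBytes(StandardCharsets.UTF_8); }

        public byte[] finish() { return Arrays.copyOf(scratch, scratch.length); }

        public Serializer<String> duplicate() { return new StatefulStringSerializer(); }
    }

    /** Composite serializer, standing in for something like CaseClassSerializer. */
    static final class WrapperSerializer implements Serializer<String> {
        final Serializer<String> child;
        final boolean deepDuplicate;

        WrapperSerializer(Serializer<String> child, boolean deepDuplicate) {
            this.child = child;
            this.deepDuplicate = deepDuplicate;
        }

        public void begin(String value) { child.begin(value); }

        public byte[] finish() { return child.finish(); }

        public Serializer<String> duplicate() {
            // Shallow (buggy): the new wrapper shares the same stateful child.
            // Deep (correct): the new wrapper gets its own child via child.duplicate().
            return new WrapperSerializer(deepDuplicate ? child.duplicate() : child, deepDuplicate);
        }
    }

    /** Deterministically interleaves a "processing" user and a "snapshot" user. */
    static String[] interleave(Serializer<String> processing) {
        Serializer<String> snapshot = processing.duplicate();
        processing.begin("event-A");
        snapshot.begin("snapshot-B"); // runs before the processing side has finished
        String a = new String(processing.finish(), StandardCharsets.UTF_8);
        String b = new String(snapshot.finish(), StandardCharsets.UTF_8);
        return new String[] {a, b};
    }

    public static void main(String[] args) {
        String[] shallow = interleave(new WrapperSerializer(new StatefulStringSerializer(), false));
        String[] deep = interleave(new WrapperSerializer(new StatefulStringSerializer(), true));
        System.out.println("shallow: " + Arrays.toString(shallow)); // shallow: [snapshot-B, snapshot-B]
        System.out.println("deep:    " + Arrays.toString(deep));    // deep:    [event-A, snapshot-B]
    }
}
```

With the shallow duplicate, the processing side ends up emitting the snapshot side's bytes; with the deep duplicate, each side keeps its own scratch state.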

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with the pattern start -> next
> (oneOrMore.optional.consecutive) -> next(end), stored with FsStateBackend
> with the incremental option enabled.
>Reporter: daiqing
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

[https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202139#comment-16202139]

Aljoscha Krettek commented on FLINK-7435:
-

[~lidaiqing1993] When you say incremental, did you mean asynchronous snapshots?

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, use CEP with pattern start -> next 
> (oneOrMore.Optional.Consective) -> next(end). Store it with FsStatebackend 
> with Incremental option open. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0





[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Kostas Kloudas (JIRA)

[https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202007#comment-16202007]

Kostas Kloudas commented on FLINK-7435:
---

Hi [~lidaiqing]. I am looking into this issue. The problem is encountered
when FlinkCEP tries to deserialize the conditions of your pattern
(`NFASerializer.deserializeCondition`).

Could you please share more details about your job, and ideally some code? For
example, the code of your pattern and the state backend you are using.

Also, as I said in the previous comment, there is definitely a problem with the
`duplicate()` method of the serializer. Could you please try this branch
https://github.com/kl0u/flink/tree/cep-nfa-serializer-bug to see if it solves
the problem? The main change is that the `NFASerializer.duplicate()` method
now becomes:

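```
@Override
public TypeSerializer<NFA<T>> duplicate() {
    // Duplicate the (possibly stateful, e.g. Kryo-based) event serializer, so that
    // a copy used by another thread does not share its internal state.
    return new NFASerializer<>(eventSerializer.duplicate());
}
```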

If you have Flink's source code, you can also make this change yourself and
try it out.
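A common refinement of this kind of fix is to always duplicate the child first and only allocate a new composite when the child actually returned a new instance, so stateless children are shared while stateful ones never are. The sketch below shows that idea with hypothetical types (`Ser`, `StatelessLeaf`, `StatefulLeaf`, `ListSer`) rather than Flink's actual classes; it is an illustration under those assumptions, not the code on the branch above.

```java
import java.util.List;

class DuplicateSketch {

    /** Hypothetical stand-in for a serializer's duplicate() contract (not Flink's TypeSerializer). */
    interface Ser<T> {
        Ser<T> duplicate();
    }

    /** Stateless leaf: safe to share across threads, so duplicate() may return this. */
    static final class StatelessLeaf implements Ser<String> {
        public Ser<String> duplicate() { return this; }
    }

    /** Stateful leaf (think Kryo): each duplicate must be a fresh instance. */
    static final class StatefulLeaf implements Ser<String> {
        public Ser<String> duplicate() { return new StatefulLeaf(); }
    }

    /** Composite with one child, similar in shape to NFASerializer wrapping its event serializer. */
    static final class ListSer<T> implements Ser<List<T>> {
        final Ser<T> element;

        ListSer(Ser<T> element) { this.element = element; }

        public Ser<List<T>> duplicate() {
            // Always duplicate the child first, as the fix does with eventSerializer.duplicate().
            Ser<T> dup = element.duplicate();
            // If the child returned itself it is stateless, so this composite can be shared too.
            return dup == element ? this : new ListSer<>(dup);
        }
    }

    public static void main(String[] args) {
        ListSer<String> stateless = new ListSer<>(new StatelessLeaf());
        ListSer<String> stateful = new ListSer<>(new StatefulLeaf());
        ListSer<String> copy = (ListSer<String>) stateful.duplicate();
        System.out.println(stateless.duplicate() == stateless);                   // true
        System.out.println(copy != stateful && copy.element != stateful.element); // true
    }
}
```

The identity check avoids needless allocations for stateless serializers while still guaranteeing that a stateful child is never shared between threads.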

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, use CEP with pattern start -> next 
> (oneOrMore.Optional.Consective) -> next(end). Store it with FsStatebackend 
> with Incremental option open. 
>Reporter: daiqing
>Assignee: Kostas Kloudas

[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-08-13 Thread Kostas Kloudas (JIRA)

[https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124875#comment-16124875]

Kostas Kloudas commented on FLINK-7435:
---

The problem here seems to be in the {{copy()}} method of the {{NFA}}, which
currently returns {{this}} rather than an actual copy.
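The difference between a {{copy()}} that returns {{this}} and a real copy can be sketched in plain Java. This is an illustration under stated assumptions, not the NFA code: `MatchState` is a hypothetical stand-in for the NFA's mutable per-key state, and Java serialization stands in for Flink's serializers to make a serialize/deserialize round-trip copy (the stack trace suggests {{NFASerializer.copy}} also goes through {{deserialize}}). The heap backend's copy-on-write table, visible in the trace via {{CopyOnWriteStateTable.get}}, appears to rely on {{copy()}} to keep a snapshot's version separate from the live one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class CopySketch {

    /** Hypothetical mutable per-key state, standing in for the NFA's partial matches. */
    static final class MatchState implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> partialMatches = new ArrayList<>();
    }

    /** Buggy: hands out the same instance, so later mutations leak into the "copy". */
    static MatchState copyByReference(MatchState s) {
        return s;
    }

    /** Deep copy via a serialize/deserialize round trip. */
    static MatchState copyByRoundTrip(MatchState s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MatchState) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Could not copy state.", e);
        }
    }

    public static void main(String[] args) {
        MatchState live = new MatchState();
        live.partialMatches.add("start");

        MatchState aliased = copyByReference(live);
        MatchState copied = copyByRoundTrip(live);

        // The processing side keeps mutating the live state after the "copy" was taken.
        live.partialMatches.add("middle");

        System.out.println(aliased.partialMatches); // [start, middle]
        System.out.println(copied.partialMatches);  // [start]
    }
}
```

The aliased "copy" observes every later mutation, while the round-trip copy stays fixed at the moment it was taken.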

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, use CEP with pattern start -> next 
> (oneOrMore.Optional.Consective) -> next(end). Store it with FsStatebackend 
> with Incremental option open. 
>Reporter: daiqing


