Sorry, I forgot to paste the exception that was thrown:

10/17/2017 20:21:30     dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED
20:21:30,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
        at FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
        at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
        at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
        at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
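
The trace shows `getState` being reached from `ProcessOperator`, which is the operator Flink uses for a non-keyed `process` call (the keyed variant is `KeyedProcessOperator`). In the Flink Scala API, `KeyedStream` extends `DataStream`, and `assignTimestampsAndWatermarks` is declared on `DataStream` and returns a plain `DataStream`. Calling it after `keyBy`, as in the quoted pipeline, therefore yields a non-keyed stream, and the following `.process` runs without a key context. The classes below are hypothetical toy stand-ins written in plain Scala (not Flink's classes or API) that mirror only that typing, to sketch why the order matters; in real Flink code the usual arrangement is to assign timestamps and watermarks before `keyBy`.

```scala
// Hypothetical toy stand-ins, NOT Flink classes. They mirror only how the
// Flink Scala API is typed: the keyed stream is a subclass of the plain stream,
// and assignTimestampsAndWatermarks lives on the plain stream and returns one.
class ToyDataStream[T](val elements: Seq[T]) {
  def keyBy[K](key: T => K): ToyKeyedStream[T, K] =
    new ToyKeyedStream(elements, key)

  // Always returns a plain (non-keyed) stream, even when called on a keyed one.
  // The extractor is ignored in this toy; only the return type matters here.
  def assignTimestampsAndWatermarks(extractTimestamp: T => Long): ToyDataStream[T] =
    new ToyDataStream(elements)

  // Which kind of operator a following process(...) call would create.
  def processOperatorKind: String = "non-keyed (no keyed state)"
}

class ToyKeyedStream[T, K](elements: Seq[T], val key: T => K)
    extends ToyDataStream[T](elements) {
  override def processOperatorKind: String = "keyed (keyed state and timers)"
}

object ToyOrderingDemo {
  def main(args: Array[String]): Unit = {
    val flows = new ToyDataStream(Seq("a", "b", "a"))
    // Order from the quoted question: keyBy first, then assign timestamps.
    println(flows.keyBy(s => s).assignTimestampsAndWatermarks(_ => 0L).processOperatorKind)
    // prints "non-keyed (no keyed state)"
    // Reordered: assign timestamps first, then keyBy.
    println(flows.assignTimestampsAndWatermarks(_ => 0L).keyBy(s => s).processOperatorKind)
    // prints "keyed (keyed state and timers)"
  }
}
```

The point of the toy is that the keyed-ness lives in the static type returned by each call, so any `DataStream` method that returns a `DataStream` discards it.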


--
Fritz


> On Oct 17, 2017, at 7:55 PM, Fritz Budiyanto <[email protected]> wrote:
> 
> Hi All,
> 
> If I run with high parallelism and use a ProcessFunction to call
> registerEventTimeTimer, the timer never fires.
> After debugging, I found that the watermark isn't updated because I have keyBy
> right after assignTimestampsAndWatermarks.
> And if I move assignTimestampsAndWatermarks to right after the keyBy, an
> exception is thrown.
> 
> val contractFlow = enrichedFlow
>      .keyBy(f => f.fiveTupleKey)
>      .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) // <<<<< moved after keyBy
>      .process(new FlowContractStitcherProcess)
>      .name("contractStitcher")
> 
>       at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
>       at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
>       at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
>       at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>       at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:745)
> 
> 
> Any idea how to solve my problem? How do I update the watermark after keyBy?
> 
> Would I hit scaling issues with a large number of timers if I use
> registerProcessingTimeTimer instead? I'm using event time throughout the
> pipeline; would mixing processing-time timers with event time cause problems
> down the line?
> 
> --
> Fritz
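
Flink's documentation describes how watermarks cross a repartitioning step such as `keyBy`: an operator instance with several input channels uses the minimum of their watermarks as its current event time, and an event-time timer fires only once that combined watermark reaches the timer's timestamp. So if even one upstream parallel instance never advances its watermark (for example, because it receives no data at high parallelism; that is an assumption about this setup, not something the thread confirms), timers after the `keyBy` never fire. The sketch below is a toy model in plain Scala with no Flink dependency; the class and method names are hypothetical, and it leaves out details of the real runtime such as how idle inputs are treated.

```scala
import scala.collection.mutable

// Toy model, NOT Flink code: one downstream operator instance after keyBy
// receives watermarks from every upstream parallel instance (one "channel"
// each) and treats the minimum of them as its current event time.
final class ToyWatermarkValve(numChannels: Int) {
  require(numChannels > 0, "need at least one input channel")

  // Highest watermark seen on each channel; Long.MinValue means "none yet".
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
  // Pending event-time timers, ordered by timestamp.
  private val timers = mutable.SortedSet.empty[Long]

  def registerEventTimeTimer(timestamp: Long): Unit = timers += timestamp

  def currentWatermark: Long = channelWatermarks.min

  // Records a watermark from one channel and returns the timers that fire,
  // i.e. those whose timestamp is <= the combined (minimum) watermark.
  def onWatermark(channel: Int, watermark: Long): List[Long] = {
    channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
    val due = timers.filter(_ <= currentWatermark).toList
    timers --= due
    due
  }
}

object ToyWatermarkDemo {
  def main(args: Array[String]): Unit = {
    val valve = new ToyWatermarkValve(numChannels = 4)
    valve.registerEventTimeTimer(1000L)
    // Three of the four upstream channels move well past the timer...
    println(valve.onWatermark(0, 5000L)) // prints "List()"
    println(valve.onWatermark(1, 5000L)) // prints "List()"
    println(valve.onWatermark(2, 5000L)) // prints "List()"
    // ...but the timer fires only once the last channel catches up too.
    println(valve.onWatermark(3, 5000L)) // prints "List(1000)"
  }
}
```

In the demo the timer stays pending while three channels sit at 5000, because the combined watermark is still held at `Long.MinValue` by the fourth channel.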
