Agree with Gordon's below explanation! Besides that, maybe you can also check the job master's log which might probably show the specific exception to cause this failure.
I was thinking whether it is necessary to improve ExceptionInChainedOperatorException to also provide the message from the wrapped real exception, then users can easily get the root cause directly, not only for the current message "Could not forward element to next operator". Best, Zhijiang ------------------------------------------------------------------ From:Tzu-Li (Gordon) Tai <[email protected]> Send Time:2020 Mar. 18 (Wed.) 00:01 To:aj <[email protected]> Cc:user <[email protected]> Subject:Re: Help me understand this Exception Hi, The exception stack you posted simply means that the next operator in the chain failed to process the output watermark. There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception. That would provide more insight to exactly why your job is failing. Cheers, Gordon On Tue, Mar 17, 2020 at 11:27 PM aj <[email protected]> wrote: Hi, I am running a streaming job with generating watermark like this : public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> { @Override public long extractTimestamp(GenericRecord record, long previousElementTimestamp) { long timestamp = (long) record.get("event_ts"); LOGGER.info("timestamp----", timestamp); return timestamp; } @Override public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) { // simply emit a watermark with every event LOGGER.info("extractedTimestamp ", extractedTimestamp); return new Watermark(extractedTimestamp); } } Please help me understand what this exception means: java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137) at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213) ... 10 more -- Thanks & Regards, Anuj Jain
