Thanks, Zhijiang and Gordon. I will see the logs to find out more.
On Wed, Mar 18, 2020 at 1:44 PM Zhijiang <[email protected]> wrote: > Agree with Gordon's below explanation! > > Besides that, maybe you can also check the job master's log which might > probably show the specific exception to cause this failure. > > I was thinking whether it is necessary to improve > ExceptionInChainedOperatorException to also provide the message from the > wrapped real exception, > then users can easily get the root cause directly, not only for the > current message "Could not forward element to next operator". > > Best, > Zhijiang > > ------------------------------------------------------------------ > From:Tzu-Li (Gordon) Tai <[email protected]> > Send Time:2020 Mar. 18 (Wed.) 00:01 > To:aj <[email protected]> > Cc:user <[email protected]> > Subject:Re: Help me understand this Exception > > Hi, > > The exception stack you posted simply means that the next operator in the > chain failed to process the output watermark. > There should be another exception, which would explain why some operator > was closed / failed and eventually leading to the above exception. > That would provide more insight to exactly why your job is failing. > > Cheers, > Gordon > > On Tue, Mar 17, 2020 at 11:27 PM aj <[email protected]> wrote: > Hi, > I am running a streaming job with generating watermark like this : > > public static class SessionAssigner implements > AssignerWithPunctuatedWatermarks<GenericRecord> { > @Override > public long extractTimestamp(GenericRecord record, long > previousElementTimestamp) { > long timestamp = (long) record.get("event_ts"); > LOGGER.info("timestamp----", timestamp); > return timestamp; > } > > @Override > public Watermark checkAndGetNextWatermark(GenericRecord record, long > extractedTimestamp) { > // simply emit a watermark with every event > LOGGER.info("extractedTimestamp ", extractedTimestamp); > return new Watermark(extractedTimestamp); > } > } > > Please help me understand what this exception means: > > java.lang.RuntimeException: Exception occurred while processing valve > output watermark: > at org.apache.flink.streaming.runtime.io. > StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark( > StreamOneInputProcessor.java:216) > at org.apache.flink.streaming.runtime.streamstatus. > StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels( > StatusWatermarkValve.java:189) > at org.apache.flink.streaming.runtime.streamstatus. > StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor > .processElement(StreamOneInputProcessor.java:169) > at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor > .processInput(StreamOneInputProcessor.java:143) > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( > StreamTask.java:279) > at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask > .java:301) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( > StreamTask.java:406) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.streaming.runtime.tasks. > ExceptionInChainedOperatorException: Could not forward element to next > operator > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707) > at org.apache.flink.streaming.runtime.tasks. > OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: > 727) > at org.apache.flink.streaming.api.operators. > AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java: > 705) > at org.apache.flink.streaming.api.operators.TimestampedCollector. > collect(TimestampedCollector.java:51) > at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java: > 137) > at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java: > 116) > at org.apache.flink.streaming.runtime.operators.windowing.functions. > InternalIterableProcessWindowFunction.process( > InternalIterableProcessWindowFunction.java:50) > at org.apache.flink.streaming.runtime.operators.windowing.functions. > InternalIterableProcessWindowFunction.process( > InternalIterableProcessWindowFunction.java:32) > at org.apache.flink.streaming.runtime.operators.windowing. > WindowOperator.emitWindowContents(WindowOperator.java:549) > at org.apache.flink.streaming.runtime.operators.windowing. > WindowOperator.onEventTime(WindowOperator.java:457) > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl > .advanceWatermark(InternalTimerServiceImpl.java:276) > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager > .advanceWatermark(InternalTimeServiceManager.java:128) > at org.apache.flink.streaming.api.operators.AbstractStreamOperator > .processWatermark(AbstractStreamOperator.java:784) > at org.apache.flink.streaming.runtime.io. > StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark( > StreamOneInputProcessor.java:213) > ... 10 more > > -- > Thanks & Regards, > Anuj Jain > > > > <http://www.cse.iitm.ac.in/%7Eanujjain/> > > > -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07 <http://www.oracle.com/> <http://www.cse.iitm.ac.in/%7Eanujjain/>
