Hi,
I am running a streaming job that generates watermarks like this:
public static class SessionAssigner
        implements AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        // read the event time from the record's event_ts field
        long timestamp = (long) record.get("event_ts");
        // the {} placeholder is needed for the value to appear in the log line
        LOGGER.info("timestamp---- {}", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        LOGGER.info("extractedTimestamp {}", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}
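For reference, the effect of emitting a watermark with every event can be sketched without any Flink dependencies. This is only a simplified illustration, not how Flink implements it: the class WatermarkSketch, the method lateEvents, and the sample timestamps are all hypothetical, and it assumes the watermark only ever moves forward and that an event counts as late when its timestamp is at or behind the current watermark. Flink's window operator decides lateness per window, so the real behavior may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {

    // Mirrors the assigner above: the watermark candidate is the event's own timestamp.
    static long nextWatermark(long extractedTimestamp) {
        return extractedTimestamp;
    }

    // Returns the timestamps of events that arrive at or behind the current watermark.
    // Assumption: the watermark never moves backwards.
    static List<Long> lateEvents(long[] timestamps) {
        List<Long> late = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts <= currentWatermark) {
                late.add(ts);
            }
            currentWatermark = Math.max(currentWatermark, nextWatermark(ts));
        }
        return late;
    }

    public static void main(String[] args) {
        // Hypothetical sample: the third event arrives out of order.
        long[] sample = {1000L, 3000L, 2000L, 4000L};
        System.out.println(lateEvents(sample)); // prints [2000]
    }
}
```

With this sample, the event at 2000 arrives after the watermark has already reached 3000, so under the assumptions above it is the only one flagged.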
Please help me understand what this exception means:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more
--
Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>