Hi Omkar,
Sorry for the late reply. This sounds like a serious issue. It looks
like some of the RocksDB data is corrupt. Are you sure this is not a
problem with your storage layer?
Otherwise I would investigate whether the serializers work correctly.
Maybe Beam put corrupt data into Flink's state?
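To make the serializer check concrete, here is a minimal round-trip sketch in plain Java. Everything in it is an assumption for illustration: the Event type, its two fields, and the hand-written encode/decode are hypothetical stand-ins, not your actual element type or Beam coder. The idea is only that decoding the encoded bytes must give back an equal value, consume every byte, and re-encode to the same bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

public class RoundTripCheck {

    // Hypothetical stand-in for the element type kept in the bag state.
    static final class Event {
        final String sessionId;
        final long timestamp;

        Event(String sessionId, long timestamp) {
            this.sessionId = sessionId;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Event)) {
                return false;
            }
            Event other = (Event) o;
            return sessionId.equals(other.sessionId) && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(sessionId, timestamp);
        }
    }

    // Hypothetical serializer: length-prefixed UTF string followed by a long.
    static byte[] encode(Event e) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(e.sessionId);
            out.writeLong(e.timestamp);
        }
        return bytes.toByteArray();
    }

    // Matching deserializer; rejects input with leftover bytes.
    static Event decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String sessionId = in.readUTF();
            long timestamp = in.readLong();
            if (in.available() != 0) {
                throw new IOException("trailing bytes after decode");
            }
            return new Event(sessionId, timestamp);
        }
    }

    // True if the value survives encode -> decode -> encode unchanged.
    static boolean roundTrips(Event original) {
        try {
            byte[] first = encode(original);
            Event decoded = decode(first);
            byte[] second = encode(decoded);
            return original.equals(decoded) && Arrays.equals(first, second);
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Event sample = new Event("session-1", 1611691560000L);
        System.out.println("round trip ok: " + roundTrips(sample));
    }
}
```

In the real job you would run the same kind of check against the coder that is registered for the state, using representative and edge-case values (empty strings, very large records, nulls if they are allowed). If I remember correctly, Beam's testing utilities include a CoderProperties helper for this purpose, but please verify that against the Beam version you are using.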
Regards,
Timo
On 26.01.21 20:06, Deshpande, Omkar wrote:
Hello,
I am using Flink 1.9 with Beam 2.26 and the RocksDB state backend. I am
getting this exception:
org.apache.flink.util.SerializedThrowable: Caught exception while
processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:978)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:952)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable:
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
Error reading state.
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable:
java.lang.RuntimeException: Error reading state.
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown
Source)
at
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at
org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at
org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Error reading state.
at
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:494)
at
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
Caused by: org.apache.flink.util.SerializedThrowable: Error while
retrieving data from RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:471)
at
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
at
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown
Source)
at
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at
org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at
org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: bad entry in block
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 22 common frames omitted
Deleting the state and starting clean resolved the issue. What could be
the root cause of this?
How do I debug it?
Thanks,
Omkar