[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
[ https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466281#comment-16466281 ]

Narayanan Arunachalam commented on FLINK-9302:

Thanks [~srichter]. I ran some tests over the weekend and found that although the error comes from S3, the actual cause was the size of the checkpoint. In one of my tests, after the job recovered from the last good checkpoint, the state size continued to grow by ~10G every 15 mins. This was because too much data was read into the pipeline with `maxOutOfOrderness` set to 60 secs. The windows did not fire soon enough, resulting in large state.

One option is to scale the cluster to deal with the many open windows. But I realized that `AscendingTimestampExtractor` might be enough for my use case, and I am running some tests with this setting.

In any case, this particular error is a side effect of the way my windows are set up, and there is no direct evidence of a bug. I thought I would post this comment anyway to keep you all in sync.

> Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
>
>                 Key: FLINK-9302
>                 URL: https://issues.apache.org/jira/browse/FLINK-9302
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Priority: Major
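
The effect of `maxOutOfOrderness` described in the comment above can be sketched with a small Scala model that does not depend on Flink. `WatermarkModel` and all of its members are hypothetical names. The formulas are assumptions about the usual behaviour: a bounded-out-of-orderness extractor emits a watermark equal to the highest timestamp seen minus `maxOutOfOrderness`, an ascending-timestamp extractor emits the highest timestamp seen minus 1 ms, and an event-time trigger fires once the watermark reaches the window end minus 1 ms.

```scala
object WatermarkModel {
  // All values are event-time milliseconds.

  // Assumed watermark of a bounded-out-of-orderness extractor.
  def boundedWatermark(maxSeenTs: Long, maxOutOfOrdernessMs: Long): Long =
    maxSeenTs - maxOutOfOrdernessMs

  // Assumed watermark of an ascending-timestamp extractor.
  def ascendingWatermark(maxSeenTs: Long): Long =
    maxSeenTs - 1

  // A session window ends gapMs after its last event. It is assumed to fire
  // once the watermark reaches the last timestamp inside the window (end - 1).
  def sessionFires(lastEventTs: Long, gapMs: Long, watermark: Long): Boolean =
    watermark >= lastEventTs + gapMs - 1

  // How far the highest seen timestamp must move past the session's last
  // event before the session fires, given the lag of the watermark.
  def requiredProgressMs(gapMs: Long, watermarkLagMs: Long): Long =
    gapMs + watermarkLagMs - 1

  val gapMs: Long = 60000L        // withGap(Time.seconds(60)) from the job
  val outOfOrderMs: Long = 60000L // maxOutOfOrderness of 60 secs from the comment

  // Bounded out-of-orderness: roughly 120 s of event time must pass.
  val boundedNeeded: Long = requiredProgressMs(gapMs, outOfOrderMs)
  // Ascending timestamps (lag of 1 ms): roughly 60 s is enough.
  val ascendingNeeded: Long = requiredProgressMs(gapMs, 1L)

  def main(args: Array[String]): Unit =
    println(s"bounded: $boundedNeeded ms, ascending: $ascendingNeeded ms")
}
```

Under these assumptions, every session stays open for about twice as much event time with a 60 sec bound as with ascending timestamps, which is consistent with the larger state reported above when a lot of data is read in quickly.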
[jira] [Created] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
Narayanan Arunachalam created FLINK-9302:

             Summary: Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
                 Key: FLINK-9302
                 URL: https://issues.apache.org/jira/browse/FLINK-9302
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.4.2
            Reporter: Narayanan Arunachalam

*state backend: filesystem*
*checkpoint.mode:EXACTLY_ONCE*

+dag:+

val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)

* The job runs fine and checkpoints succeed for a few hours.
* Later it fails because of the following checkpoint error.
* Once the job is recovered from the last successful checkpoint, it continues to fail with the same checkpoint error.
* This persists until the job is restarted with no checkpoint state or using the checkpoint previous to the last good one.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 7 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:385)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:397)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462187#comment-16462187 ]

Narayanan Arunachalam commented on FLINK-9268:

True

> RockDB errors from WindowOperator
>
>                 Key: FLINK-9268
>                 URL: https://issues.apache.org/jira/browse/FLINK-9268
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Priority: Major
>
> The job has no sinks and one Kafka source, does session-based windowing, and uses processing time. The job fails with the error given below after running for a few hours. The only way to recover from this error is to cancel the job and start a new one.
> Using S3 backend for externalized checkpoints.
>
> A representative job DAG:
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
>
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
>
> Error:
> TimerException{java.lang.NegativeArraySizeException}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>     at org.rocksdb.RocksDB.get(Native Method)
>     at org.rocksdb.RocksDB.get(RocksDB.java:810)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462154#comment-16462154 ]

Narayanan Arunachalam commented on FLINK-9268:

When the window is triggered, all the events in the list are transformed into a smaller list of spans and sent to the sink. I checked some metrics I added recently and saw that the number of events in this list reached 80k in a couple of occurrences. So it must be because of the duplicates then.
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462080#comment-16462080 ]

Narayanan Arunachalam commented on FLINK-9268:

`Is it possible for a session window to collect duplicate data?` - The checkpoint config is now set to AT_LEAST_ONCE. I was wondering whether the job is getting into a state where a window contains events restored from a checkpoint as well as duplicates of those events that are replayed afterwards. After this, if the job happens to fail again, this list can grow without bound because of the duplicates.
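
The scenario raised in the comment above can be sketched as a toy model in plain Scala. It assumes, as a simplification, that an at-least-once checkpoint may capture window contents that already include events beyond the source offset stored in the same checkpoint, so those events are read again after a restore. `ReplayModel`, `Checkpoint`, and `restoreAndReplay` are hypothetical names, and none of this is Flink code.

```scala
object ReplayModel {
  // A checkpoint stores the window's event list and the source offset to resume from.
  final case class Checkpoint(windowEvents: Vector[Int], sourceOffset: Int)

  // Restore the window from the checkpoint, then re-read the source from the
  // stored offset up to (but excluding) upTo. In this toy source an event's id
  // is simply its offset.
  def restoreAndReplay(cp: Checkpoint, upTo: Int): Vector[Int] =
    cp.windowEvents ++ (cp.sourceOffset until upTo)

  // Assumed snapshot: the window already holds events 0..4, but the stored offset is 3.
  val checkpoint: Checkpoint = Checkpoint(Vector(0, 1, 2, 3, 4), sourceOffset = 3)

  // After a failure, events 3 and 4 are appended a second time, followed by event 5.
  val afterRestore: Vector[Int] = restoreAndReplay(checkpoint, upTo = 6)

  // Number of entries in the window that are repeats of an earlier entry.
  val duplicates: Int = afterRestore.size - afterRestore.distinct.size

  def main(args: Array[String]): Unit =
    println(s"window after restore: $afterRestore, duplicates: $duplicates")
}
```

In this model, a later checkpoint taken from the already duplicated list would carry the repeats forward, so each further failure could add more. Whether that matches the job's actual behaviour is exactly the open question in the comment.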
[jira] [Resolved] (FLINK-9290) The job is unable to recover from a checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Narayanan Arunachalam resolved FLINK-9290.

    Resolution: Duplicate

> The job is unable to recover from a checkpoint
>
>                 Key: FLINK-9290
>                 URL: https://issues.apache.org/jira/browse/FLINK-9290
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Priority: Blocker
>
> Using rocksdb state backend.
> The job runs fine for more than 24 hours and then attempts recovery because of an error from the sink. It continues to fail at the time of recovery with the following error. The workaround is to cancel the job and start it again.
>
> java.lang.IllegalStateException: Could not initialize operator state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>     ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>     at java.util.ArrayList.get(ArrayList.java:433)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462051#comment-16462051 ]

Narayanan Arunachalam commented on FLINK-9268:

Does the 2GB RocksDB limit apply to the size of one window's state or to a group of windows? If it is per window, I don't see why it would hit that limit, because in my case at most a few hundred events are collected in one window, and each event is only kilobytes in size. Is it possible for a session window to collect duplicate data?
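
For context on the 2GB question above: a JVM `byte[]` is indexed by a signed 32-bit `Int`, so a single array cannot hold more than `Int.MaxValue` (2^31 - 1) bytes, and a larger length narrowed to `Int` can become negative. The sketch below shows that narrowing in plain Scala, along with the average event size at which a list of a given length would cross the bound. It assumes that all events of one window end up in a single value, which matches the `RocksDBListState.get` frame in the trace but is not taken from the RocksDB or Flink sources. `ValueSizeModel` and its members are hypothetical names.

```scala
object ValueSizeModel {
  // Narrow a 64-bit byte count to the 32-bit length a JVM array accepts.
  def narrowedLength(sizeInBytes: Long): Int = sizeInBytes.toInt

  // Try to allocate a buffer of the narrowed length, as a reader of the value
  // would have to. Returns the exception's simple name if the length is negative.
  def tryAllocate(sizeInBytes: Long): Either[String, Int] =
    try Right(new Array[Byte](narrowedLength(sizeInBytes)).length)
    catch { case e: NegativeArraySizeException => Left(e.getClass.getSimpleName) }

  // Smallest average event size (in bytes) at which eventCount events
  // together exceed Int.MaxValue bytes.
  def bytesPerEventToOverflow(eventCount: Long): Long =
    Int.MaxValue.toLong / eventCount + 1

  val threeGiB: Long = 3L * 1024 * 1024 * 1024

  def main(args: Array[String]): Unit = {
    println(narrowedLength(threeGiB))        // a negative length
    println(tryAllocate(threeGiB))           // Left(NegativeArraySizeException)
    println(bytesPerEventToOverflow(80000L)) // bytes per event for an 80k-event list
  }
}
```

Under this assumption, a few hundred events of a few kilobytes each stay far below the bound, which is why the comment suspects that something else, such as duplicated events, is inflating the list.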
[jira] [Resolved] (FLINK-9291) Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)
[ https://issues.apache.org/jira/browse/FLINK-9291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Narayanan Arunachalam resolved FLINK-9291.

    Resolution: Duplicate

> Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)
>
>                 Key: FLINK-9291
>                 URL: https://issues.apache.org/jira/browse/FLINK-9291
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Narayanan Arunachalam
>            Priority: Major
[jira] [Created] (FLINK-9291) Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)
Narayanan Arunachalam created FLINK-9291:

             Summary: Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)
                 Key: FLINK-9291
                 URL: https://issues.apache.org/jira/browse/FLINK-9291
             Project: Flink
          Issue Type: Bug
            Reporter: Narayanan Arunachalam

Using rocksdb for state. After running for a few hours, checkpointing fails with the following error. The job recovers fine after this.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, Sink: es) (14/80).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, Sink: es) (14/80).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NegativeArraySizeException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NegativeArraySizeException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 7 more
Caused by: java.lang.NegativeArraySizeException
    at org.rocksdb.RocksIterator.value0(Native Method)
    at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBMergeIterator.value(RocksDBKeyedStateBackend.java:1898)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:704)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:556)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:466)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:424)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
[CIRCULAR REFERENCE:java.lang.NegativeArraySizeException]
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461215#comment-16461215 ]

Narayanan Arunachalam commented on FLINK-9268:
----------------------------------------------

Not sure whether it is related, but I don't see this error when incremental checkpointing is disabled.

> RockDB errors from WindowOperator
> ---------------------------------
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> The job has one Kafka source and no sinks, and uses processing-time session windows.
> The job fails with the error given below after running for a few hours.
> The only way to recover from this error is to cancel the job and start a new one.
> Using the S3 backend for externalized checkpoints.
>
> A representative job DAG:
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
>
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
>
> Error:
> TimerException{java.lang.NegativeArraySizeException}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>     at org.rocksdb.RocksDB.get(Native Method)
>     at org.rocksdb.RocksDB.get(RocksDB.java:810)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
[jira] [Created] (FLINK-9290) The job is unable to recover from a checkpoint
Narayanan Arunachalam created FLINK-9290:

Summary: The job is unable to recover from a checkpoint
Key: FLINK-9290
URL: https://issues.apache.org/jira/browse/FLINK-9290
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam

Using the RocksDB state backend. The job runs fine for more than 24 hours and then attempts recovery because of an error from the sink. Recovery keeps failing with the following error. The workaround is to cancel the job and start it again.

java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
Serialization trace:
topic (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
    ... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457672#comment-16457672 ]

Narayanan Arunachalam commented on FLINK-9268:
----------------------------------------------

It looks like I can't control whether a set or a list is used, because keyBy causes the values to be treated as a list.
[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457641#comment-16457641 ]

Narayanan Arunachalam commented on FLINK-9268:
----------------------------------------------

One possibility is that, after the job restarts following some kind of failure, a window sees the same events again, and another job failure before the window closes then leads to this state. I could try turning the list into a set, or setting the checkpointing mode to EXACTLY_ONCE.
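To make the hypothesis in the comment above concrete, here is a minimal Java sketch using plain collections. It is not Flink's window state: `ReplayDedupSketch` and `sizeAfterReplay` are hypothetical names, and the only assumption is that an at-least-once replay delivers the same events to an open window more than once. Under that assumption a list-backed buffer grows with every replay, while a set-backed buffer does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical illustration only; plain collections stand in for window contents.
final class ReplayDedupSketch {

    // Adds `events` once, then once more per simulated replay, and returns the buffer size.
    static int sizeAfterReplay(Collection<String> windowBuffer, List<String> events, int replays) {
        for (int delivery = 0; delivery <= replays; delivery++) {
            windowBuffer.addAll(events);
        }
        return windowBuffer.size();
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("e1", "e2", "e3");
        // A list keeps every replayed copy; a set keeps each distinct event once.
        System.out.println("list: " + sizeAfterReplay(new ArrayList<String>(), events, 2));
        System.out.println("set:  " + sizeAfterReplay(new LinkedHashSet<String>(), events, 2));
    }
}
```

With three events and two replays, the list ends up holding nine entries and the set three. Whether replayed events are actually what inflates the state in this job is not confirmed anywhere in the thread.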
[jira] [Updated] (FLINK-9268) RockDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Narayanan Arunachalam updated FLINK-9268:
-----------------------------------------
    Affects Version/s: 1.4.2
[jira] [Created] (FLINK-9268) RockDB errors from WindowOperator
Narayanan Arunachalam created FLINK-9268:

Summary: RockDB errors from WindowOperator
Key: FLINK-9268
URL: https://issues.apache.org/jira/browse/FLINK-9268
Project: Flink
Issue Type: Bug
Components: DataStream API, State Backends, Checkpointing
Reporter: Narayanan Arunachalam

The job has one Kafka source and no sinks, and uses processing-time session windows. The job fails with the error given below after running for a few hours. The only way to recover from this error is to cancel the job and start a new one.

Using the S3 backend for externalized checkpoints.

A representative job DAG:

val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)

A representative config:

state.backend=rocksDB
checkpoint.enabled=true
external.checkpoint.enabled=true
checkpoint.mode=AT_LEAST_ONCE
checkpoint.interval=90
checkpoint.timeout=30

Error:

TimerException{java.lang.NegativeArraySizeException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
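The report does not establish what causes the `NegativeArraySizeException`. One possible mechanism, stated here purely as an assumption, is that the length of a stored value no longer fits in a signed 32-bit `int` by the time a Java byte array is allocated for it. The sketch below shows only that arithmetic in plain Java; `ValueLengthOverflowSketch` and its methods are hypothetical and do not call RocksDB or Flink.

```java
// Hypothetical illustration only; no RocksDB or Flink code is involved.
final class ValueLengthOverflowSketch {

    // Narrows a 64-bit length to int the way an unchecked cast would.
    static int narrow(long lengthInBytes) {
        return (int) lengthInBytes;
    }

    // Returns true if allocating a byte[] of the narrowed length
    // throws NegativeArraySizeException, false if the allocation succeeds.
    static boolean allocationFails(long lengthInBytes) {
        try {
            byte[] buffer = new byte[narrow(lengthInBytes)];
            return false;
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long threeBillion = 3_000_000_000L; // larger than Integer.MAX_VALUE
        System.out.println("narrowed length: " + narrow(threeBillion));
        System.out.println("large value fails: " + allocationFails(threeBillion));
        System.out.println("small value fails: " + allocationFails(1024L));
    }
}
```

If this assumption holds for the job, it would fit the observation that the error appears only after the job has run for a few hours, but nothing in the report confirms it.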
[jira] [Created] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables
Narayanan Arunachalam created FLINK-9258:

Summary: ConcurrentModificationException in ComponentMetricGroup.getAllVariables
Key: FLINK-9258
URL: https://issues.apache.org/jira/browse/FLINK-9258
Project: Flink
Issue Type: Bug
Components: Metrics
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam

Seeing this exception at job startup. It looks like there is a race condition when the metric variables are constructed. The error is intermittent.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
    at java.util.HashMap.putMapEntries(HashMap.java:511)
    at java.util.HashMap.putAll(HashMap.java:784)
    at org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
    at org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
    at com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
    at com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
    at com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
    at com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
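The suspected race can be illustrated without threads. This is a minimal Java sketch, assuming the failure comes from a plain `HashMap` being copied while it is still being filled: the `source.put` inside the loop stands in for a second thread, and `VariablesSnapshotSketch` and the variable names are hypothetical, not Flink's `ComponentMetricGroup`. The same copy succeeds when the shared map is a `ConcurrentHashMap`, whose iterators are weakly consistent and do not fail fast.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not Flink's metric group implementation.
final class VariablesSnapshotSketch {

    // Copies `source` entry by entry while adding a key to it mid-copy.
    // Note that `source` is mutated. Returns true if the copy was aborted
    // by a ConcurrentModificationException.
    static boolean copyFailsWhenMutated(Map<String, String> source) {
        Map<String, String> copy = new HashMap<>();
        try {
            for (Map.Entry<String, String> entry : source.entrySet()) {
                copy.put(entry.getKey(), entry.getValue());
                source.put("<late_variable>", "x"); // stands in for a concurrent writer
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> plain = new HashMap<>();
        plain.put("<host>", "h1");
        plain.put("<job_name>", "demo");

        Map<String, String> concurrent = new ConcurrentHashMap<>();
        concurrent.put("<host>", "h1");
        concurrent.put("<job_name>", "demo");

        System.out.println("HashMap copy fails: " + copyFailsWhenMutated(plain));
        System.out.println("ConcurrentHashMap copy fails: " + copyFailsWhenMutated(concurrent));
    }
}
```

A real race between threads would fail only some of the time, which would match the intermittent nature of the error; the single-threaded version above just makes the fail-fast check deterministic.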
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450075#comment-16450075 ]

Narayanan Arunachalam commented on FLINK-9138:
----------------------------------------------

I will try the changes. Upgrading is going to take a while for us, so I will have to try using a custom sink or see whether we can backport it.

> Enhance BucketingSink to also flush data by time interval
> ---------------------------------------------------------
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> BucketingSink currently supports flushing data to the file system by size limit and
> by period of inactivity. It would be useful to also flush data after a specified
> time interval. This way, data is still written out when write throughput is
> low but there are no significant time gaps between the writes. This
> reduces the ETA for the data in the file system and should help move the
> checkpoints faster as well.
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427293#comment-16427293 ]

Narayanan Arunachalam commented on FLINK-9138:
----------------------------------------------

[~glaksh100] are you considering the PR for 1.4?
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427287#comment-16427287 ]

Narayanan Arunachalam commented on FLINK-9138:
----------------------------------------------

This looks good. I suppose event time is not applicable here. We will need a setter to pass in the value for the 'rollIntervalThreshold' config.
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427227#comment-16427227 ]

Narayanan Arunachalam commented on FLINK-9138:
----------------------------------------------

Thanks [~glaksh100]
[jira] [Created] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
Narayanan Arunachalam created FLINK-9138:

Summary: Enhance BucketingSink to also flush data by time interval
Key: FLINK-9138
URL: https://issues.apache.org/jira/browse/FLINK-9138
Project: Flink
Issue Type: Improvement
Components: filesystem-connector
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam

BucketingSink currently supports flushing data to the file system by size limit and by period of inactivity. It would be useful to also flush data after a specified time interval. This way, data is still written out when write throughput is low but there are no significant time gaps between the writes. This reduces the ETA for the data in the file system and should help move the checkpoints faster as well.
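The three flush conditions described above can be sketched as a single predicate. This is a minimal Java sketch and not BucketingSink's implementation: `RollPolicySketch`, its fields, and `shouldRoll` are hypothetical names, and all timestamps are assumed to be processing-time milliseconds.

```java
// Hypothetical illustration only; not the BucketingSink API.
final class RollPolicySketch {
    private final long maxPartSizeBytes;      // existing behavior: roll by size
    private final long inactivityThresholdMs; // existing behavior: roll after no writes
    private final long rollIntervalMs;        // requested behavior: roll by part-file age

    RollPolicySketch(long maxPartSizeBytes, long inactivityThresholdMs, long rollIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.inactivityThresholdMs = inactivityThresholdMs;
        this.rollIntervalMs = rollIntervalMs;
    }

    // Returns true if the current part file should be closed and a new one started.
    boolean shouldRoll(long nowMs, long partCreatedMs, long lastWriteMs, long partSizeBytes) {
        boolean tooLarge = partSizeBytes >= maxPartSizeBytes;
        boolean inactive = nowMs - lastWriteMs >= inactivityThresholdMs;
        boolean tooOld = nowMs - partCreatedMs >= rollIntervalMs;
        return tooLarge || inactive || tooOld;
    }

    public static void main(String[] args) {
        // 1000-byte size limit, 60 s inactivity threshold, 5 min roll interval.
        RollPolicySketch policy = new RollPolicySketch(1_000L, 60_000L, 300_000L);
        // A slow but steady writer: small file, last write one second ago.
        System.out.println(policy.shouldRoll(100_000L, 0L, 99_000L, 10L));
        System.out.println(policy.shouldRoll(300_000L, 0L, 299_000L, 10L));
    }
}
```

The second call covers the case the issue is about: the file is small and was written to a second earlier, so neither existing condition fires, and only the age check closes the part file.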
[jira] [Created] (FLINK-8600) BucketingSink errors out when used with Presto filesystem
Narayanan Arunachalam created FLINK-8600:

Summary: BucketingSink errors out when used with Presto filesystem
Key: FLINK-8600
URL: https://issues.apache.org/jira/browse/FLINK-8600
Project: Flink
Issue Type: Bug
Components: FileSystem
Affects Versions: 1.4.0
Reporter: Narayanan Arunachalam

BucketingSink passes a non-qualified path when testing the truncate behavior, as you can see [here|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L606]. Because of this, a "Path is not absolute: uuid" error is thrown when it is used with the [PrestoS3FileSystem|https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java].
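As an illustration of the difference between a bare name and a qualified path, here is a minimal Java sketch using only `java.net.URI`. It does not use the Hadoop `Path` API that BucketingSink works with, and it is not a proposed patch; `QualifyPathSketch`, `qualify`, and the `s3://bucket/out/` base directory are hypothetical.

```java
import java.net.URI;

// Hypothetical illustration only; the real sink works with Hadoop paths.
final class QualifyPathSketch {

    // Resolves a bare file name against a base directory URI.
    // Names that already carry a scheme are returned unchanged.
    static URI qualify(URI baseDir, String name) {
        URI candidate = URI.create(name);
        return candidate.isAbsolute() ? candidate : baseDir.resolve(candidate);
    }

    public static void main(String[] args) {
        URI baseDir = URI.create("s3://bucket/out/"); // trailing slash marks a directory
        System.out.println(URI.create("uuid").isAbsolute());
        System.out.println(qualify(baseDir, "uuid"));
    }
}
```

A file system that insists on absolute paths can reject the bare name but accept the resolved form. The trailing slash on the base directory matters, because URI resolution replaces the last path segment when it is missing.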