[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

2018-05-07 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466281#comment-16466281
 ] 

Narayanan Arunachalam commented on FLINK-9302:
--

Thanks [~srichter]. I ran some tests over the weekend and found that, although 
the error is reported from S3, it was actually caused by the size of the 
checkpoint. In one of my tests, after the job recovered from the last good 
checkpoint, the state size continued to grow by ~10 GB every 15 minutes.

This was because too much data was read into the pipeline while 
`maxOutOfOrderness` was set to 60 seconds. The windows did not fire soon 
enough, resulting in large state. One option is to scale the cluster to deal 
with many open windows. But I realize `AscendingTimestampExtractor` might be 
enough for my use case, and I am running some tests with this setting.
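
The effect described above can be sketched with a simplified model of the two watermark strategies. This is not Flink code: the object and method names are hypothetical, and the formulas are an assumption about how the two extractors behave (a bounded-out-of-orderness watermark trails the highest timestamp seen by `maxOutOfOrderness`, an ascending watermark trails the latest timestamp by 1 ms, and an event-time trigger fires once the watermark reaches the window end minus 1 ms).

```scala
// Simplified model only; names are illustrative and not part of Flink.
object WatermarkSketch {
  val GapMs = 60000L               // session gap from the DAG in the issue
  val MaxOutOfOrdernessMs = 60000L // value mentioned in the comment

  // Bounded out-of-orderness: watermark trails the highest timestamp seen.
  def boundedWatermark(maxSeenTs: Long): Long = maxSeenTs - MaxOutOfOrdernessMs

  // Ascending timestamps: watermark trails the latest timestamp by 1 ms.
  def ascendingWatermark(latestTs: Long): Long = latestTs - 1

  // A session whose last event is at lastEventTs ends at lastEventTs + gap;
  // the event-time trigger fires once the watermark reaches end - 1.
  def sessionFires(lastEventTs: Long, watermark: Long): Boolean =
    watermark >= lastEventTs + GapMs - 1

  def main(args: Array[String]): Unit = {
    val lastEventTs = 0L
    val laterTs = 60000L // a later event seen by the same subtask
    println(sessionFires(lastEventTs, ascendingWatermark(laterTs)))                     // true
    println(sessionFires(lastEventTs, boundedWatermark(laterTs)))                       // false
    println(sessionFires(lastEventTs, boundedWatermark(laterTs + MaxOutOfOrdernessMs))) // true
  }
}
```

Under these assumptions every session stays open for an extra 60 seconds of event time, so state for that extra minute of input is held in open windows. An ascending-timestamp strategy is only appropriate when timestamps really are ascending within each source partition.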

In any case, this particular error is a side effect of the way my windows are 
set up, and there is no direct evidence of a bug. I thought I would post this 
comment anyway to keep you all in sync.

 

> Checkpoints continues to fail when using filesystem state backend with 
> CIRCULAR REFERENCE:java.io.IOException
> -
>
> Key: FLINK-9302
> URL: https://issues.apache.org/jira/browse/FLINK-9302
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> *state backend: filesystem*
> *checkpoint.mode:EXACTLY_ONCE*
> +dag:+
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
>  * The job runs fine and checkpoints succeed for a few hours. 
>  * Later it fails because of the following checkpoint error.
>  * Once the job is recovered from the last successful checkpoint, it 
> continues to fail with the same checkpoint error.
>  * This persists until the job is restarted with no checkpoint state or from 
> the checkpoint before the last good one.
> AsynchronousException\{java.lang.Exception: Could not materialize checkpoint 
> 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: 
> NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 42 for 
> operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, 
> makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed 
> state future.
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
> ... 5 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> 

[jira] [Created] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

2018-05-05 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9302:


 Summary: Checkpoints continues to fail when using filesystem state 
backend with CIRCULAR REFERENCE:java.io.IOException
 Key: FLINK-9302
 URL: https://issues.apache.org/jira/browse/FLINK-9302
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam


*state backend: filesystem*

*checkpoint.mode:EXACTLY_ONCE*

+dag:+

val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)
 * The job runs fine and checkpoints succeed for a few hours. 
 * Later it fails because of the following checkpoint error.
 * Once the job is recovered from the last successful checkpoint, it continues 
to fail with the same checkpoint error.
 * This persists until the job is restarted with no checkpoint state or from 
the checkpoint before the last good one.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 7 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:385)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:397)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 

[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-03 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462187#comment-16462187
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

True

> RockDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> The job has no sinks, one Kafka source, does session-based windowing and 
> uses processing time. The job fails with the error given below after running 
> for a few hours. The only way to recover from this error is to cancel the job 
> and start a new one.
> Using the S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-03 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462154#comment-16462154
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

When the window is triggered, all the events in the list are transformed into 
a smaller list of spans and sent to the sink. I checked some metrics I added 
recently and I see that the number of events in this list reached 80k on a 
couple of occasions. So it must be because of the duplicates then.



[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-03 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462080#comment-16462080
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

`Is it possible for a session window to collect duplicate data?` - The 
checkpoint config is now set to AT_LEAST_ONCE. I was wondering whether the job 
is getting into a state where a window contains events restored from a 
checkpoint plus duplicates of those events that are replayed afterwards. After 
this, if the job happens to fail again, the list can keep growing because of 
duplicates.
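
The scenario being asked about can be written down as a toy model. This is only a sketch of the hypothesis, not a demonstration that Flink behaves this way for this job: `Checkpoint`, `restoreAndReplay` and the integer "events" are hypothetical, and the model simply assumes a snapshot whose window contents are ahead of the source offset stored in the same checkpoint.

```scala
// Toy model of the question above; all names are illustrative, not Flink APIs.
object ReplayDuplicatesSketch {
  // Assumed inconsistent snapshot: the window list already holds events that
  // lie past the source offset recorded in the same checkpoint.
  final case class Checkpoint(sourceOffset: Int, windowContents: Vector[Int])

  // On recovery the window list is restored as-is and the source re-reads
  // the log starting from the recorded offset.
  def restoreAndReplay(cp: Checkpoint, log: Vector[Int]): Vector[Int] =
    cp.windowContents ++ log.drop(cp.sourceOffset)

  def main(args: Array[String]): Unit = {
    val log = Vector(1, 2, 3, 4, 5)
    val cp = Checkpoint(sourceOffset = 3, windowContents = Vector(1, 2, 3, 4))
    println(restoreAndReplay(cp, log)) // Vector(1, 2, 3, 4, 4, 5)
  }
}
```

In this model event 4 appears twice after recovery, which is the kind of duplication the comment asks about.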



[jira] [Resolved] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-03 Thread Narayanan Arunachalam (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Narayanan Arunachalam resolved FLINK-9290.
--
Resolution: Duplicate

> The job is unable to recover from a checkpoint
> --
>
> Key: FLINK-9290
> URL: https://issues.apache.org/jira/browse/FLINK-9290
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Blocker
>
> Using the RocksDB state backend.
> The job runs fine for more than 24 hours and then attempts recovery because 
> of an error from the sink. It continues to fail at recovery time with the 
> following error. The workaround is to cancel the job and start it again.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
> Serialization trace:
> topic 
> (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
>   ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>   at java.util.ArrayList.get(ArrayList.java:433)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>  





[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-03 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462051#comment-16462051
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

Does the 2 GB RocksDB limit apply to the size of one window's state or to a 
group of windows? If it's per window, I don't see why it would hit that limit, 
because in my case at most a few hundred events are collected in one window 
and each event is only kilobytes in size. Is it possible for a session 
window to collect duplicate data?
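
As background for the question, here is a minimal sketch of how a `NegativeArraySizeException` can arise from a size limit like this one. It assumes the limit in question is the 2^31-byte cap on a single value handed through a `byte[]`-based Java API; the object and method names are hypothetical and this is not RocksDB's actual code.

```scala
import scala.util.{Failure, Try}

// Illustration only; not taken from RocksDB or Flink.
object SizeOverflowSketch {
  // Narrowing a 64-bit size to Int keeps the low 32 bits, so any size above
  // Int.MaxValue (2 GiB - 1) can wrap around to a negative number.
  def asIntLength(sizeInBytes: Long): Int = sizeInBytes.toInt

  // Allocating a JVM array with a negative length throws
  // NegativeArraySizeException.
  def failsWithNegativeSize(len: Int): Boolean =
    Try(new Array[Byte](len)) match {
      case Failure(_: NegativeArraySizeException) => true
      case _                                      => false
    }

  def main(args: Array[String]): Unit = {
    val justOver2GiB = Int.MaxValue.toLong + 1L // 2^31 bytes
    val len = asIntLength(justOver2GiB)
    println(len)                        // -2147483648
    println(failsWithNegativeSize(len)) // true
  }
}
```

Under that assumption the limit would concern a single stored value, such as the serialized list held for one key and window, rather than a group of windows.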



[jira] [Resolved] (FLINK-9291) Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)

2018-05-03 Thread Narayanan Arunachalam (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Narayanan Arunachalam resolved FLINK-9291.
--
Resolution: Duplicate

> Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)
> 
>
> Key: FLINK-9291
> URL: https://issues.apache.org/jira/browse/FLINK-9291
> Project: Flink
>  Issue Type: Bug
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> Using RocksDB for state. After running for a few hours, checkpointing fails 
> with the following error. The job recovers fine after this.
> AsynchronousException\{java.lang.Exception: Could not materialize checkpoint 
> 215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> 
> Sink: bs, Sink: es) (14/80).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 215 for 
> operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, 
> Sink: es) (14/80).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NegativeArraySizeException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NegativeArraySizeException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.NegativeArraySizeException
>   at org.rocksdb.RocksIterator.value0(Native Method)
>   at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBMergeIterator.value(RocksDBKeyedStateBackend.java:1898)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:704)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:556)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:466)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:424)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.NegativeArraySizeException]





[jira] [Created] (FLINK-9291) Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException)

2018-05-03 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9291:


 Summary: Checkpoint failure (CIRCULAR 
REFERENCE:java.lang.NegativeArraySizeException)
 Key: FLINK-9291
 URL: https://issues.apache.org/jira/browse/FLINK-9291
 Project: Flink
  Issue Type: Bug
Reporter: Narayanan Arunachalam


Using RocksDB for state. After running for a few hours, checkpointing fails 
with the following error. The job recovers fine after this.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, Sink: es) (14/80).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 215 for operator makeSalpTrace -> countTraces -> makeZipkinTrace -> (Map -> Sink: bs, Sink: es) (14/80).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NegativeArraySizeException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NegativeArraySizeException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 7 more
Caused by: java.lang.NegativeArraySizeException
    at org.rocksdb.RocksIterator.value0(Native Method)
    at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBMergeIterator.value(RocksDBKeyedStateBackend.java:1898)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:704)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:556)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:466)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:424)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
[CIRCULAR REFERENCE:java.lang.NegativeArraySizeException]
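
One possible reading of the `NegativeArraySizeException` raised under `RocksIterator.value0`, offered as an assumption and not as a diagnosis confirmed in this report: the Flink documentation notes that the RocksDB JNI bridge hands values over as `byte[]`, so a single value cannot exceed 2^31 bytes, and state that grows through merge operations (such as list state) can pass that limit without any error at write time. The sketch below uses only the JDK, and its names (`ValueSizeOverflow`, `toJavaLength`, `allocationFails`) are hypothetical. It shows how a length above `Integer.MAX_VALUE` turns negative when narrowed to `int`, and how allocating an array with that length fails with the same exception type.

```java
// Toy illustration only; this is not RocksDB or Flink code.
final class ValueSizeOverflow {

    // Narrow a native-side length (64-bit) to a Java array length (32-bit).
    // Lengths above Integer.MAX_VALUE wrap around and can become negative.
    static int toJavaLength(long nativeLength) {
        return (int) nativeLength;
    }

    // Returns true if allocating a byte[] for the given native length
    // fails with NegativeArraySizeException.
    static boolean allocationFails(long nativeLength) {
        int length = toJavaLength(nativeLength);
        try {
            byte[] buffer = new byte[length];
            return buffer.length != length; // always false when allocation succeeds
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024;
        System.out.println(toJavaLength(threeGiB));    // a negative int
        System.out.println(allocationFails(threeGiB)); // allocation is rejected
    }
}
```

If this is what happens here, the size of the largest per-key window value would be worth checking before the checkpoint that fails.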





[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-05-02 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461215#comment-16461215
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

Not sure whether it is related, but I don't see this error with incremental 
checkpointing disabled.

> RockDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> The job has one Kafka source and no sinks, uses session windows, and runs on 
> processing time. It fails with the error given below after running for a few 
> hours. The only way to recover from this error is to cancel the job and start 
> a new one.
> Using the S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException{java.lang.NegativeArraySizeException}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>     at org.rocksdb.RocksDB.get(Native Method)
>     at org.rocksdb.RocksDB.get(RocksDB.java:810)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)





[jira] [Created] (FLINK-9290) The job is unable to recover from a checkpoint

2018-05-02 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9290:


 Summary: The job is unable to recover from a checkpoint
 Key: FLINK-9290
 URL: https://issues.apache.org/jira/browse/FLINK-9290
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam


Using the RocksDB state backend.

The job runs fine for more than 24 hours and then attempts recovery because of 
an error from the sink. It continues to fail at recovery time with the 
following error. The workaround is to cancel the job and start it again.
java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
Serialization trace:
topic (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:584)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:300)
    ... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
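
The innermost frames show an `ArrayList.get` with index 2 on a list of size 1, reached from `MapReferenceResolver.getReadObject`. As a rough mental model only (this is a toy, not Kryo's implementation, and every name in it is hypothetical), a deserializer that supports back-references keeps a list of objects already read and looks up later references by id. If the id read from the stream points past what has been registered, the lookup fails in exactly this shape. Why the id was out of range in this job is not established by the trace.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of back-reference resolution during deserialization.
// Not Kryo code; class and method names are illustrative.
final class ReferenceTable {
    private final List<Object> readObjects = new ArrayList<>();

    // Register an object the first time it is read; returns its reference id.
    int register(Object value) {
        readObjects.add(value);
        return readObjects.size() - 1;
    }

    // Resolve a back-reference id found in the stream.
    // An id that was never registered throws IndexOutOfBoundsException.
    Object resolve(int id) {
        return readObjects.get(id);
    }

    // Defensive variant that reports the mismatch explicitly.
    Object resolveChecked(int id) {
        if (id < 0 || id >= readObjects.size()) {
            throw new IllegalStateException(
                "Reference id " + id + " not registered; "
                    + readObjects.size() + " object(s) read so far");
        }
        return readObjects.get(id);
    }
}
```

In the toy, registering one object and then resolving id 2 reproduces the failure, which matches the "Index: 2, Size: 1" pattern in the report.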
 





[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-04-28 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457672#comment-16457672
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

Looks like I can't control whether a set or a list is used, because keyBy 
causes the values to be treated as a list.






[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-04-28 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457641#comment-16457641
 ] 

Narayanan Arunachalam commented on FLINK-9268:
--

One possibility is that, after the job is restarted following some kind of 
failure, a window sees the same events again, and another job failure before 
the window closes then leads to this state.

I think I could try turning the list into a set or configuring the 
checkpointing mode to EXACTLY_ONCE.
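
To make the list-versus-set idea concrete, here is a small sketch that assumes each event carries a stable identity (the string ids below are hypothetical) and uses plain JDK collections, not Flink state. With at-least-once processing, some events can be delivered to the window a second time after recovery. A list buffer keeps every delivery, while a set buffer keeps each event once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: event ids are hypothetical and no Flink classes are used.
final class ReplayBuffers {

    // Events as a window might see them when "e2" and "e3" are replayed
    // after a restart.
    static List<String> deliveredWithReplay() {
        return Arrays.asList("e1", "e2", "e3", "e2", "e3", "e4");
    }

    // List semantics: every delivery is appended, so replays grow the buffer.
    static List<String> bufferAsList(List<String> delivered) {
        return new ArrayList<>(delivered);
    }

    // Set semantics: a replayed event with the same identity is stored once.
    // LinkedHashSet keeps first-arrival order.
    static Set<String> bufferAsSet(List<String> delivered) {
        return new LinkedHashSet<>(delivered);
    }

    public static void main(String[] args) {
        List<String> delivered = deliveredWithReplay();
        System.out.println(bufferAsList(delivered).size());
        System.out.println(bufferAsSet(delivered));
    }
}
```

Whether duplicates are actually what inflates the state in this job is still only a hypothesis at this point.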






[jira] [Updated] (FLINK-9268) RockDB errors from WindowOperator

2018-04-28 Thread Narayanan Arunachalam (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Narayanan Arunachalam updated FLINK-9268:
-
Affects Version/s: 1.4.2






[jira] [Created] (FLINK-9268) RockDB errors from WindowOperator

2018-04-27 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9268:


 Summary: RockDB errors from WindowOperator
 Key: FLINK-9268
 URL: https://issues.apache.org/jira/browse/FLINK-9268
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, State Backends, Checkpointing
Reporter: Narayanan Arunachalam


The job has one Kafka source and no sinks, uses session windows, and runs on 
processing time. It fails with the error given below after running for a few 
hours. The only way to recover from this error is to cancel the job and start 
a new one.

Using the S3 backend for externalized checkpoints.

A representative job DAG:

val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)

A representative config:

state.backend=rocksDB

checkpoint.enabled=true
external.checkpoint.enabled=true
checkpoint.mode=AT_LEAST_ONCE
checkpoint.interval=90
checkpoint.timeout=30

Error:

TimerException{java.lang.NegativeArraySizeException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)





[jira] [Created] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-04-25 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9258:


 Summary: ConcurrentModificationException in ComponentMetricGroup.getAllVariables
 Key: FLINK-9258
 URL: https://issues.apache.org/jira/browse/FLINK-9258
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam


This exception appears at job startup time. It looks like there is a race 
condition when the metric variables are constructed.

The error is intermittent.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
    at java.util.HashMap.putMapEntries(HashMap.java:511)
    at java.util.HashMap.putAll(HashMap.java:784)
    at org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
    at org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
    at com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
    at com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
    at com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
    at com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
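
The trace shows `HashMap.putAll` iterating a source map when the fail-fast check trips, which is consistent with another thread adding entries to that source map at the same time. The sketch below is a JDK-only illustration under that assumption. It reproduces the same interleaving deterministically on one thread, and then repeats it with a `ConcurrentHashMap`, whose iterators are weakly consistent and do not throw. The class name and the variable keys are illustrative, and this is not presented as the change made in Flink.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy reproduction; not Flink code.
final class VariablesSnapshot {

    private static void fill(Map<String, String> variables) {
        variables.put("<host>", "h1");
        variables.put("<job_name>", "j1");
    }

    // A HashMap is structurally modified while it is being iterated.
    // The next call on the iterator detects this and throws.
    static boolean iterationFailsOnHashMap() {
        Map<String, String> variables = new HashMap<>();
        fill(variables);
        try {
            Iterator<Map.Entry<String, String>> it = variables.entrySet().iterator();
            it.next();
            variables.put("<task_name>", "t1"); // stands in for a concurrent writer
            it.next();                          // fail-fast check happens here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Same interleaving on a ConcurrentHashMap: the copy completes and holds
    // at least the entries that existed when the iterator was created.
    static Map<String, String> copyFromConcurrentMap() {
        Map<String, String> variables = new ConcurrentHashMap<>();
        fill(variables);
        Map<String, String> copy = new HashMap<>();
        Iterator<Map.Entry<String, String>> it = variables.entrySet().iterator();
        Map.Entry<String, String> first = it.next();
        copy.put(first.getKey(), first.getValue());
        variables.put("<task_name>", "t1"); // stands in for a concurrent writer
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            copy.put(entry.getKey(), entry.getValue());
        }
        return copy;
    }
}
```

Guarding both the writer and the copy with the same lock would be another way to get a consistent snapshot; which option suits `ComponentMetricGroup` is a separate question.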





[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-24 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450075#comment-16450075
 ] 

Narayanan Arunachalam commented on FLINK-9138:
--

I will try the changes. Upgrading is going to take a while for us, so I will 
have to try using a custom sink or see whether we can backport it.

> Enhance BucketingSink to also flush data by time interval
> -
>
> Key: FLINK-9138
> URL: https://issues.apache.org/jira/browse/FLINK-9138
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> BucketingSink currently supports flushing data to the file system by size 
> limit and by period of inactivity. It would be useful to also flush data after 
> a specified time interval. This way, data is still written out when write 
> throughput is low but there are no significant gaps between the writes. This 
> reduces the ETA for the data in the file system and should help checkpoints 
> move faster as well.





[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-05 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427293#comment-16427293
 ] 

Narayanan Arunachalam commented on FLINK-9138:
--

[~glaksh100] are you considering the PR for 1.4?






[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-05 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427287#comment-16427287
 ] 

Narayanan Arunachalam commented on FLINK-9138:
--

This looks good. I suppose the event time is not applicable here. We will need 
a setter to pass the value for the config 'rollIntervalThreshold'.






[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-05 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427227#comment-16427227
 ] 

Narayanan Arunachalam commented on FLINK-9138:
--

Thanks [~glaksh100]






[jira] [Created] (FLINK-9138) Enhance BucketingSink to also flush data by time interval

2018-04-04 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-9138:


 Summary: Enhance BucketingSink to also flush data by time interval
 Key: FLINK-9138
 URL: https://issues.apache.org/jira/browse/FLINK-9138
 Project: Flink
  Issue Type: Improvement
  Components: filesystem-connector
Affects Versions: 1.4.2
Reporter: Narayanan Arunachalam


BucketingSink currently supports flushing data to the file system by size 
limit and by period of inactivity. It would be useful to also flush data after 
a specified time interval. This way, data is still written out when write 
throughput is low but there are no significant gaps between the writes. This 
reduces the ETA for the data in the file system and should help checkpoints 
move faster as well.
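
The requested behavior can be sketched as a roll decision with three independent conditions. This is a minimal sketch under stated assumptions: the class `RollPolicy`, its fields, and the setter `setRollIntervalMs` are hypothetical names chosen for illustration and are not taken from BucketingSink. Time is wall-clock milliseconds passed in by the caller.

```java
// Hypothetical rolling policy; names are illustrative, not BucketingSink's API.
final class RollPolicy {
    private final long maxPartSizeBytes;
    private final long inactivityThresholdMs;
    private long rollIntervalMs = Long.MAX_VALUE; // time-based roll disabled by default

    RollPolicy(long maxPartSizeBytes, long inactivityThresholdMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.inactivityThresholdMs = inactivityThresholdMs;
    }

    // Setter for the proposed time-based threshold.
    RollPolicy setRollIntervalMs(long rollIntervalMs) {
        if (rollIntervalMs <= 0) {
            throw new IllegalArgumentException("rollIntervalMs must be positive");
        }
        this.rollIntervalMs = rollIntervalMs;
        return this;
    }

    // Roll when the part file is too large, has been idle too long,
    // or has simply been open for longer than the roll interval.
    boolean shouldRoll(long partSizeBytes, long createdAtMs, long lastWriteMs, long nowMs) {
        boolean tooLarge = partSizeBytes >= maxPartSizeBytes;
        boolean inactive = nowMs - lastWriteMs >= inactivityThresholdMs;
        boolean tooOld = nowMs - createdAtMs >= rollIntervalMs;
        return tooLarge || inactive || tooOld;
    }
}
```

The third condition covers the slow-but-steady case described above: a small file that receives a write every few seconds never trips the size or inactivity checks, but it does trip the age check once the interval has elapsed.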





[jira] [Created] (FLINK-8600) BucketingSink errors out when used with Presto filesystem

2018-02-07 Thread Narayanan Arunachalam (JIRA)
Narayanan Arunachalam created FLINK-8600:


 Summary: BucketingSink errors out when used with Presto filesystem
 Key: FLINK-8600
 URL: https://issues.apache.org/jira/browse/FLINK-8600
 Project: Flink
  Issue Type: Bug
  Components: FileSystem
Affects Versions: 1.4.0
Reporter: Narayanan Arunachalam


BucketingSink passes a non-qualified path when it tests the truncate behavior, 
as you can see 
[here|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L606].

Because of this, a "Path is not absolute: uuid" error is thrown when the sink 
is used with the 
[PrestoS3FileSystem|https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java].
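
As an illustration of the difference between a bare file name and a qualified path, the sketch below uses `java.net.URI` from the JDK as a stand-in for Hadoop's `Path`, which is an assumption made to keep the example self-contained. The class and method names, and the `s3://bucket/base` location, are hypothetical. A bare UUID-like name has a path that does not start with `/`, while resolving it against the sink's base directory yields a path that does.

```java
import java.net.URI;

// Stand-in illustration using java.net.URI; not Hadoop or Flink code.
final class QualifiedTestPath {

    // True when the path component starts with '/', which is the property
    // a "Path is not absolute" style check cares about.
    static boolean hasAbsolutePath(URI uri) {
        String path = uri.getPath();
        return path != null && path.startsWith("/");
    }

    // Resolve a bare file name against a base directory. The base must end
    // with '/' so that resolve() appends instead of replacing the last segment.
    static URI qualify(URI baseDir, String fileName) {
        String base = baseDir.toString();
        URI dir = base.endsWith("/") ? baseDir : URI.create(base + "/");
        return dir.resolve(fileName);
    }

    public static void main(String[] args) {
        URI bare = URI.create("some-uuid");
        URI qualified = qualify(URI.create("s3://bucket/base"), "some-uuid");
        System.out.println(hasAbsolutePath(bare));
        System.out.println(qualified);
    }
}
```

With Hadoop's API, the analogous step would be to build the test path relative to the sink's base path before handing it to the file system.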


