Hi Ori,

In your code, are you using the process() API?

.process(new MyProcessWindowFunction());

If you do, the ProcessWindowFunction receives an Iterable with ALL elements
collected during the session, so the window operator has to buffer every
element in state until the window fires. This can make the state per key
very large (as you're experiencing).
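For context on the exception itself: Java array lengths are ints, so a single serialized value of 2 GiB or more cannot fit in a byte[]. Below is a minimal sketch of how such a size turns into a NegativeArraySizeException. It assumes (I haven't checked the RocksDB JNI code) that the value's length is narrowed to an int before the array is allocated; `narrowToArrayLength` is a hypothetical helper, not a RocksDB or Flink API.

```java
// Illustrative only: shows how a byte length >= 2 GiB wraps negative when
// narrowed to int, which is the kind of overflow that ends in
// NegativeArraySizeException.
public class OverflowDemo {

    // Hypothetical helper: narrows a serialized size to an int, the way an
    // int-sized array length would have to be.
    static int narrowToArrayLength(long serializedBytes) {
        return (int) serializedBytes;
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024;     // 3,221,225,472 bytes
        int length = narrowToArrayLength(threeGiB);
        System.out.println(length);                   // prints -1073741824

        try {
            byte[] buffer = new byte[length];         // negative length
            System.out.println(buffer.length);
        } catch (NegativeArraySizeException e) {
            System.out.println("caught NegativeArraySizeException");
        }
    }
}
```

Anything up to Integer.MAX_VALUE (2^31 - 1 bytes) survives the cast; exactly 2 GiB already wraps to Integer.MIN_VALUE, which is why the fix is to keep each key's window state well below that size rather than to tune memory.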

As Aljoscha Krettek suggested in the JIRA, try to use the aggregate() API if
you can, so that state holds only an aggregate that is updated incrementally
on every incoming event (this could be ONE class / Map / Tuple / etc.)
rather than ALL elements.

See example here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
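To illustrate the incremental approach, here is a minimal sketch in plain Java so it runs without Flink on the classpath. The AggregateFunction interface below is a local stand-in declaring the same four methods as Flink's org.apache.flink.api.common.functions.AggregateFunction; Event, SessionStats and StatsAggregate are hypothetical names. However many events a session receives, the state stays one small SessionStats object.

```java
// Sketch of incremental aggregation: the accumulator is the only state kept
// per key/session, instead of buffering every element for a ProcessWindowFunction.
public class SessionAggregateSketch {

    // Local stand-in with the same four methods as Flink's AggregateFunction,
    // declared here only so this sketch compiles without Flink.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical event type; substitute your own.
    static final class Event {
        final long payloadBytes;
        Event(long payloadBytes) { this.payloadBytes = payloadBytes; }
    }

    // Hypothetical per-session summary: constant size regardless of event count.
    static final class SessionStats {
        long count;
        long sumBytes;
        long maxBytes;
    }

    static final class StatsAggregate implements AggregateFunction<Event, SessionStats, String> {
        @Override
        public SessionStats createAccumulator() {
            return new SessionStats();
        }

        // Called once per incoming event; folds it into the accumulator.
        @Override
        public SessionStats add(Event e, SessionStats acc) {
            acc.count++;
            acc.sumBytes += e.payloadBytes;
            acc.maxBytes = Math.max(acc.maxBytes, e.payloadBytes);
            return acc;
        }

        @Override
        public String getResult(SessionStats acc) {
            return "count=" + acc.count + " sum=" + acc.sumBytes + " max=" + acc.maxBytes;
        }

        // Session windows can merge (an event can bridge two sessions),
        // so two accumulators must be combinable.
        @Override
        public SessionStats merge(SessionStats a, SessionStats b) {
            a.count += b.count;
            a.sumBytes += b.sumBytes;
            a.maxBytes = Math.max(a.maxBytes, b.maxBytes);
            return a;
        }
    }

    public static void main(String[] args) {
        StatsAggregate agg = new StatsAggregate();
        SessionStats acc = agg.createAccumulator();
        // A million events, but the state is still three longs.
        for (int i = 1; i <= 1_000_000; i++) {
            acc = agg.add(new Event(i % 100), acc);
        }
        System.out.println(agg.getResult(acc)); // count=1000000 sum=49500000 max=99
    }
}
```

In the real job you would implement Flink's own AggregateFunction and call `.aggregate(new StatsAggregate())` on the windowed stream instead of `.process(...)`, or `.aggregate(new StatsAggregate(), new MyProcessWindowFunction())` if you still need the window metadata; in that variant the ProcessWindowFunction's Iterable contains only the single pre-aggregated result.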

Thanks,
Rafi


On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Ori
>
> AFAIK, the 2GB limit is currently still there. As a workaround, maybe you
> can reduce the state size. If this cannot be done using the window
> operator, would a KeyedProcessFunction [1] work for you?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> Best,
> Congxian
>
>
> On Wed, Jul 8, 2020 at 8:30 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> I've asked this question in
>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
>> for two years so I'm not sure it will be visible.
>>
>> While creating a savepoint I get an org.apache.flink.util.SerializedThrowable:
>> java.lang.NegativeArraySizeException. It's happening because some of my
>> windows have a keyed state of more than 2 GiB, hitting the RocksDB memory limit.
>>
>> How can I prevent this?
>>
>> As I understand it, I need to somehow limit the accumulated size of the
>> window I'm using, which is an event-time session window. However, I have no
>> way of doing so, because the WindowOperator manages its state on its own.
>>
>> Below is a full stack trace.
>>
>> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 139 for operator Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>> ... 3 common frames omitted
>> Caused by: org.apache.flink.util.SerializedThrowable: null
>> at org.rocksdb.RocksIterator.value0(Native Method)
>> at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>> at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>> at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>> ... 5 common frames omitted
>>
>
