Hi Mike,

I think the root cause is that the size of the Java byte array still exceeds
the VM limit.
The exception message is not friendly, and this case is not covered by the
sanity check in [1] because it goes through a different code path [2]:
the native method org.rocksdb.RocksIterator.$$YJP$$value0 allocates the
byte array directly, without any size check.
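
To illustrate why the size shows up as a negative number, here is a minimal
sketch in plain Java. It assumes the native value length is narrowed to a
32-bit signed int before the array is allocated, and that it wrapped around
exactly once; under those assumptions the original length would have been
3509244792 bytes. The class and method names are hypothetical and are not part
of RocksDB or Flink.

```java
/** Illustration only: how a length above Integer.MAX_VALUE turns negative as a 32-bit int. */
final class JsizeOverflowDemo {

    /** Mimics narrowing a 64-bit native length to a 32-bit signed array size. */
    static int toArraySize(long nativeLength) {
        return (int) nativeLength; // keeps only the low 32 bits
    }

    /** Hypothetical guard: fails with a readable message instead of a negative size. */
    static int checkedArraySize(long nativeLength) {
        if (nativeLength < 0 || nativeLength > Integer.MAX_VALUE) {
            throw new IllegalStateException(
                "Value of " + nativeLength + " bytes does not fit in a Java byte array");
        }
        return (int) nativeLength;
    }

    public static void main(String[] args) {
        // One possible original length (assumption): 2^32 - 785722504 = 3509244792
        long candidate = (1L << 32) - 785_722_504L;
        System.out.println(toArraySize(candidate)); // prints -785722504
    }
}
```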

If you want to work around the problem, please consider reducing the amount of
data added through listState#add so that the stored value does not grow too
large.
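
One way to picture this workaround, as a rough sketch in plain Java with no
Flink dependencies: instead of appending everything to a single value, elements
are spread over several buckets that each stay under a byte budget.
BoundedBuckets, its methods, and the budget are hypothetical names chosen for
illustration. How the buckets would be mapped to Flink state (for example, one
state entry per bucket) is an assumption and depends on the job.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups elements into buckets that each stay under a byte budget. */
final class BoundedBuckets {
    private final long maxBytesPerBucket;
    private final List<List<byte[]>> buckets = new ArrayList<>();
    private final List<Long> bucketSizes = new ArrayList<>();

    BoundedBuckets(long maxBytesPerBucket) {
        this.maxBytesPerBucket = maxBytesPerBucket;
    }

    /** Adds an element, opening a new bucket when the current one would exceed the budget. */
    void add(byte[] element) {
        int last = buckets.size() - 1;
        if (last < 0 || bucketSizes.get(last) + element.length > maxBytesPerBucket) {
            // Note: a single element larger than the budget still gets a bucket of its own.
            buckets.add(new ArrayList<>());
            bucketSizes.add(0L);
            last++;
        }
        buckets.get(last).add(element);
        bucketSizes.set(last, bucketSizes.get(last) + element.length);
    }

    int bucketCount() {
        return buckets.size();
    }

    long bucketBytes(int index) {
        return bucketSizes.get(index);
    }
}
```

For example, with a budget of 10 bytes, adding three 4-byte elements leaves
8 bytes in the first bucket and 4 bytes in a second one.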



[1] https://github.com/facebook/rocksdb/pull/3850
[2] https://github.com/ververica/frocksdb/blob/8608d75d85f8e1b3b64b73a4fb6d19baec61ba5c/java/rocksjni/iterator.cc#L239-L245

Best
Yun Tang

________________________________
From: Martijn Visser <martijnvis...@apache.org>
Sent: Monday, June 13, 2022 21:47
To: Mike Barborak <mi...@ec.ai>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: NegativeArraySizeException trying to take a savepoint

Hi Mike,

It would be worthwhile to check if this still occurs in Flink 1.14, since Flink 
bumped to a newer version of RocksDB in that version. Is that a possibility for 
you?

Best regards,

Martijn

On Mon, Jun 13, 2022 at 15:21, Mike Barborak <mi...@ec.ai> wrote:

When trying to savepoint our job, we are getting the stack trace below. Is 
there a way to know more about this failure? Like which function in the job 
graph is associated with the problematic state and which key (assuming it is 
keyed state)?



Or is there a fix for this exception? The only mention of this exception that I 
can find is in [1] and [2]. [1] has a message at the bottom saying that the 
issue was fixed in RocksDB in 2018. And while we do have a part of the job 
graph that matches the pattern discussed in these two links, our attempts to 
reproduce the problem by pumping messages through at a rate millions of times 
higher than normal have not worked.



We are using Flink version 1.13.5.



Thanks,

Mike



[1] https://issues.apache.org/jira/browse/FLINK-9268

[2] https://www.mail-archive.com/user@flink.apache.org/msg34915.html



Caused by: java.lang.Exception: Could not materialize checkpoint 49768 for operator KeyedProcess -> KeyedProcess -> re-operator-output -> Sink: Kafka sink to ec.platform.braid.responses-rtw (9/15)#0.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:257)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NegativeArraySizeException: -785722504
    at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128)
    ... 3 more
Caused by: java.lang.NegativeArraySizeException: -785722504
    at org.rocksdb.RocksIterator.$$YJP$$value0(Native Method)
    at org.rocksdb.RocksIterator.value0(RocksIterator.java)
    at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
    at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:103)
    at org.apache.flink.contrib.streaming.state.iterator.RocksSingleStateIterator.value(RocksSingleStateIterator.java:66)
    at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:202)
    at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:210)
    at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
    at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
