Thanks, Chesnay,
I just created the 3 tickets (in my clumsy way):
* FLINK-26584<https://issues.apache.org/jira/browse/FLINK-26584> : State
Processor API fails to write savepoints exceeding 5MB
* FLINK-26585<https://issues.apache.org/jira/browse/FLINK-26585> : State
Processor API: Loading a state set buffers the whole state set in memory before
starting to process
* FLINK-26586<https://issues.apache.org/jira/browse/FLINK-26586> :
FileSystem uses unbuffered read I/O
I’ll be off the week starting Jan 21, but otherwise ready to discuss matters
Thias
From: Chesnay Schepler
Sent: Thursday, March 10, 2022 10:47
To: Schwalbe Matthias ; user@flink.apache.org
Subject: Re: Savepoint API challenged with large savepoints
That all sounds very interesting; I'd go ahead with creating tickets.
On 08/03/2022 13:43, Schwalbe Matthias wrote:
Dear Flink Team,
In the last weeks I was faced with a large savepoint (around 40 GiB) that
contained lots of obsolete data points and overwhelmed our infrastructure (i.e.
it failed to load/restart).
We could not afford to lose the state, hence I spent the time to transcode the
savepoint into something smaller (ended up with 2.5 GiB).
During my efforts I encountered a couple of points that make the savepoint API
uneasy with larger savepoints, and found simple solutions …
I would like to contribute my findings and ‘fixes’, however on my corporate
infrastructure I cannot fork/build Flink locally nor PR the changes later on.
Before creating Jira tickets I wanted to quickly discuss the matter.
Findings:
1. (We are currently on Flink 1.13 (RocksDB state backend), but all findings
apply to the latest version as well)
2. WritableSavepoint.write(…) falls back to JobManagerCheckpointStorage,
which restricts the savepoint size to 5 MiB
* See relevant exception stack here [1]
* This is because SavepointTaskManagerRuntimeInfo.getConfiguration()
always returns an empty Configuration, hence
* Neither “state.checkpoint-storage” nor “state.checkpoints.dir” can
be configured
* ‘fix’: give SavepointTaskManagerRuntimeInfo.getConfiguration() a
meaningful implementation and set the configuration in
SavepointEnvironment.getTaskManagerInfo()
3. When loading a state, MultiStateKeyIterator loads and buffers the whole
state in memory before it even processes a single data point
* This is absolutely no problem for small state (hence the unit tests
work fine)
* The MultiStateKeyIterator ctor sets up a java Stream that iterates all
state descriptors and flattens all data points contained within
* The java.util.stream.Stream#flatMap implementation causes the whole
data set to be buffered when it is enumerated later on
* See call stack [2]
* In our case this is 150e6 data points (> 1 GiB just for the pointers
to the data, let alone the data itself, ~30 GiB)
* I’m not aware of any instrumentation of Stream that avoids the
problem, hence
* I coded an alternative implementation of MultiStateKeyIterator that
avoids using java Stream,
* I can contribute our implementation (MultiStateKeyIteratorNoStreams)
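The buffering can be reproduced with the plain JDK, independent of Flink. A minimal sketch (class and method names here are made up for illustration): requesting a single element through flatMap's iterator() still drains the whole inner stream into an internal buffer.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapBuffering {
    /**
     * Models one "state descriptor" whose flattened stream holds
     * innerSize "data points"; returns how many data points were pulled
     * from the inner stream after requesting just the first element.
     */
    static int pulledForFirstElement(int innerSize) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it = Stream.of(0)
                .flatMap(descriptor -> IntStream.range(0, innerSize)
                        .peek(i -> pulled.incrementAndGet())
                        .boxed())
                .iterator();
        it.next(); // request only the first data point
        // flatMap's wrapping spliterator has nevertheless pushed the
        // entire inner stream into its buffer
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("pulled for first element: " + pulledForFirstElement(1000));
    }
}
```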
4. I found out that, at least when using LocalFileSystem on a Windows
system, the read I/O to load a savepoint is unbuffered,
* See example stack [3]
* i.e. in order to load a single long in a serializer, it enters
kernel mode 8 times and loads the 8 bytes one by one
* I coded a BufferedFSDataInputStreamWrapper that allows opting in to
buffered reads on any FileSystem implementation
* In our setting savepoint load is now 30 times faster
* I’ve once seen a Jira ticket about improving savepoint load time in
general (unfortunately I lost the link); maybe this approach can help with it
* not sure if HDFS has got the same problem
* I can contribute my implementation
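The idea can be sketched with plain java.io, independent of Flink's FileSystem API (the class and method names below are hypothetical, not the actual BufferedFSDataInputStreamWrapper): a long assembled from eight single-byte reads hits the underlying unbuffered stream eight times, while an interposed BufferedInputStream reduces that to one bulk read.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class BufferedReadSketch {
    /** Counts how many read calls reach the underlying, unbuffered stream. */
    static final class CountingStream extends FilterInputStream {
        int calls = 0;
        CountingStream(InputStream in) { super(in); }
        @Override public int read() throws IOException { calls++; return super.read(); }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            calls++; return super.read(b, off, len);
        }
    }

    /** Assembles a big-endian long from eight single-byte reads, like the serializer does. */
    static long readLong(InputStream in) throws IOException {
        long v = 0;
        for (int i = 0; i < 8; i++) v = (v << 8) | (in.read() & 0xFF);
        return v;
    }

    /** Returns how many reads hit the underlying stream when deserializing one long. */
    static int callsForOneLong(boolean buffered) {
        CountingStream counting = new CountingStream(new ByteArrayInputStream(new byte[8]));
        try {
            readLong(buffered ? new BufferedInputStream(counting, 4096) : counting);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counting.calls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered: " + callsForOneLong(false) + " reads"); // 8
        System.out.println("buffered:   " + callsForOneLong(true) + " read");   // 1
    }
}
```

With a real file behind the stream, each of those counted reads would be a kernel-mode transition, which is where the observed speedup comes from.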
Looking forward to your comments
Matthias (Thias) Schwalbe
[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR
BatchTask - Error in task code: MapPartition
(bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state
is larger than the maximum permitted memory-backed state. Size=180075318 ,
maxSize=5242880 . Consider using a different state backend, like the File
System State backend.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.&lt;init&gt;(OperatorSnapshotFinalizer.java:54)
at
org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
at
org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStat