RE: Savepoint API challenged with large savepoints

2022-03-10 Thread Schwalbe Matthias
Thanks, Chesnay,

I just created the 3 tickets (in my clumsy way):

  *   FLINK-26584 <https://issues.apache.org/jira/browse/FLINK-26584>: State Processor API fails to write savepoints exceeding 5MB
  *   FLINK-26585 <https://issues.apache.org/jira/browse/FLINK-26585>: State Processor API: Loading a state set buffers the whole state set in memory before starting to process
  *   FLINK-26586 <https://issues.apache.org/jira/browse/FLINK-26586>: FileSystem uses unbuffered read I/O

I’ll be off the week starting Jan 21, but am otherwise ready to discuss matters.

Thias




From: Chesnay Schepler 
Sent: Thursday, 10 March 2022 10:47
To: Schwalbe Matthias ; user@flink.apache.org
Subject: Re: Savepoint API challenged with large savepoints

That all sounds very interesting; I'd go ahead with creating tickets.

On 08/03/2022 13:43, Schwalbe Matthias wrote:
Dear Flink Team,

In the last weeks I was faced with a large savepoint (around 40GiB) that 
contained lots of obsolete data points and overwhelmed our infrastructure (i.e. 
failed to load/restart).
We could not afford to lose the state, hence I spent the time to transcode the 
savepoint into something smaller (ended up with 2.5 GiB).
During my efforts I encountered a couple of points that make the savepoint API struggle with larger savepoints, and I found simple solutions for them …

I would like to contribute my findings and ‘fixes’; however, on my corporate infrastructure I cannot fork/build Flink locally, nor open a PR for the changes later on.

Before creating Jira tickets I wanted to quickly discuss the matter.

Findings:


  1.  (We are currently on Flink 1.13 (RocksDB state backend), but all findings apply to the latest version as well)
  2.  WritableSavepoint.write(…) falls back to JobManagerCheckpointStorage, which restricts the savepoint size to 5 MiB

 *   See the relevant exception stack here [1]
 *   This is because SavepointTaskManagerRuntimeInfo.getConfiguration() always returns an empty Configuration, hence
 *   neither “state.checkpoint-storage” nor “state.checkpoints.dir” is or can be configured
 *   ‘fix’: give SavepointTaskManagerRuntimeInfo.getConfiguration() a meaningful implementation and set the configuration in SavepointEnvironment.getTaskManagerInfo() (see the sketch below)
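
For illustration, a minimal sketch of the kind of Configuration one would want SavepointEnvironment.getTaskManagerInfo().getConfiguration() to expose, so that the snapshot no longer falls back to the 5 MiB in-memory JobManagerCheckpointStorage; the class name and the directory are made up, this is not the actual change:

import org.apache.flink.configuration.Configuration;

public class SavepointStorageConfigSketch {

    // Hedged sketch: both keys are the regular Flink options named above;
    // wiring such a Configuration through SavepointTaskManagerRuntimeInfo
    // is the proposed change. The directory is purely illustrative.
    public static Configuration checkpointStorageConfig() {
        Configuration conf = new Configuration();
        conf.setString("state.checkpoint-storage", "filesystem");
        conf.setString("state.checkpoints.dir", "file:///tmp/savepoint-staging");
        return conf;
    }
}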

  3.  When loading a state, MultiStateKeyIterator loads and buffers the whole state in memory before it even processes a single data point

 *   This is absolutely no problem for small state (hence the unit tests work fine)
 *   The MultiStateKeyIterator constructor sets up a Java Stream that iterates all state descriptors and flattens all data points contained within
 *   The java.util.stream.Stream#flatMap function causes the whole data set to be buffered when it is enumerated later on (see the demo below)
 *   See call stack [2]
      *   In our case this is 150e6 data points (> 1 GiB just for the pointers to the data, let alone the data itself, ~30 GiB)
 *   I’m not aware of a way to instrument the Stream so as to avoid the problem, hence
 *   I coded an alternative implementation of MultiStateKeyIterator that avoids using Java Streams
 *   I can contribute our implementation (MultiStateKeyIteratorNoStreams)
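
For illustration, a small self-contained Java demo of the flatMap buffering (no Flink involved; class name and element count are made up). It mimics the key iterator by flat-mapping an outer stream of “state descriptors” into their keys and consuming the result through an Iterator; on Java 8, which Flink 1.13 runs on, asking for a single key pulls the entire inner stream (newer JDKs may behave differently):

import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FlatMapBufferingDemo {
    public static void main(String[] args) {
        AtomicLong pulled = new AtomicLong();

        // Inner "state" stream of 1 million keys; counts every element pulled from it.
        Iterator<Long> keySource = new Iterator<Long>() {
            long next = 0;
            @Override public boolean hasNext() { return next < 1_000_000L; }
            @Override public Long next() { pulled.incrementAndGet(); return next++; }
        };
        Stream<Long> innerKeys = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(keySource, 0), false);

        // Outer stream of one "state descriptor", flattened -- roughly what the
        // key iterator does with its registered states.
        Iterator<Long> keys = Stream.of(innerKeys).flatMap(s -> s).iterator();

        keys.next(); // ask for one single key ...
        // ... and see how many inner elements were pulled to produce it.
        System.out.println("inner elements pulled for one next(): " + pulled.get());
        // On Java 8 this prints 1000000: the whole inner stream was buffered first.
    }
}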

  4.  I found out that, at least when using LocalFileSystem on a Windows system, read I/O to load a savepoint is unbuffered

 *   See example stack [3]
 *   i.e. in order to read a single long in a serializer, it goes into kernel mode 8 times and loads the 8 bytes one by one
 *   I coded a BufferedFSDataInputStreamWrapper that allows opting in to buffered reads on any FileSystem implementation (see the sketch below)
 *   In our setting, savepoint loading is now 30 times faster
 *   I once saw a Jira ticket about improving savepoint load time in general (unfortunately I lost the link); maybe this approach can help with it
 *   Not sure whether HDFS has the same problem
 *   I can contribute my implementation
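
For illustration, a hedged sketch of the idea behind the wrapper (names and details are illustrative, not the actual contribution): put a java.io.BufferedInputStream in front of the underlying FSDataInputStream and drop the read-ahead buffer on seek():

import java.io.BufferedInputStream;
import java.io.IOException;

import org.apache.flink.core.fs.FSDataInputStream;

public class BufferedFSDataInputStreamSketch extends FSDataInputStream {

    private final FSDataInputStream delegate;
    private final int bufferSize;
    private BufferedInputStream buffered;
    private long pos; // position as seen by callers; the delegate's own position
                      // runs ahead by whatever is currently sitting in the buffer

    public BufferedFSDataInputStreamSketch(FSDataInputStream delegate, int bufferSize) throws IOException {
        this.delegate = delegate;
        this.bufferSize = bufferSize;
        this.buffered = new BufferedInputStream(delegate, bufferSize);
        this.pos = delegate.getPos();
    }

    @Override
    public void seek(long desired) throws IOException {
        // Reposition the underlying stream and invalidate the read-ahead buffer.
        delegate.seek(desired);
        buffered = new BufferedInputStream(delegate, bufferSize);
        pos = desired;
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public int read() throws IOException {
        int b = buffered.read();
        if (b >= 0) {
            pos++;
        }
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = buffered.read(b, off, len);
        if (n > 0) {
            pos += n;
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        buffered.close(); // also closes the delegate
    }
}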

Looking forward to your comments


Matthias (Thias) Schwalbe


[1] exception stack:
8215140 [MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0] ERROR BatchTask - Error in task code:  MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
    at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
    at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
    at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
    at org.apache.flink.runtime.
