Matthias Schwalbe created FLINK-26585:
-----------------------------------------

             Summary: State Processor API: Loading a state set buffers the whole state set in memory before starting to process
                 Key: FLINK-26585
                 URL: https://issues.apache.org/jira/browse/FLINK-26585
             Project: Flink
          Issue Type: Improvement
          Components: API / State Processor
    Affects Versions: 1.14.0, 1.13.0
            Reporter: Matthias Schwalbe


* When loading a state, MultiStateKeyIterator loads and buffers the whole state in memory before it even processes a single data point
** This is absolutely no problem for small state (hence the unit tests work fine)
** The MultiStateKeyIterator ctor sets up a Java Stream that iterates all state descriptors and flattens all data points contained within
** The java.util.stream.Stream#flatMap function causes the buffering of the whole data set when enumerated later on
** See call stack [1]
*** In our case this is 150e6 data points (> 1 GiB just for the pointers to the data, let alone the data itself, ~30 GiB)
** I'm not aware of any way to configure Stream in order to avoid the problem, hence
** I coded an alternative implementation of MultiStateKeyIterator that avoids using Java Stream,
** I can contribute our implementation (MultiStateKeyIteratorNoStreams)

[1] Streams call stack:

hasNext:77, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
next:82, RocksStateKeysIterator (org.apache.flink.contrib.streaming.state.iterator)
forEachRemaining:116, Iterator (java.util)
forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
forEach:580, ReferencePipeline$Head (java.util.stream)
accept:270, ReferencePipeline$7$1 (java.util.stream) # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
accept:373, ReferencePipeline$11$1 (java.util.stream) # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
accept:193, ReferencePipeline$3$1 (java.util.stream) # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
lambda$initPartialTraversalState$0:294, StreamSpliterators$WrappingSpliterator (java.util.stream)
getAsBoolean:-1, 1528195520 (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator (java.util.stream)
tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
hasNext:681, Spliterators$1Adapter (java.util)
hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
hasNext:162, KeyedStateReaderOperator$NamespaceDecorator (org.apache.flink.state.api.input.operator)
reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
doRun:776, Task (org.apache.flink.runtime.taskmanager)
run:563, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
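
For illustration only: in the call stack, a single hasNext on the stream-backed iterator reaches fillBuffer, and the flatMap stage then drains the whole inner key iterator through forEach. The sketch below shows one way to flatten nested iterators without java.util.stream, so that each next() pulls exactly one element from the underlying source. It is not the MultiStateKeyIteratorNoStreams implementation mentioned above, and it does not use any Flink API. The names LazyFlatteningIterator, LazyFlattenDemo and counting are hypothetical, and plain lists stand in for the per-descriptor key iterators.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical helper (not part of Flink): flattens nested iterators one element at a time. */
final class LazyFlatteningIterator<S, T> implements Iterator<T> {
    private final Iterator<S> sources;                           // e.g. one entry per state descriptor
    private final Function<? super S, ? extends Iterator<T>> open; // opens the inner iterator on demand
    private Iterator<T> current;                                 // inner iterator currently being drained

    LazyFlatteningIterator(Iterator<S> sources,
                           Function<? super S, ? extends Iterator<T>> open) {
        this.sources = sources;
        this.open = open;
    }

    @Override
    public boolean hasNext() {
        // Move on to the next source only when the current inner iterator is exhausted.
        // Nothing is read ahead or buffered inside an inner iterator.
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return false;
            }
            current = open.apply(sources.next());
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}

final class LazyFlattenDemo {
    /** Wraps a list iterator and counts how many elements the consumer has pulled so far. */
    static <T> Iterator<T> counting(List<T> data, int[] pulled) {
        Iterator<T> it = data.iterator();
        return new Iterator<T>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public T next() { pulled[0]++; return it.next(); }
        };
    }

    public static void main(String[] args) {
        int[] pulled = {0};
        // Two lists stand in for the keys of two state descriptors.
        List<List<String>> states = Arrays.asList(
                Arrays.asList("k1", "k2", "k3"),
                Arrays.asList("k4", "k5"));
        Iterator<String> keys = new LazyFlatteningIterator<>(
                states.iterator(), s -> counting(s, pulled));
        String first = keys.next();
        System.out.println(first + " pulled=" + pulled[0]); // prints "k1 pulled=1"
    }
}
```

The counter shows that only one key has been read from the source after the first next(). With an approach like this, memory use would be bounded by the current element rather than by the size of the state.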