[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-12-11 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795627#comment-17795627
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~jingge] I see that 1.18.1 is being released.

Could this PR be included in 1.18.1 or the next release, 1.18.2?

It could help a lot in the case [~czchen] mentioned.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator loads and buffers the whole 
> state in memory before it even processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** The MultiStateKeyIterator ctor sets up a Java Stream that iterates all state 
> descriptors and flattens all data points contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** In our case this is 150e6 data points (> 1 GiB just for the pointers to 
> the data, let alone the data itself, ~30 GiB)
>  ** I'm not aware of any way to instrument Stream in order to avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using Java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)
>  # <R> Stream<R> flatMap(final Function<? super P_OUT, ? extends Stream<? extends R>> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream)
>  # Stream<P_OUT> peek(final Consumer<? super P_OUT> var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)
>  # <R> Stream<R> map(final Function<? super P_OUT, ? extends R> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
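
The call stack above shows fillBuffer reaching forEachRemaining on the RocksDB key iterator, i.e. the inner stream is drained in one go once the flatMap pipeline is consumed through an iterator. As a minimal sketch of the alternative idea (flattening by hand, one inner iterator at a time), something like the following could be used. This is not the attached MultiStateKeyIteratorNoStreams; the names LazyFlatteningSketch, FlatteningIterator and countingRange are hypothetical, and the integer ranges merely stand in for state descriptors and their keys.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration only; not the attached MultiStateKeyIteratorNoStreams.
final class LazyFlatteningSketch {

    /** Walks the outer iterator and opens one inner iterator at a time, on demand. */
    static final class FlatteningIterator<S, K> implements Iterator<K> {
        private final Iterator<S> outer;
        private final Function<S, Iterator<K>> keysOf;
        private Iterator<K> current = Collections.emptyIterator();

        FlatteningIterator(Iterator<S> outer, Function<S, Iterator<K>> keysOf) {
            this.outer = outer;
            this.keysOf = keysOf;
        }

        @Override
        public boolean hasNext() {
            // Advance to the next non-empty inner iterator without draining it.
            while (!current.hasNext() && outer.hasNext()) {
                current = keysOf.apply(outer.next());
            }
            return current.hasNext();
        }

        @Override
        public K next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    /** Stand-in for a state backend key iterator; counts every key handed out. */
    static Iterator<Integer> countingRange(int from, int to, AtomicInteger pulled) {
        return new Iterator<Integer>() {
            private int next = from;

            @Override
            public boolean hasNext() {
                return next < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet();
                return next++;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();
        // Two pretend "state descriptors", each exposing 1000 keys.
        Iterator<Integer> keys = new FlatteningIterator<>(
                Arrays.asList(0, 1000).iterator(),
                start -> countingRange(start, start + 1000, pulled));

        System.out.println("first key: " + keys.next());
        System.out.println("keys pulled from the source so far: " + pulled.get());
    }
}
```

After the first call to next(), the counter shows that exactly one key has been pulled from the source, so memory use stays independent of the total number of keys. Whether a Stream-based flatMap buffers in the same situation depends on the JDK version and on how the stream is consumed, so the sketch deliberately makes no claim about that side.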



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-12-08 Thread 陳昌倬


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794591#comment-17794591
 ] 

ChangZhuo Chen (陳昌倬) commented on FLINK-26585:
--

Is there any chance to merge this into the 1.18 series? 

Since CaseClassSerializer has been deprecated in 1.18, the only way to migrate 
away from CaseClassSerializer is the State Processor API. Without this fix, the 
State Processor API cannot run when the key set is large. 

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-09-26 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769042#comment-17769042
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

Found out what went wrong: https://github.com/orgs/community/discussions/10369

I also had to submit my comments ... a quite annoying/counter-intuitive 
'feature' of GitHub ...

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-09-25 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768751#comment-17768751
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

Thanks [~masteryhx] ,

... Jira somehow swallowed my responses (me confused :)), I'll take care of it 
again tomorrow (likely)

Thias

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-09-25 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768607#comment-17768607
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~Matthias Schwalbe] I did a second review last month but have not received a 
response from you.

Please check my latest comments from last month.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-09-25 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768604#comment-17768604
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

Hi [~masteryhx] ,

The PR [#23239|https://github.com/apache/flink/pull/23239] seems to have been 
stuck for a while now, awaiting a second review. May I kindly ask you to take 
care of that?

I believe I answered all open questions...

Sincere greetings

Thias

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-18 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755934#comment-17755934
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

[~masteryhx] :

I opened PR #23239 for review. 

I didn't run the impact tests on Azure for lack of time.

Many thanks for your support

Thias

PS: I'll be off work next week and can answer requests only afterwards.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
>  Labels: pull-request-available
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-16 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754928#comment-17754928
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~Matthias Schwalbe] 
Sure, that is great.
I have assigned the ticket to you; please go ahead.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Assignee: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-16 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754922#comment-17754922
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

Hi [~masteryhx] ,

I've got clearance from my employer to go ahead with this.

I am thinking about a way to provide a test that fails with the original 
implementation but succeeds with the adjusted one (I need to figure out how to 
restrict the memory allowance for a test mini cluster).

Should we assign the ticket to me now or later?

Thias

PS: I'll be off work next week, so I'll try to get something working this week.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>
> * When loading a state, MultiStateKeyIterator load and bufferes the whole 
> state in memory before it event processes a single data point 
>  ** This is absolutely no problem for small state (hence the unit tests work 
> fine)
>  ** MultiStateKeyIterator ctor sets up a java Stream that iterates all state 
> descriptors and flattens all datapoints contained within
>  ** The java.util.stream.Stream#flatMap function causes the buffering of the 
> whole data set when enumerated later on
>  ** See call stack [1] 
>  *** I our case this is 150e6 data points (> 1GiB just for the pointers to 
> the data, let alone the data itself ~30GiB)
>  ** I’m not aware of some instrumentation of Stream in order to avoid the 
> problem, hence
>  ** I coded an alternative implementation of MultiStateKeyIterator that 
> avoids using java Stream,
>  ** I can contribute our implementation (MultiStateKeyIteratorNoStreams)
> [1]
> Streams call stack:
> hasNext:77, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> next:82, RocksStateKeysIterator 
> (org.apache.flink.contrib.streaming.state.iterator)
> forEachRemaining:116, Iterator (java.util)
> forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util)
> forEach:580, ReferencePipeline$Head (java.util.stream)
> accept:270, ReferencePipeline$7$1 (java.util.stream)  
>  #  Stream flatMap(final Function Stream> var1)
> accept:373, ReferencePipeline$11$1 (java.util.stream) 
>  # Stream peek(final Consumer var1)
> accept:193, ReferencePipeline$3$1 (java.util.stream)      
>  #  Stream map(final Function 
> var1)
> tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util)
> lambda$initPartialTraversalState$0:294, 
> StreamSpliterators$WrappingSpliterator (java.util.stream)
> getAsBoolean:-1, 1528195520 
> (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57)
> fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator 
> (java.util.stream)
> tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream)
> hasNext:681, Spliterators$1Adapter (java.util)
> hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input)
> hasNext:162, KeyedStateReaderOperator$NamespaceDecorator 
> (org.apache.flink.state.api.input.operator)
> reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input)
> invoke:191, DataSourceTask (org.apache.flink.runtime.operators)
> doRun:776, Task (org.apache.flink.runtime.taskmanager)
> run:563, Task (org.apache.flink.runtime.taskmanager)
> run:748, Thread (java.lang)
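
For illustration only: the quoted description mentions an alternative iterator 
that avoids java Stream. The sketch below is NOT the attached 
MultiStateKeyIteratorNoStreams.java and is not Flink code. It is a minimal, 
plain-Java example of the general idea, assuming each source can hand out a 
plain Iterator on demand. The names LazyFlatteningIterator and 
LazyFlatteningDemo are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: flattens several iterators without using java.util.stream,
// so an element is only pulled from its source when the caller asks for it.
final class LazyFlatteningIterator<T> implements Iterator<T> {
    private final Iterator<? extends Supplier<? extends Iterator<T>>> sources;
    private Iterator<T> current; // iterator of the source currently being drained

    LazyFlatteningIterator(Iterable<? extends Supplier<? extends Iterator<T>>> sources) {
        this.sources = sources.iterator();
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty source; a source is opened only when reached.
        while (current == null || !current.hasNext()) {
            if (!sources.hasNext()) {
                return false;
            }
            current = sources.next().get();
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}

// Hypothetical demo: counts how many elements were pulled from the sources.
final class LazyFlatteningDemo {
    static Iterator<Integer> countingRange(int from, int to, AtomicInteger pulled) {
        return new Iterator<Integer>() {
            private int nextValue = from;

            @Override
            public boolean hasNext() {
                return nextValue < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet();
                return nextValue++;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();
        List<Supplier<Iterator<Integer>>> sources = Arrays.asList(
                () -> countingRange(0, 3, pulled),
                () -> countingRange(10, 12, pulled));
        Iterator<Integer> it = new LazyFlatteningIterator<Integer>(sources);
        it.next();
        System.out.println("elements pulled after one next(): " + pulled.get());
    }
}
```

The design choice here is to keep at most one inner iterator open and to pull 
one element per next() call, so memory use does not grow with the size of the 
underlying data.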



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754401#comment-17754401
 ] 

Hangxiang Yu commented on FLINK-26585:
--

[~Matthias Schwalbe] 
Hi, thanks for reporting this. 
Let me try to summarize the PR procedure:
1. Implement the change in your local codebase (you may already have done so).

2. Use _git remote add_ to add both your personal Flink GitHub repo and the 
Apache Flink GitHub repo as remotes of your local codebase.

3. Rebase your code onto a branch (e.g. master) of the Apache Flink repo.

4. Push your code to a new branch in your personal Flink GitHub repo.
You will then find a Pull Request button in your personal repo; follow the 
steps it prompts you through.

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>





[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-11 Thread Yangyang ZHANG (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753087#comment-17753087
 ] 

Yangyang ZHANG commented on FLINK-26585:


Hi [~hangxiang], could you give [~Matthias Schwalbe] some tips and help with 
his first PR?

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>





[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-11 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753068#comment-17753068
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

Hello [~zhangyy91], thanks for picking up on this. Locally I've found my way 
around this issue, but I would also like to contribute this solution.

The only thing is that I lack practical experience in forking, PR-ing, and 
the other Flink contribution rituals, and I haven't yet found any hands-on 
instructions such as a tutorial.

(I've read https://flink.apache.org/how-to-contribute/contribute-code/, but 
that leaves too many questions open for me)

 

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2023-08-10 Thread Yangyang ZHANG (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752770#comment-17752770
 ] 

Yangyang ZHANG commented on FLINK-26585:


[~Matthias Schwalbe] Thanks for your feedback. Would you like to create a pull 
request for this issue?

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process

2022-05-05 Thread Matthias Schwalbe (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532201#comment-17532201
 ] 

Matthias Schwalbe commented on FLINK-26585:
---

I waited for the release of Flink 1.15.0 in order to see whether the problem 
remains (it does).

I found the root cause in this Java bug ticket [1], which apparently won't be 
fixed.

The proposed solution (a replacement MultiStateKeyIterator implementation) 
would help.

As said, the bug only becomes a nuisance when loading bigger state primitives, 
where it can even cause out-of-memory errors in the underlying 
java.util.stream.SpinedBuffer.

 

 

[1] https://bugs.openjdk.java.net/browse/JDK-8267359
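
As a rough way to observe the behaviour described above outside of Flink, the 
following plain-Java sketch counts how many inner elements a flatMap pipeline 
produces when a single element is requested through Stream#iterator. The class 
and method names are hypothetical, and the single outer element only loosely 
stands in for a state descriptor. On a JDK that shows the behaviour from [1], 
the reported count is expected to be the full inner-stream size rather than 1, 
but the exact number may depend on the JDK in use.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class; not part of Flink.
final class FlatMapBufferingDemo {

    /**
     * Returns how many inner elements the pipeline has produced by the time
     * the first call to Iterator.next() has returned.
     */
    static int elementsProducedForFirstNext(int innerSize) {
        AtomicInteger produced = new AtomicInteger();

        // One outer element that expands into innerSize inner elements.
        Iterator<Integer> it = Stream.of("descriptor")
                .flatMap(d -> IntStream.range(0, innerSize)
                        .boxed()
                        .peek(i -> produced.incrementAndGet()))
                .iterator();

        it.next(); // request a single element only
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println("inner elements produced for one next(): "
                + elementsProducedForFirstNext(1_000));
    }
}
```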

> State Processor API: Loading a state set buffers the whole state set in 
> memory before starting to process
> -
>
> Key: FLINK-26585
> URL: https://issues.apache.org/jira/browse/FLINK-26585
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.13.0, 1.14.0, 1.15.0
>Reporter: Matthias Schwalbe
>Priority: Major
> Attachments: MultiStateKeyIteratorNoStreams.java
>
>


