[jira] [Commented] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

2021-01-13 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17263993#comment-17263993
 ] 

Xintong Song commented on FLINK-20646:
--

Fixed via:
* master (1.13): cf71008984dd68b25db3612a9ca39f197b7e09c8
* release-1.12: 4d1c9927f77b11f6990e447856fac6627a46bdcf

> ReduceTransformation does not work with RocksDBStateBackend
> ---
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.13.0, 1.12.1
>
>
> The intra-slot managed memory sharing (FLIP-141) requires transformations to 
> properly declare their managed memory use cases.
> For RocksDB state backend, it requires all {{Transformation}}-s on a keyed 
> stream (with non-null {{KeySelector}}) to call 
> {{Transformation#updateManagedMemoryStateBackendUseCase}}, which the newly 
> introduced {{ReduceTransformation}} did not.
> As a result, Flink will not reserve managed memory for operators converted 
> from {{ReduceTransformation}} (FLINK-19931), leading to the following failure 
> when RocksDB state backend is used.
> {code}
> 16:58:49,373 WARN  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - 
> Exception while restoring keyed state backend for 
> StreamGroupedReduceOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from 
> alternative (1/1), will retry while more alternatives are available.
> java.io.IOException: Failed to acquire shared cache resource for RocksDB
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:264)
>  ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
>  ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
>  ~[flink-statebackend-rocksdb_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>  ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>  [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>  [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>  [flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
> [flink-runtime_2.11-1.12.0.jar:1.12.0]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
> [flink-runtime_2.11-1.12.0.jar:1.12.0]
>   at java.lang.Thread.run(Thread.java:832) [?:?]
> Caused by: java.lang.IllegalArgumentException: The fraction of memory to 
> allocate should not be 0. Please make sure that all types of managed memory 
> consumers contained in the job are configured with a non-negative weight via 
> `taskmanager.memory.managed.consumer-weights`.
>   at 
> 

[jira] [Commented] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

2020-12-17 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250962#comment-17250962
 ] 

Xintong Song commented on FLINK-20646:
--

[~aljoscha],

I think they are just trying out 1.12.0. The user posted their code when they 
ran into this problem. It's just a word count.
https://paste.ubuntu.com/p/9WrBz3Xrc6/

Also, I'm wondering whether it would make sense to have something like 
{{AbstractKeyedTransformation}}. I'm not saying we should do it right now; we 
can do it after the quick fix. It should improve testability and prevent 
similar problems if we introduce new transformations later.
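
To make the {{AbstractKeyedTransformation}} idea concrete, here is a minimal sketch of what such a base class could look like. It is not Flink code: {{KeyedTransformationSketch}}, {{ReduceSketch}} and their methods are hypothetical names, and a plain {{java.util.function.Function}} stands in for {{KeySelector}}. The only point is that the base class owns the key selector and makes the state backend declaration once, so a new subclass cannot forget it.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical base class for transformations on a keyed stream.
// It is a stand-alone model, not Flink's Transformation hierarchy.
abstract class KeyedTransformationSketch<IN, KEY> {
    private final Function<IN, KEY> keySelector;
    private final boolean stateBackendUseCaseDeclared;

    protected KeyedTransformationSketch(Function<IN, KEY> keySelector) {
        // A keyed transformation always has a non-null key selector.
        this.keySelector = Objects.requireNonNull(keySelector, "keySelector");
        // The declaration happens here, once, for every subclass.
        this.stateBackendUseCaseDeclared = true;
    }

    KEY keyOf(IN record) {
        return keySelector.apply(record);
    }

    boolean declaresStateBackendUseCase() {
        return stateBackendUseCaseDeclared;
    }
}

// A new keyed transformation only forwards the key selector to the base class.
final class ReduceSketch<IN, KEY> extends KeyedTransformationSketch<IN, KEY> {
    ReduceSketch(Function<IN, KEY> keySelector) {
        super(keySelector);
    }
}
```

With this shape, {{new ReduceSketch<String, Integer>(String::length)}} already reports the declaration without any code in the subclass, and a single test against the base class would cover all keyed transformations.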

> ReduceTransformation does not work with RocksDBStateBackend
> ---
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.1

[jira] [Commented] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

2020-12-17 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250953#comment-17250953
 ] 

Aljoscha Krettek commented on FLINK-20646:
--

I think the windowless {{reduce()}} is not a common operation because it's not 
super useful in {{STREAMING}} mode. You just get an infinite stream of updates.
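
As an illustration of the "infinite stream of updates" point, the following is a small stand-alone sketch of a rolling (windowless) reduce. It does not use the Flink API; {{RollingReduceSketch}}, {{rollingReduce}} and {{record}} are made-up names, and a finite list stands in for the stream. Every input record produces one output record carrying the current reduced value for its key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Stand-alone model of a windowless reduce on a keyed stream (not Flink code).
final class RollingReduceSketch {

    // Convenience factory for a (key, value) record.
    static <K, V> Map.Entry<K, V> record(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Emits one updated result per input record instead of one final result per key.
    static <K, V> List<Map.Entry<K, V>> rollingReduce(
            List<Map.Entry<K, V>> input, BinaryOperator<V> reducer) {
        Map<K, V> state = new HashMap<>(); // per-key state, as a keyed state backend would hold it
        List<Map.Entry<K, V>> output = new ArrayList<>();
        for (Map.Entry<K, V> in : input) {
            // Combine the stored value (if any) with the new one and store the result.
            V updated = state.merge(in.getKey(), in.getValue(), reducer);
            output.add(record(in.getKey(), updated));
        }
        return output;
    }
}
```

For a word count over the records (a, 1), (b, 1), (a, 1) with {{Integer::sum}} as the reducer, this emits three records, one per input. On an unbounded input the output never ends.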

Do you know how they discovered the bug?

> ReduceTransformation does not work with RocksDBStateBackend
> ---
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.1

[jira] [Commented] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

2020-12-17 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250942#comment-17250942
 ] 

Aljoscha Krettek commented on FLINK-20646:
--

A very quick fix for this is to just call
{code}
updateManagedMemoryStateBackendUseCase(true);
{code}

in {{ReduceTransformation}}. I wouldn't add a test for it now and it seems we 
also don't have equivalent tests for other transformations.
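
For readers who do not have the code at hand, here is a simplified model of what the quick fix changes. It is only a sketch: {{SketchTransformation}}, {{ReduceWithoutFix}} and {{ReduceWithFix}} are hypothetical stand-ins, the use case is tracked as a plain string, and only the method name {{updateManagedMemoryStateBackendUseCase}} is taken from this issue. The real class has more to it.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for a transformation base class. NOT Flink's real class;
// only the method name is borrowed from the issue.
abstract class SketchTransformation {
    private final Set<String> slotScopeUseCases = new HashSet<>();

    // Records (or clears) the declaration that this transformation needs
    // managed memory for a state backend.
    protected void updateManagedMemoryStateBackendUseCase(boolean hasStateBackend) {
        if (hasStateBackend) {
            slotScopeUseCases.add("STATE_BACKEND");
        } else {
            slotScopeUseCases.remove("STATE_BACKEND");
        }
    }

    boolean declaresStateBackendUseCase() {
        return slotScopeUseCases.contains("STATE_BACKEND");
    }
}

// Models the 1.12.0 behaviour: the keyed transformation never declares the use case.
final class ReduceWithoutFix extends SketchTransformation {
    ReduceWithoutFix() {
        // no declaration here, which is the bug
    }
}

// Models the quick fix: declare the use case when the transformation is created.
final class ReduceWithFix extends SketchTransformation {
    ReduceWithFix() {
        updateManagedMemoryStateBackendUseCase(true);
    }
}
```

In this model, {{new ReduceWithoutFix().declaresStateBackendUseCase()}} is false and the fixed variant returns true. The missing declaration is what leaves the operator without reserved managed memory.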

One thing I noticed is that there are some conflicts in how we assign the 
memory requirements. We allow operators/transformations to do it but we also 
hard-set some things for batch execution mode. This comes from the fact that 
the Blink-derived Table runner basically re-implements the newer 
BATCH-execution machinery we now have for general operators/transformations. In 
the long run, we should get rid of this complication.

> ReduceTransformation does not work with RocksDBStateBackend
> ---
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Assignee: Xintong Song
> Priority: Blocker
> Fix For: 1.12.1

[jira] [Commented] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

2020-12-17 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250920#comment-17250920
 ] 

Xintong Song commented on FLINK-20646:
--

I think this bug reveals two problems.
* Absence of a common abstraction for "stateful transformations". Relying on 
various concrete transformation implementations to maintain the key selectors 
and declare the memory use case is fragile. New transformation implementations 
can easily overlook them, which is what happened here.
* Lack of testing. I'm wondering how this problem escaped our release 
testing. Reduce operation + RocksDB state backend should not be that rare a 
case. Evidently we don't have test coverage for such scenarios.

As for a quick fix, we can simply call 
{{updateManagedMemoryStateBackendUseCase}} in {{ReduceTransformation}}. I 
reviewed the type hierarchy of {{Transformation}} and did not see other 
subclasses with this problem.
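
The manual review of the type hierarchy could in principle be turned into an automated guard. The sketch below shows the shape of such a check over plain descriptors. {{TransformationInfo}} and {{UseCaseCoverageCheck}} are hypothetical names and the entries fed to it would be illustrative data, not something read from Flink. A real test would have to build actual transformations and inspect them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical descriptor of a transformation, for illustration only.
final class TransformationInfo {
    final String name;
    final boolean keyed;
    final boolean declaresStateBackendUseCase;

    TransformationInfo(String name, boolean keyed, boolean declaresStateBackendUseCase) {
        this.name = name;
        this.keyed = keyed;
        this.declaresStateBackendUseCase = declaresStateBackendUseCase;
    }
}

final class UseCaseCoverageCheck {
    // Returns the names of keyed transformations that do not declare
    // the state backend use case. Non-keyed transformations are ignored.
    static List<String> findOffenders(List<TransformationInfo> transformations) {
        List<String> offenders = new ArrayList<>();
        for (TransformationInfo t : transformations) {
            if (t.keyed && !t.declaresStateBackendUseCase) {
                offenders.add(t.name);
            }
        }
        return offenders;
    }
}
```

A test would then assert that the returned list is empty, so that a newly added keyed transformation without the declaration fails the build instead of failing at runtime.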

> ReduceTransformation does not work with RocksDBStateBackend
> ---
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Priority: Blocker
> Fix For: 1.12.1

[jira] [Commented] (FLINK-20646) ReduceTransformation does not work with RocksDBStateBackend

2020-12-17 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250916#comment-17250916
 ] 

Xintong Song commented on FLINK-20646:
--

[~dwysakowicz],
Could you tell us how widely {{ReduceTransformation}} is used? I'm afraid we 
would need a quick bugfix release if it's very widely used.

> ReduceTransformation does not work with RocksDBStateBackend
> ---
>
> Key: FLINK-20646
> URL: https://issues.apache.org/jira/browse/FLINK-20646
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Reporter: Xintong Song
> Priority: Blocker
> Fix For: 1.12.1