[ 
https://issues.apache.org/jira/browse/FLINK-25400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784691#comment-17784691
 ] 

Jinzhong Li commented on FLINK-25400:
-------------------------------------

I think this issue still exists in 1.19.
The task configuration of SavepointEnvironment should be passed to 
SavepointTaskManagerRuntimeInfo,
so that SavepointEnvironment.getTaskManagerInfo().getConfiguration() can obtain 
the task configuration.
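
To illustrate the idea, here is a minimal, self-contained sketch. It does not use the real Flink classes: Configuration, RuntimeInfoBefore and RuntimeInfoAfter are simplified, hypothetical stand-ins, and the option key is made up for the example. It only shows the difference between returning a fresh empty configuration and returning the one handed in through the constructor.

```java
import java.util.HashMap;
import java.util.Map;

public class SavepointConfigSketch {

    /** Simplified stand-in for Flink's Configuration (not the real class). */
    static final class Configuration {
        private final Map<String, String> values = new HashMap<>();

        Configuration set(String key, String value) {
            values.put(key, value);
            return this;
        }

        String get(String key, String defaultValue) {
            return values.getOrDefault(key, defaultValue);
        }
    }

    /** Models the behaviour described in the issue: always an empty configuration. */
    static final class RuntimeInfoBefore {
        Configuration getConfiguration() {
            return new Configuration();
        }
    }

    /** Models the proposed behaviour: keep and return the configuration passed in. */
    static final class RuntimeInfoAfter {
        private final Configuration configuration;

        RuntimeInfoAfter(Configuration configuration) {
            this.configuration = configuration;
        }

        Configuration getConfiguration() {
            return configuration;
        }
    }

    public static void main(String[] args) {
        // Hypothetical option key, used only for illustration.
        String key = "example.rocksdb.option";
        Configuration taskConfig = new Configuration().set(key, "true");

        // The option is lost when a fresh configuration is returned.
        System.out.println(new RuntimeInfoBefore().getConfiguration().get(key, "unset"));
        // The option survives when the task configuration is passed through.
        System.out.println(new RuntimeInfoAfter(taskConfig).getConfiguration().get(key, "unset"));
    }
}
```

Running the sketch prints "unset" for the first variant and "true" for the second. The actual change in Flink would presumably involve the constructor of SavepointTaskManagerRuntimeInfo and its call site in SavepointEnvironment, but the exact signatures are an assumption here.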

If my analysis is correct, I would like to fix this bug.

> RocksDBStateBackend configurations does not work with SavepointEnvironment
> --------------------------------------------------------------------------
>
>                 Key: FLINK-25400
>                 URL: https://issues.apache.org/jira/browse/FLINK-25400
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.12.2
>            Reporter: wuzhiyu
>            Priority: Major
>
> Hi~
> I'm trying to use flink-state-processor-api to do state migrations by reading 
> states from an existing savepoint, and writing them into a new savepoint 
> after certain transformations.
> However, the reading rate does not meet my expectations.
> When I tried to tune RocksDB by enabling RocksDB native metrics, I found it 
> did not work.
> So I did some debugging and found that when the job runs under a 
> SavepointEnvironment, no RocksDBStateBackend configuration is passed to 
> RocksDBStateBackend.
> The whole process is described below (the code shown is from 
> release-1.12.2):
> First, when 
> org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is 
> invoked:
>  
> {code:java}
> // org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
> private StateBackend createStateBackend() throws Exception {
>     final StateBackend fromApplication =
>             configuration.getStateBackend(getUserCodeClassLoader());
>     return StateBackendLoader.fromApplicationOrConfigOrDefault(
>             fromApplication,
>             getEnvironment().getTaskManagerInfo().getConfiguration(),
>             getUserCodeClassLoader(),
>             LOG);
> } {code}
> *getEnvironment()* returns a SavepointEnvironment instance.
>  
> Then 
> *org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo* 
> is invoked, which returns a new 
> *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.
>  
> {code:java}
> // org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
> @Override
> public TaskManagerRuntimeInfo getTaskManagerInfo() {
>     return new SavepointTaskManagerRuntimeInfo(getIOManager());
> } {code}
>  
> Finally, 
> *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration*
>  is invoked. It returns an empty configuration, which means that all 
> configuration options are lost.
> {code:java}
> // org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
> @Override
> public Configuration getConfiguration() {
>     return new Configuration();
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
