[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620743#comment-16620743
 ] 

Pierre Zemb edited comment on FLINK-10225 at 9/25/18 6:40 PM:
--------------------------------------------------------------

Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call 
to the ResourceManager. It is also the only one that updates the 
ConcurrentHashMap used by the RPC handler. This means that when I'm inside the 
RPC handler, I cannot find a clean way to trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented, and I don't 
want to change it. Do you have an idea of how I could implement this?

cc [~till.rohrmann]



> Cannot access state from a empty taskmanager
> --------------------------------------------
>
>                 Key: FLINK-10225
>                 URL: https://issues.apache.org/jira/browse/FLINK-10225
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.3, 1.6.0
>         Environment: 4tm and 1jm for now on 1.6.0
>            Reporter: Pierre Zemb
>            Priority: Critical
>
> Hi!
> I've started to deploy a small Flink cluster (4tm and 1jm for now on 1.6.0), 
> and deployed a small job on it. Because of the current load, the job is 
> completely handled by a single tm. I've created a small proxy that uses 
> [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
>  to access the current state. It works nicely, except under certain 
> circumstances. It seems that I can only access the state through a node 
> that is holding a part of the job. Here's an example:
>  * job on tm1. Pointing QueryableStateClient to tm1. State accessible
>  * job still on tm1. Pointing QueryableStateClient to tm2 (for example). 
> State inaccessible
>  * killing tm1, job is now on tm2. State accessible
>  * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible
>  * adding some parallelism to spread job on tm1 and tm2. Pointing 
> QueryableStateClient to either tm1 or tm2 works
>  * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State 
> inaccessible
> When the state is inaccessible, I can see this (generated 
> [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
> {code:java}
> java.lang.RuntimeException: Failed request 0.
>  Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> I went through the code (master branch) a bit. Class KvStateClientProxy 
> holds kvStateLocationOracle, the key-value state location oracle for the 
> given JobID. Here's the usage:
>  * updateKvStateLocationOracle() in registerQueryableState() 
> (TaskExecutor.java)
>  * registerQueryableState() in associateWithJobManager() 
> (TaskExecutor.java)
>  * associateWithJobManager in establishJobManagerConnection 
> (TaskExecutor.java)
>  * establishJobManagerConnection in jobManagerGainedLeadership 
> (TaskExecutor.java)
>  * jobManagerGainedLeadership in onRegistrationSuccess 
> (JobLeaderService.java)
> It seems that the KvStateLocationOracle map is updated only when the task 
> manager is part of the job.
> For now, we are creating a List<CompletableFuture<...>> and taking the 
> first future that completes successfully, but that is a workaround.
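
For readers who want to reproduce the workaround described at the end of the issue, here is a minimal sketch of a "first successful future" helper. It is not the reporter's actual code: the class name FirstSuccessful and the method firstSuccessful are hypothetical, and the futures in main are simulated stand-ins for one state query per task manager. Only java.util.concurrent is used, no Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Flink.
final class FirstSuccessful {

    /**
     * Returns a future that completes with the first successful result among
     * the given futures, and fails only once every one of them has failed.
     */
    static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no futures given"));
            return result;
        }
        // Counts futures that have not failed yet; reaches 0 only if all fail.
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    // Later successes are ignored: complete() is a no-op once done.
                    result.complete(value);
                } else if (remaining.decrementAndGet() == 0) {
                    // Every query failed, so propagate the last error.
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulated: tm2 does not host the job, so its query fails.
        CompletableFuture<String> tm2 = new CompletableFuture<>();
        tm2.completeExceptionally(new RuntimeException("simulated UnknownLocationException"));
        // Simulated: tm1 hosts the job and answers.
        CompletableFuture<String> tm1 = CompletableFuture.completedFuture("state from tm1");

        System.out.println(firstSuccessful(Arrays.asList(tm2, tm1)).join());
    }
}
```

In a real proxy, each element of the list would presumably be the future returned by a QueryableStateClient pointed at a different task manager. The cost of this approach is one request per task manager for every lookup, which is why the issue calls it a workaround.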



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
