Thank you, Aljoscha. I am able to query state when I use the hostname of the JobManager instead of its IP address, but I don't understand why it does not work when I give the IP address.
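One possible explanation, offered as an assumption rather than something confirmed in this thread: Akka remoting matches the host part of an actor address literally against the address the remote actor system was started with. If the JobManager bound to its hostname, a lookup that uses the IP address would target a different Akka address, even though both point to the same machine. The sketch below only illustrates that string mismatch. The class `AkkaAddressCheck`, its helper methods, and the IP `192.0.2.10` are hypothetical; the host, port, and actor path are taken from the exception trace.

```java
// Sketch only: compares the Akka address a client would target with the
// address the JobManager is assumed to have bound to. No Flink or Akka calls.
final class AkkaAddressCheck {

    // Builds the actor URL in the same shape as the one in the exception trace.
    static String jobManagerUrl(String host, int port) {
        return "akka.tcp://flink@" + host + ":" + port + "/user/jobmanager";
    }

    // Assumption: the host part is compared literally, so a hostname and its
    // IP address count as two different actor systems.
    static boolean sameAkkaAddress(String boundHost, String clientHost, int port) {
        return jobManagerUrl(boundHost, port).equals(jobManagerUrl(clientHost, port));
    }

    public static void main(String[] args) {
        String boundHost = "my-machine";  // host from the exception trace
        String ipAddress = "192.0.2.10";  // hypothetical IP of the same machine
        int port = 52650;                 // port from the exception trace

        System.out.println(jobManagerUrl(boundHost, port));
        System.out.println(jobManagerUrl(ipAddress, port));
        System.out.println("hostname matches: " + sameAkkaAddress(boundHost, boundHost, port));
        System.out.println("IP matches: " + sameAkkaAddress(boundHost, ipAddress, port));
    }
}
```

If this assumption holds, the value passed as `jobManagerHost` would need to be exactly the host shown for the JobManager in its log or web UI.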

On Thu, Jan 4, 2018 at 6:42 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> Is my-machine:52650 the correct address for the JobManager running in
> YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when
> you use YARN with HA mode.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 16:02, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> *Exception Trace*
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
>     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>     at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
>     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>     at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
>     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>     at java.lang.Thread.run(Thread.java:745)
>
> *Client Creation Snippet*
>
>     Configuration config = new Configuration();
>     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
>
>     final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
>         .createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
>             HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
>     this.client = new QueryableStateClient(config, highAvailabilityServices);
> }