Hi,

Is my-machine:52650 the correct address for the JobManager running in YARN? Also, Queryable State in Flink 1.3.2 is not easy to get working when you use YARN in HA mode.
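
One quick way to check the first point is a plain TCP connect from the machine where the client runs. The following is only a minimal sketch: the class name `JobManagerAddressCheck`, the helper `isReachable` and the 2-second timeout are made up for illustration and are not part of Flink. A successful connect only shows that something is listening on that host and port, not that it is the JobManager.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks whether anything accepts
// TCP connections on the host/port that the client is configured with.
final class JobManagerAddressCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unknown host, connection refused and connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the values from the exception message; override via arguments.
        String host = args.length > 0 ? args[0] : "my-machine";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 52650;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If this reports the port as unreachable, the host and port passed to the client are probably not the ones the YARN session is actually using.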

Best,
Aljoscha

> On 3. Jan 2018, at 16:02, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> Exception Trace
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/),
> Path(/user/jobmanager)]
>     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>     at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
>     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>     at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
>     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>     at java.lang.Thread.run(Thread.java:745)
>
> Client Creation Snippet
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
>
> final HighAvailabilityServices highAvailabilityServices =
>     HighAvailabilityServicesUtils.createHighAvailabilityServices(
>         config,
>         Executors.newSingleThreadScheduledExecutor(),
>         HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
> this.client = new QueryableStateClient(config, highAvailabilityServices);
> }
>