Thank you Aljoscha.

I am able to Query state when I use the hostname of Job Manager instead of
its IP Address. But I couldn't understand why it is not working if I give
IP address.

On Thu, Jan 4, 2018 at 6:42 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Is my-machine:52650 the correct address for the JobManager running in
> YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when
> you use YARN with HA mode.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 16:02, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> *Exception Trace*
>
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.
> tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
>         at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:65)
>         at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:63)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(Ba
> tchingExecutor.scala:55)
>         at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.
> scala:73)
>         at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> unbatchedExecute(Future.scala:74)
>         at akka.dispatch.BatchingExecutor$class.execute(
> BatchingExecutor.scala:120)
>         at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> execute(Future.scala:73)
>         at scala.concurrent.impl.CallbackRunnable.executeWithValue(
> Promise.scala:40)
>         at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
> mise.scala:248)
>         at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
> ort.scala:334)
>         at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>         at scala.concurrent.Future$InternalCallbackExecutor$.scala$
> concurrent$Future$InternalCallbackExecutor$$unbatchedExecute
> (Future.scala:694)
>         at scala.concurrent.Future$InternalCallbackExecutor$.execute(
> Future.scala:691)
>         at akka.actor.LightArrayRevolverScheduler$TaskHolder.
> executeTask(Scheduler.scala:474)
>         at akka.actor.LightArrayRevolverScheduler$$anon$8.
> executeBucket$1(Scheduler.scala:425)
>         at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(
> Scheduler.scala:429)
>         at akka.actor.LightArrayRevolverScheduler$$anon$8.run(
> Scheduler.scala:381)
>         at java.lang.Thread.run(Thread.java:745)
>
> *Client Creation Snippet *
>
> * Configuration config = new Configuration();*
> *    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> jobManagerHost);*
> *    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> jobManagerPort);*
>
> *    final HighAvailabilityServices highAvailabilityServices =
> HighAvailabilityServicesUtils*
> *        .createHighAvailabilityServices(config,
> Executors.newSingleThreadScheduledExecutor(),*
> *
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*
>
> *    this.client = new QueryableStateClient(config,
> highAvailabilityServices);*
> *  }*
>
>
>
>

Reply via email to