Hi guys,

We are facing an issue running a Beam job with the Flink Runner on EMR in
application and per-job-cluster modes (it works fine in session mode).
The Job Manager starts, but the task managers do not.

Beam 2.27, 2.28
Flink 1.11.2
EMR 6.2.0

Job Manager log:
…
INFO: JobManager runner for job word-count-beam (5ba81e02dda8f2efb27f7f984bf426b0) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://[email protected]:35491/user/rpc/jobmanager_2.
Feb 26, 2021 12:02:51 PM org.apache.flink.runtime.jobmaster.JobMaster startJobExecution
INFO: Starting execution of job word-count-beam (5ba81e02dda8f2efb27f7f984bf426b0) under job master id 00000000000000000000000000000000.
Feb 26, 2021 12:02:51 PM org.apache.flink.runtime.scheduler.DefaultScheduler startSchedulingInternal
INFO: Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
Feb 26, 2021 12:02:51 PM org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState
INFO: Job word-count-beam (5ba81e02dda8f2efb27f7f984bf426b0) switched from state CREATED to RUNNING.
…
INFO: Could not resolve ResourceManager address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*.
…
SEVERE: Unhandled exception.
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:302)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:58)
        at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*.
        at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$10(AkkaRpcService.java:520)
        at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59)
        ... 8 more
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://[email protected]:35491/user/rpc/resourcemanager_*.
        ... 10 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://flink/), Path(/user/rpc/resourcemanager_*)]
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:71)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:69)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:81)


Using the same cluster with the same Flink config, we can run the Flink
example jobs, so I guess the issue could be on the Beam side.

Could you please advise whether anyone has faced a similar issue before, or
has been able to successfully run a streaming Beam job on EMR YARN in
application or per-job-cluster mode?

Best regards,

Dmytro Dragan | [email protected] | Lead Big Data Engineer | SoftServe <http://www.softserveinc.com/>

