Hi,
I am trying to follow the Portable (Python) section of this tutorial:
http://beam.apache.org/documentation/runners/flink/
but I am not able to execute a Beam pipeline on a Flink cluster.
I am running a Flink session cluster with docker-compose.
This is my docker-compose file:
——————
version: "2.2"
services:
  jobmanager:
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
—————
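The `FLINK_PROPERTIES` entries above are multi-line blocks of `key: value` lines. To my understanding, the official Flink Docker image's entrypoint appends that text to `flink-conf.yaml` rather than parsing it. As a small illustration of which settings each service ends up with, the hypothetical helper `parse_flink_properties` below splits such a block into a dictionary; it is a sketch for inspection only, not what the image itself does.

```python
from typing import Dict

# Value of FLINK_PROPERTIES for the taskmanager service, copied from the compose file.
TASKMANAGER_PROPERTIES = """\
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
"""


def parse_flink_properties(block: str) -> Dict[str, str]:
    """Split each non-empty 'key: value' line on its first colon (hypothetical helper)."""
    props: Dict[str, str] = {}
    for raw_line in block.splitlines():
        line = raw_line.strip()
        # Skip blank lines and YAML-style comments.
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"not a 'key: value' line: {line!r}")
        props[key.strip()] = value.strip()
    return props


if __name__ == "__main__":
    for key, value in parse_flink_properties(TASKMANAGER_PROPERTIES).items():
        print(f"{key} = {value}")
```

Note that nothing in either block configures how Beam starts its SDK workers; those settings come from the pipeline options passed on the Python side.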
I run the examples from a virtual environment with:
- Python 3.8
- apache-beam==2.32.0
- macOS Catalina 10.15.7
- Docker Desktop 4.1.1
When I run:
python -m apache_beam.examples.wordcount --input=text.txt \
    --output=out.txt --runner=FlinkRunner --flink_master=localhost:8081 \
    --environment_type=LOOPBACK
I get this error:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
    ... 4 more
Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:224)
    at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 16 more
Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:57464
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.unix.Socket.finishConnect(Socket.java:251)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
—————-
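The innermost cause in the LOOPBACK trace is `Connection refused: localhost/127.0.0.1:57464`, raised while the task manager calls `startWorker` on the external worker pool. One possible explanation (an assumption, not confirmed here) is that `localhost` inside the taskmanager container refers to the container itself, not to the Mac where the Python process is listening. A minimal standard-library sketch for checking whether a TCP port accepts connections from wherever it is run; `port_accepts_connections` is a hypothetical helper name, and 57464 is only the port from this particular run.

```python
import socket
import sys


def port_accepts_connections(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection resolves `host` and tries each resolved address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeouts and name-resolution errors are all OSError subclasses.
        return False


if __name__ == "__main__":
    # Usage: python check_port.py [host] [port]; defaults to the Flink REST port.
    host = sys.argv[1] if len(sys.argv) > 1 else "localhost"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 8081
    print(f"{host}:{port} reachable: {port_accepts_connections(host, port)}")
```

Run on the Mac, `localhost 8081` should succeed because the compose file publishes that port. Seeing what the task manager sees would require running the check inside the taskmanager container, which assumes a Python interpreter is available there; the stock Flink image may not include one.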
When I run it with the default Docker environment (no --environment_type flag):
python -m apache_beam.examples.wordcount --input=text.txt \
    --output=out.txt --runner=FlinkRunner --flink_master=localhost:8081
I can see the job submitted in the Flink UI at localhost:8081, but it fails
with this error:
2021-10-15 17:52:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:513)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
    at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:157)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:508)
    ... 4 more
Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
    at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    ... 12 more
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 26 more
————
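In Docker mode the innermost cause is `Cannot run program "docker": error=2, No such file or directory`, thrown from `DockerEnvironmentFactory.createEnvironment`: the task manager tries to start the SDK harness by executing `docker` itself. Error 2 is `ENOENT`, meaning no `docker` executable was found on the task manager's PATH (presumably because the `flink:1.13.2-scala_2.11` image does not ship the Docker CLI; this is an inference, not something the trace states). The standard-library sketch below reproduces the same failure mode; `launch_errno` is a hypothetical helper, and the program name is deliberately nonexistent.

```python
import errno
import shutil
import subprocess
from typing import Optional


def launch_errno(program: str) -> Optional[int]:
    """Try to start `program --version`; return the OS errno if it cannot be started.

    Returns None when the process starts, regardless of its exit status.
    """
    try:
        subprocess.run(
            [program, "--version"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError as exc:
        # A missing executable surfaces as FileNotFoundError with errno ENOENT (2),
        # the same "error=2, No such file or directory" that the JVM reports.
        return exc.errno
    return None


if __name__ == "__main__":
    code = launch_errno("docker-binary-that-does-not-exist")
    print(code, errno.errorcode.get(code, "?") if code is not None else "started")
    # Where (if anywhere) a `docker` executable is found on this machine's PATH.
    print("docker found at:", shutil.which("docker"))
```

On Linux and macOS the first line prints `2 ENOENT`, matching the error code in the Java trace.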
Do you have any suggestions?
I also tried the Portable (Java/Python/Go) tutorial, with one difference: I
started the job server with
docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 \
    apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081
but I end up with similar errors.
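The post does not show which Python command was used against the job server. For reference, the Beam Flink runner page pairs a standalone job server with `--runner=PortableRunner` and `--job_endpoint=localhost:8099`. The sketch below only assembles those flags into an argv list; `portable_runner_args` is a hypothetical helper, and the defaults (port 8099, LOOPBACK) are assumptions taken from the ports published in the `docker run` command and the earlier LOOPBACK attempt.

```python
import shlex
from typing import List, Optional


def portable_runner_args(
    job_endpoint: str = "localhost:8099",
    environment_type: str = "LOOPBACK",
    extra: Optional[List[str]] = None,
) -> List[str]:
    """Build PortableRunner pipeline flags (hypothetical helper)."""
    args = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]
    if extra:
        # Pipeline-specific flags such as --input/--output go after the runner flags.
        args.extend(extra)
    return args


if __name__ == "__main__":
    argv = portable_runner_args(extra=["--input=text.txt", "--output=out.txt"])
    # Print the equivalent shell command for the wordcount example.
    print(shlex.join(["python", "-m", "apache_beam.examples.wordcount", *argv]))
```

The same list could be passed to `apache_beam.options.pipeline_options.PipelineOptions` when building a pipeline in code. Note that `--flink-master=localhost:8081` is resolved inside the job-server container, so the same `localhost` question raised for the task manager may apply there too.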
Many thanks,
Chiara