Hello Kyle,
I did finally manage to run this setup on gcp after spinning up a Debian server
and doing the necessary setup to submit python beam pipeline on flink job
server and flink cluster ( I still cannot run directly using FlinkRunner).
The jobs are getting submitted on flink cluster , but they are failing with
these errors( attached logs).
I have setup the -artifacts-dir as gs://<bucket>/prefix while running the flink
job server, but still it fails. I don't think this is access issue as the
instance on which the flink cluster is running has full access to gcs.
I tried following this
https://stackoverflow.com/questions/59429897/beam-running-on-flink-with-python-sdk-and-using-google-cloud-storage-for-artifac
But it seems, there are other things to do to fix this.
Please suggest.
From: Ashish Raghav <[email protected]>
Sent: 30 May 2020 10:12
To: [email protected]
Subject: Re: Issue while submitting python beam pipeline on flink - local
Hello Kyle, reply below.
Also, what is the stack to run this as production setup on gcloud?? I can try
that setup to see if this works.
Get Outlook for Android<https://aka.ms/ghei36>
________________________________
From: Kyle Weaver <[email protected]<mailto:[email protected]>>
Sent: Thursday, May 28, 2020, 10:34 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Issue while submitting python beam pipeline on flink - local
EXTERNAL EMAIL
Do not click links or open attachments unless you recognise the sender and know
the content is safe. Report suspicious email to
[email protected]<mailto:[email protected]>.
> You are using the LOOPBACK environment which requires that the Flink
> cluster can connect back to your local machine. Since the loopback
> environment by defaults binds to localhost that should not be possible.
On the Flink runner page, we recommend using --net=host to avoid the kinds of
networking issues Ashish is experiencing. But the Docker documentation says the
host networking option is only available on Linux
(https://docs.docker.com/network/host/). You will either have to:
a) expose the required ports, or ==> I tried exposing 8099 and not using
--net=host on docker desktop for windows and it did not work. I also tried
running job server on gcloud shell and submitting pipeline over gcloud shell
after setting up all requirements. it fails with same errors as local.
b) run the job server in a local process instead of a Docker container, as
described in older versions of the documentation ==> I havent tried it yet.
(https://web.archive.org/web/20191228110311/https://beam.apache.org/documentation/runners/flink/<https://web.archive.org/web/20191228110311/https:/beam.apache.org/documentation/runners/flink/>)
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
LegacyArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
JobService started on localhost:8099
[grpc-default-executor-1] INFO org.apache.beam.runners.flink.FlinkJobInvoker -
Invoking job beam_to_flink_cluster_f66fa319-20ac-41a8-ae78-576f795e6c41 with
pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@5ea48610
[grpc-default-executor-1] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job
invocation beam_to_flink_cluster_f66fa319-20ac-41a8-ae78-576f795e6c41
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to
Flink program.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch
Execution Environment.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master
URL localhost:8081.
[flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
[flink-runner-job-invoker] ERROR
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during
job invocation beam_to_flink_cluster_f66fa319-20ac-41a8-ae78-576f795e6c41.
java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID:
7ade4a3688b2e671b42db146cff1c218)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83)
at
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: 7ade4a3688b2e671b42db146cff1c218)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an
exception: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
UNAVAILABLE: io exception
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
UNAVAILABLE: io exception
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297)
at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198)
at
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495)
... 4 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
UNAVAILABLE: io exception
at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
at
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:237)
at
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 16 more
Caused by:
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: localhost/127.0.0.1:35055
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:688)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
May 31, 2020 5:52:46 PM
com.google.api.client.googleapis.services.AbstractGoogleClient <init>
WARNING: Application name is not set. Call Builder#setApplicationName.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.fnexecution.artifact.AbstractLegacyArtifactRetrievalService
- Manifest at
gs://mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/MANIFEST has 1
artifact locations
[pool-15-thread-1] INFO org.apache.beam.sdk.extensions.gcp.util.GcsUtil -
Ignoring failed deletion of file
gs://mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/artifacts/
which already does not exist:
{"code":404,"errors":[{"domain":"global","message":"No such object:
mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/artifacts/","reason":"notFound"}],"message":"No
such object:
mybucket/artifacts/job_294cb0e0-7316-485c-80a4-24ee2e7ce28c/artifacts/"}
2020-05-31 20:54:05
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an
exception: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
UNAVAILABLE: io exception
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
UNAVAILABLE: io exception
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:445)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:430)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:297)
at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:198)
at
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:137)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:495)
... 4 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
UNAVAILABLE: io exception
at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
at
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.startWorker(BeamFnExternalWorkerPoolGrpc.java:237)
at
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:116)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:246)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:230)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 16 more
Caused by:
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: localhost/127.0.0.1:46513
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:688)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)