Sorry, I didn't read closely enough. LOOPBACK won't work if you're using cross-language transforms. Instead, you can try staging artifacts on a distributed filesystem such as GCS. Unfortunately, there is no easy local workaround until BEAM-5440 <https://issues.apache.org/jira/browse/BEAM-5440> is resolved.
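To make the distributed-filesystem suggestion concrete, here is a minimal sketch of how the job server command and the Python pipeline flags might be assembled. Treat it as an illustration under assumptions, not a verified setup: the bucket name `gs://my-bucket/beam-artifacts` and the job endpoint `localhost:8099` are hypothetical, the helper names are made up for this example, and it assumes the job server accepts an `--artifacts-dir` flag and that both the job server and the workers have credentials for the bucket.

```python
# Sketch only: builds command-line arguments as plain lists of strings.
# The helper names, bucket, and job endpoint are hypothetical.

def job_server_command(flink_master, artifacts_dir,
                       image="apache/beam_flink1.10_job_server:latest"):
    """Build a `docker run` command for the Flink job server.

    Artifacts are staged to `artifacts_dir`, which should be reachable from
    both the job server and the workers (for example a GCS path).
    """
    # A bare local path (or file://) is only visible inside one container,
    # which is the situation that produces the FileNotFoundException.
    if "://" not in artifacts_dir or artifacts_dir.startswith("file://"):
        raise ValueError("artifacts_dir should be on a shared filesystem")
    return [
        "docker", "run", "--net=host", image,
        f"--flink-master={flink_master}",
        f"--artifacts-dir={artifacts_dir}",  # assumed job server flag
    ]


def pipeline_args(job_endpoint, environment_type="DOCKER", cross_language=True):
    """Build the arguments one would pass to the Python pipeline."""
    # LOOPBACK does not work with cross-language transforms such as KafkaIO.
    if cross_language and environment_type.upper() == "LOOPBACK":
        raise ValueError("LOOPBACK does not work with cross-language transforms")
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={environment_type}",
    ]
```

For example, `job_server_command("localhost:8081", "gs://my-bucket/beam-artifacts")` gives the list to run for the job server, and `pipeline_args("localhost:8099")` gives the flags for `python ../wordcount.py`.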
On Mon, Sep 28, 2020 at 11:32 AM Kyle Weaver <[email protected]> wrote:

> > This looks to me like an issue with artifact staging. It looks like the
> > worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but
> > can't find the jar that we staged that contains the code for the Java
> > KafkaIO.
>
> Yeah. This kind of error most often happens when the job server and Beam
> worker are assumed to share the same filesystem, but don't (cf [1]). For
> debugging purposes, you might try to set `--environment_type=LOOPBACK`,
> which would circumvent any Docker-related filesystem issues.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5440
>
> On Mon, Sep 28, 2020 at 10:43 AM Brian Hulette <[email protected]>
> wrote:
>
>> This looks to me like an issue with artifact staging. It looks like the
>> worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but
>> can't find the jar that we staged that contains the code for the Java
>> KafkaIO.
>>
>> It looks like the example you're following was last updated last summer,
>> and xlang has undergone a lot of development since then. I'd recommend
>> trying to follow the example in the Beam repo instead [1]. It includes
>> directions for running on Dataflow, but if you just pass
>> --runner=FlinkRunner instead of DataflowRunner, it will start up a local
>> flink cluster for you and run the pipeline on it. Then you can also omit
>> all the GCP specific flags (--region, --num_workers, etc).
>>
>> Brian
>>
>> [1]
>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
>>
>> On Mon, Sep 28, 2020 at 6:28 AM Carolyn Langen <[email protected]>
>> wrote:
>>
>>> Greetings all,
>>>
>>> I am trying to get an example working which runs a simple Python-based
>>> pipeline which should listen to Kafka through the Flink PortableRunner.
>>> This is the repository I'm working from (forked from an older example, I'm
>>> trying to update it to use more recent versions of Beam and Flink):
>>> https://github.com/cdlangen/demo-beam-summit-2018. I run the example
>>> with the following steps:
>>>
>>> cd docker
>>> docker-compose up -d
>>> docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081
>>> python ../wordcount.py
>>>
>>> At some point I get an exception which seems to originate from
>>> taskmanager. An excerpt of the logs:
>>>
>>> taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
>>> taskmanager_1 | at java.io.FileInputStream.open0(Native Method)
>>> taskmanager_1 | at java.io.FileInputStream.open(FileInputStream.java:195)
>>> taskmanager_1 | at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
>>> taskmanager_1 | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
>>> taskmanager_1 | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> taskmanager_1 | at java.lang.Thread.run(Thread.java:748)
>>> [identical FileNotFoundException and stack trace repeated once more]
>>> taskmanager_1 | 2020-09-28 12:31:37,631 INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
>>> [identical FileNotFoundException and stack trace repeated twice more]
>>> taskmanager_1 | 2020-09-28 12:31:42,688 INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
>>>
>>> My best guess of what is happening is that the container is unable to
>>> run apache/beam_java_sdk:2.24.0 (refer to the two lines which say "INFO
>>> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory
>>> - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for
>>> worker id 1-4"), and as a result there is a FileNotFoundException related
>>> to the Java SDK.
>>>
>>> What am I doing wrong? How can I fix this error?
>>>
>>> Best regards,
>>> Carolyn
>>>
>>> PS- apologies if there are duplicate posts. I haven't posted to this
>>> user list before and wasn't sure if my message wasn't appearing on the list
>>> because I wasn't subscribed yet.
