Sorry, I didn't read closely. LOOPBACK won't work if you're using
cross-language transforms. Instead, you can try staging artifacts on a
distributed filesystem like GCS (unfortunately there is no easy local
workaround until BEAM-5440 <https://issues.apache.org/jira/browse/BEAM-5440> is
resolved).
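
As a rough, untested sketch of the shared-filesystem route: the small Python
helper below rebuilds the job server command from the original post and adds
an --artifacts-dir option pointing at a bucket. The bucket name
gs://my-bucket/beam-artifacts and the helper name job_server_command are
placeholders made up for illustration. --artifacts-dir is my understanding of
the job server's option for the staging location, so please check it against
your Beam version. It also assumes the job server and the workers can all
reach and authenticate to that location.

```python
import shlex


def job_server_command(flink_master, artifacts_dir,
                       image="apache/beam_flink1.10_job_server:latest"):
    """Build the `docker run` argv for the Flink job server.

    artifacts_dir is expected to be a URI on a filesystem that both the
    job server and the workers can read (hypothetical example: a GCS bucket).
    """
    # A bare local path such as /tmp/... is exactly what fails when the
    # job server and the taskmanager do not share a filesystem.
    if "://" not in artifacts_dir:
        raise ValueError(
            "artifacts_dir should be a URI on a shared filesystem, "
            "for example gs://my-bucket/beam-artifacts")
    return [
        "docker", "run", "--net=host", image,
        f"--flink-master={flink_master}",
        # Assumed job server option for the artifact staging location.
        f"--artifacts-dir={artifacts_dir}",
    ]


if __name__ == "__main__":
    cmd = job_server_command("localhost:8081", "gs://my-bucket/beam-artifacts")
    # shlex.join needs Python 3.8 or newer.
    print(shlex.join(cmd))
```

Running the script only prints the command, so you can inspect it before
starting the container yourself.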

On Mon, Sep 28, 2020 at 11:32 AM Kyle Weaver <[email protected]> wrote:

> > This looks to me like an issue with artifact staging. It looks like the
> > worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but
> > can't find the jar that we staged that contains the code for the Java
> > KafkaIO.
>
> Yeah. This kind of error most often happens when the job server and Beam
> worker are assumed to share the same filesystem, but don't (cf. [1]). For
> debugging purposes, you might try setting `--environment_type=LOOPBACK`,
> which would circumvent any Docker-related filesystem issues.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5440
>
> On Mon, Sep 28, 2020 at 10:43 AM Brian Hulette <[email protected]>
> wrote:
>
>> This looks to me like an issue with artifact staging. It looks like the
>> worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but
>> can't find the jar that we staged that contains the code for the Java
>> KafkaIO.
>>
>> It looks like the example you're following was last updated a year ago, in
>> the summer, and xlang has undergone a lot of development since then. I'd
>> recommend following the example in the Beam repo instead [1]. It includes
>> directions for running on Dataflow, but if you just pass
>> --runner=FlinkRunner instead of DataflowRunner, it will start up a local
>> Flink cluster for you and run the pipeline on it. Then you can also omit
>> all the GCP-specific flags (--region, --num_workers, etc.).
>>
>> Brian
>>
>> [1]
>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
>>
>> On Mon, Sep 28, 2020 at 6:28 AM Carolyn Langen <[email protected]>
>> wrote:
>>
>>> Greetings all,
>>>
>>> I am trying to get an example working that runs a simple Python-based
>>> pipeline, which should listen to Kafka through the Flink PortableRunner.
>>> This is the repository I'm working from (forked from an older example; I'm
>>> trying to update it to use more recent versions of Beam and Flink):
>>> https://github.com/cdlangen/demo-beam-summit-2018. I run the example
>>> with the following steps:
>>>
>>> cd docker
>>> docker-compose up -d
>>> docker run --net=host apache/beam_flink1.10_job_server:latest \
>>>     --flink-master=localhost:8081
>>> python ../wordcount.py
>>>
>>> At some point I get an exception which seems to originate from the
>>> taskmanager. An excerpt of the logs:
>>> taskmanager_1       | java.io.FileNotFoundException:
>>> /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw
>>> (No such file or directory)
>>> taskmanager_1       | at java.io.FileInputStream.open0(Native Method)
>>> taskmanager_1       | at
>>> java.io.FileInputStream.open(FileInputStream.java:195)
>>> taskmanager_1       | at
>>> java.io.FileInputStream.<init>(FileInputStream.java:138)
>>> taskmanager_1       | at
>>> org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
>>> taskmanager_1       | at
>>> org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
>>> taskmanager_1       | at
>>> org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>> taskmanager_1       | at
>>> org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
>>> taskmanager_1       | at
>>> org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
>>> taskmanager_1       | at
>>> org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>> taskmanager_1       | at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> taskmanager_1       | at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> taskmanager_1       | at java.lang.Thread.run(Thread.java:748)
>>> taskmanager_1       | 2020-09-28 12:31:37,631 INFO
>>>  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory
>>>  - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for
>>> worker id 1-4
>>> taskmanager_1       | java.io.FileNotFoundException:
>>> /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw
>>> (No such file or directory)
>>> taskmanager_1       | at java.io.FileInputStream.open0(Native Method)
>>> taskmanager_1       | at
>>> java.io.FileInputStream.open(FileInputStream.java:195)
>>> taskmanager_1       | at
>>> java.io.FileInputStream.<init>(FileInputStream.java:138)
>>> taskmanager_1       | at
>>> org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
>>> taskmanager_1       | at
>>> org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
>>> taskmanager_1       | at
>>> org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>> taskmanager_1       | at
>>> org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
>>> taskmanager_1       | at
>>> org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
>>> taskmanager_1       | at
>>> org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>> taskmanager_1       | at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>> taskmanager_1       | at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> taskmanager_1       | at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> taskmanager_1       | at java.lang.Thread.run(Thread.java:748)
>>> taskmanager_1       | 2020-09-28 12:31:42,688 INFO
>>>  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory
>>>  - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for
>>> worker id 1-4
>>>
>>> My best guess of what is happening is that the container is unable to
>>> run apache/beam_java_sdk:2.24.0 (refer to the two lines which say "INFO
>>>  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory
>>>  - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for
>>> worker id 1-4"), and as a result there is a FileNotFoundException related
>>> to the Java SDK.
>>>
>>> What am I doing wrong? How can I fix this error?
>>>
>>> Best regards,
>>> Carolyn
>>>
>>>
>>>
>>>
>>> PS: Apologies if there are duplicate posts. I haven't posted to this
>>> user list before and wasn't sure whether my message wasn't appearing on
>>> the list because I wasn't subscribed yet.
>>>
>>
