Hi both,

@Brian, I've tried to adapt the example, but it didn't work. As in this thread <http://mail-archives.apache.org/mod_mbox/beam-user/202007.mbox/%3CCAOzzzuMtxH+NEbjvaEeALOOdvk2PB=gorrbhx0dqgjurvou...@mail.gmail.com%3E>, I get a java.lang.UnsupportedOperationException.
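Brian's suggestion (quoted below) for running the kafkataxi example locally is to pass --runner=FlinkRunner instead of DataflowRunner and to drop the GCP-specific flags such as --region and --num_workers. A minimal sketch of that adaptation applied to a command-line argument list follows. It assumes every option is written in `--name=value` form; the function name and the example flag values are hypothetical, and any other GCP-only flags the original command uses would need to be added to the set.

```python
# GCP-specific flags named in Brian's reply. Others (e.g. --project) are NOT
# included and would need adding if the original command uses them.
GCP_ONLY_FLAGS = frozenset({"--region", "--num_workers"})


def to_local_flink_args(argv, gcp_flags=GCP_ONLY_FLAGS):
    """Return a copy of argv with the runner set to FlinkRunner and the given
    GCP-only flags removed. Assumes every option uses the --name=value form."""
    local_args = []
    for arg in argv:
        name = arg.split("=", 1)[0]
        if name in gcp_flags:
            continue  # not needed when running on a local Flink cluster
        if name == "--runner":
            # Overrides whatever runner was given (e.g. DataflowRunner).
            arg = "--runner=FlinkRunner"
        local_args.append(arg)
    return local_args


if __name__ == "__main__":
    dataflow_args = [
        "--runner=DataflowRunner",
        "--region=us-central1",  # hypothetical value
        "--num_workers=1",
        "--bootstrap_servers=localhost:9092",  # hypothetical pipeline flag
    ]
    print(to_local_flink_args(dataflow_args))
    # ['--runner=FlinkRunner', '--bootstrap_servers=localhost:9092']
```

Pipeline-specific flags pass through unchanged; only the runner and the listed GCP options are touched.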
@Kyle, BEAM-5440 <https://issues.apache.org/jira/browse/BEAM-5440> mentions a workaround ("For local testing, users may want to mount a host directory."). Do you happen to know how I might do this?

For context: the main reason I want to get a Kafka example working is to be able to develop an MQTT xlang IO transform <https://lists.apache.org/thread.html/r98681059ad34216a2d9243a25306e7063370a080f34cf88741a85384%40%3Cdev.beam.apache.org%3E>. It seems to me that a working unbounded-source example is a prerequisite for that. I could potentially use the Google Pub/Sub example as a starting point, since that seems to be working right now (right? I haven't tried it yet because I haven't worked with Dataflow yet). Will any of the issues with xlang Kafka also be an issue when writing an MQTT transform?

On Mon, Sep 28, 2020 at 8:34 PM Kyle Weaver wrote:

> Sorry, didn't read closely. LOOPBACK won't work if you're doing
> cross-language transforms. Instead, you can try using a distributed
> filesystem like GCS (since unfortunately there will be no easy local
> workaround until BEAM-5440
> <https://issues.apache.org/jira/browse/BEAM-5440> is resolved).
>
> On Mon, Sep 28, 2020 at 11:32 AM Kyle Weaver wrote:
>
>> > This looks to me like an issue with artifact staging. It looks like the
>> > worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but
>> > can't find the jar that we staged that contains the code for the Java
>> > KafkaIO.
>>
>> Yeah. This kind of error most often happens when the job server and Beam
>> worker are assumed to share the same filesystem, but don't (cf. [1]). For
>> debugging purposes, you might try to set `--environment_type=LOOPBACK`,
>> which would circumvent any Docker-related filesystem issues.
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>>
>> On Mon, Sep 28, 2020 at 10:43 AM Brian Hulette wrote:
>>
>>> This looks to me like an issue with artifact staging.
>>> It looks like the worker is trying to start the apache/beam_java_sdk:2.24.0
>>> environment, but can't find the jar that we staged that contains the code
>>> for the Java KafkaIO.
>>>
>>> It looks like the example you're following was last updated last summer,
>>> and xlang has undergone a lot of development since then. I'd recommend
>>> trying to follow the example in the Beam repo instead [1]. It includes
>>> directions for running on Dataflow, but if you just pass
>>> --runner=FlinkRunner instead of DataflowRunner, it will start up a local
>>> Flink cluster for you and run the pipeline on it. Then you can also omit
>>> all the GCP-specific flags (--region, --num_workers, etc.).
>>>
>>> Brian
>>>
>>> [1]
>>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
>>>
>>> On Mon, Sep 28, 2020 at 6:28 AM Carolyn Langen wrote:
>>>
>>>> Greetings all,
>>>>
>>>> I am trying to get an example working that runs a simple Python-based
>>>> pipeline which should listen to Kafka through the Flink PortableRunner.
>>>> This is the repository I'm working from (forked from an older example;
>>>> I'm trying to update it to use more recent versions of Beam and Flink):
>>>> https://github.com/cdlangen/demo-beam-summit-2018. I run the example
>>>> with the following steps:
>>>>
>>>>     cd docker
>>>>     docker-compose up -d
>>>>     docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081
>>>>     python ../wordcount.py
>>>>
>>>> At some point I get an exception which seems to originate from the
>>>> taskmanager.
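One hedged reading of the BEAM-5440 suggestion to "mount a host directory": the job server stages the Java jar under /tmp/beam-artifact-staging (the prefix of the path in the log excerpt), and the taskmanager then tries to open that same path on its own filesystem. Bind-mounting one host directory at that path into both containers should let the taskmanager find the file. The sketch below only assembles the job-server `docker run` command from the steps above with such a mount; the helper name and defaults are illustrative assumptions, and the taskmanager service in the repository's docker-compose file would need the same volume entry, which is not shown here.

```python
import shlex

# Staging directory taken from the path in the taskmanager log
# (/tmp/beam-artifact-staging/<hash>/...). Assumed to be the job server's default.
STAGING_DIR = "/tmp/beam-artifact-staging"


def job_server_command(image="apache/beam_flink1.10_job_server:latest",
                       flink_master="localhost:8081",
                       staging_dir=STAGING_DIR):
    """Build the `docker run` argv for the Flink job server, bind-mounting the
    host staging directory at the same path inside the container.

    Hypothetical helper: it only assembles the command and does not run it.
    """
    return [
        "docker", "run", "--net=host",
        # Same path on the host and in the container, so a path written by the
        # job server is also valid in any other container with this mount.
        "-v", f"{staging_dir}:{staging_dir}",
        image,
        f"--flink-master={flink_master}",
    ]


if __name__ == "__main__":
    # Print a shell-ready command; passing the list to subprocess.run(...)
    # would execute it instead.
    print(shlex.join(job_server_command()))
```

In the trace, the failing read happens in the taskmanager's ArtifactRetrievalService (via LocalFileSystem.open), which is why the taskmanager container is the other side that needs the mount; whether anything else in this setup also needs it is not established in the thread.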
>>>> An excerpt of the logs:
>>>>
>>>> taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
>>>> taskmanager_1 |   at java.io.FileInputStream.open0(Native Method)
>>>> taskmanager_1 |   at java.io.FileInputStream.open(FileInputStream.java:195)
>>>> taskmanager_1 |   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>>> taskmanager_1 |   at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
>>>> taskmanager_1 |   at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
>>>> taskmanager_1 |   at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>>> taskmanager_1 |   at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
>>>> taskmanager_1 |   at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
>>>> taskmanager_1 |   at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>> taskmanager_1 |   at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>> taskmanager_1 |   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> taskmanager_1 |   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> taskmanager_1 |   at java.lang.Thread.run(Thread.java:748)
>>>> taskmanager_1 | 2020-09-28 12:31:37,631 INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
>>>> taskmanager_1 | 2020-09-28 12:31:42,688 INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
>>>>
>>>> My best guess is that the container is unable to run
>>>> apache/beam_java_sdk:2.24.0 (see the two lines that say "INFO
>>>> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory
>>>> - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for
>>>> worker id 1-4") and that, as a result, there is a FileNotFoundException
>>>> related to the Java SDK.
>>>>
>>>> What am I doing wrong? How can I fix this error?
>>>>
>>>> Best regards,
>>>> Carolyn
>>>>
>>>> PS: Apologies if there are duplicate posts. I haven't posted to this
>>>> user list before and wasn't sure whether my message wasn't appearing on
>>>> the list because I wasn't subscribed yet.
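Kyle's diagnosis is that the job server and the worker are assumed to share a filesystem but don't. One way to check this is to reduce a taskmanager log (where this FileNotFoundException can repeat many times while the Docker environment waits to start) to its distinct missing paths, then test whether each path exists on the filesystem where the script runs, ideally the taskmanager's own. The sketch below does that with a regular expression; the function names are hypothetical, and it assumes each FileNotFoundException message sits on one line, as in a raw log rather than a wrapped, quoted excerpt.

```python
import os
import re

# Matches messages such as:
#   java.io.FileNotFoundException: /tmp/beam-artifact-staging/<hash>/<file> (No such file or directory)
_MISSING_FILE = re.compile(
    r"java\.io\.FileNotFoundException:\s*(\S+)\s+\(No such file or directory\)"
)


def missing_artifacts(log_text):
    """Return the distinct paths reported as missing, in first-seen order."""
    paths = []
    for match in _MISSING_FILE.finditer(log_text):
        path = match.group(1)
        if path not in paths:
            paths.append(path)
    return paths


def visibility(paths):
    """Map each path to whether it exists on the filesystem this process sees."""
    return {path: os.path.exists(path) for path in paths}


if __name__ == "__main__":
    # Made-up log lines in the same shape as the excerpt, with a short path.
    sample = (
        "taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/abc/1-jar (No such file or directory)\n"
        "taskmanager_1 |   at java.io.FileInputStream.open0(Native Method)\n"
        "taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/abc/1-jar (No such file or directory)\n"
    )
    print(missing_artifacts(sample))  # ['/tmp/beam-artifact-staging/abc/1-jar']
```

If a path is reported missing in the taskmanager but exists inside the job server container, that is consistent with the unshared-filesystem explanation in BEAM-5440 rather than with the Java SDK image failing to start on its own.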
