> Is there any way of doing this with HDFS, or something else I can
> set up and test without relying on Google services?

Yes, you should be able to use any Beam-supported file system.

On Mon, Oct 5, 2020 at 6:45 AM Carolyn Langen wrote:
> Thanks Kyle,
>
> Is there any way of doing this with HDFS, or something else I can
> set up and test without relying on Google services?
>
> Met vriendelijke groet / Kind regards,
> Carolyn
>
> Carolyn Langen
> Research Engineer
> Stationsplein 45 | 3013 AK | Rotterdam
> T. +31 (0)6 2445 1380
> www.almende.com
>
> On Tue, Sep 29, 2020 at 7:39 PM Kyle Weaver wrote:
> >
> > > @Kyle, BEAM-5440 mentions a workaround ("For local testing, users may want to mount a host directory."). Do you happen to know how I might do this?
> >
> > Unfortunately, that workaround ("For local testing, users may want to mount a host directory.") is the target of that ticket. It would require a change in Beam that still hasn't been merged.
> >
> > The best way is probably to point the artifact staging directory at a distributed file system instead of a local one. You can do this by setting `--artifacts-dir=gs://<your bucket>/<your directory>` when starting the flink_job_server.
> >
> > On Tue, Sep 29, 2020 at 2:17 AM Carolyn Langen wrote:
> >>
> >> Hi both,
> >>
> >> @Brian, I've tried to adapt the example, but it didn't work. Similar to this thread, I get java.lang.UnsupportedOperationException.
> >>
> >> @Kyle, BEAM-5440 mentions a workaround ("For local testing, users may want to mount a host directory."). Do you happen to know how I might do this?
> >>
> >> For context: the main reason I want to get a Kafka example working is to be able to develop an MQTT xlang IO transform. It seems to me that a working unbounded source example is a prerequisite for that. I could potentially use the Google Pub/Sub example as a starting point, since that seems to be working right now (right? I haven't tried it yet because I haven't worked with Dataflow yet). Will any of the issues with xlang Kafka also be an issue when writing an MQTT transform?
> >>
> >> On Mon, Sep 28, 2020 at 8:34 PM Kyle Weaver wrote:
> >>>
> >>> Sorry, I didn't read closely. LOOPBACK won't work if you're doing cross-language transforms. Instead, you can try using a distributed file system like GCS (unfortunately, there will be no easy local workaround until BEAM-5440 is resolved).
> >>>
> >>> On Mon, Sep 28, 2020 at 11:32 AM Kyle Weaver wrote:
> >>>>
> >>>> > This looks to me like an issue with artifact staging. It looks like the worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but can't find the jar that we staged that contains the code for the Java KafkaIO.
> >>>>
> >>>> Yeah. This kind of error most often happens when the job server and Beam worker are assumed to share the same file system but don't (cf. [1]). For debugging purposes, you might try setting `--environment_type=LOOPBACK`, which would circumvent any Docker-related file system issues.
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/BEAM-5440
> >>>>
> >>>> On Mon, Sep 28, 2020 at 10:43 AM Brian Hulette wrote:
> >>>>>
> >>>>> This looks to me like an issue with artifact staging. It looks like the worker is trying to start the apache/beam_java_sdk:2.24.0 environment, but can't find the jar that we staged that contains the code for the Java KafkaIO.
> >>>>>
> >>>>> It looks like the example you're following was last updated last summer, and xlang has undergone a lot of development since then. I'd recommend following the example in the Beam repo instead [1]. It includes directions for running on Dataflow, but if you pass --runner=FlinkRunner instead of DataflowRunner, it will start up a local Flink cluster for you and run the pipeline on it.
> >>>>> Then you can also omit all the GCP-specific flags (--region, --num_workers, etc.).
> >>>>>
> >>>>> Brian
> >>>>>
> >>>>> [1] https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
> >>>>>
> >>>>> On Mon, Sep 28, 2020 at 6:28 AM Carolyn Langen wrote:
> >>>>>>
> >>>>>> Greetings all,
> >>>>>>
> >>>>>> I am trying to get an example working that runs a simple Python-based pipeline which should listen to Kafka through the Flink PortableRunner. This is the repository I'm working from (forked from an older example; I'm trying to update it to use more recent versions of Beam and Flink): https://github.com/cdlangen/demo-beam-summit-2018. I run the example with the following steps:
> >>>>>>
> >>>>>> cd docker
> >>>>>> docker-compose up -d
> >>>>>> docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081
> >>>>>> python ../wordcount.py
> >>>>>>
> >>>>>> At some point I get an exception that seems to originate from the taskmanager.
> >>>>>> An excerpt of the logs:
> >>>>>> taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open0(Native Method)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open(FileInputStream.java:195)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.<init>(FileInputStream.java:138)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
> >>>>>> taskmanager_1 | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>>>> taskmanager_1 | at java.lang.Thread.run(Thread.java:748)
> >>>>>> taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open0(Native Method)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open(FileInputStream.java:195)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.<init>(FileInputStream.java:138)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
> >>>>>> taskmanager_1 | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>>>> taskmanager_1 | at java.lang.Thread.run(Thread.java:748)
> >>>>>> taskmanager_1 | 2020-09-28 12:31:37,631 INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
> >>>>>> taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open0(Native Method)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open(FileInputStream.java:195)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.<init>(FileInputStream.java:138)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
> >>>>>> taskmanager_1 | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>>>> taskmanager_1 | at java.lang.Thread.run(Thread.java:748)
> >>>>>> taskmanager_1 | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open0(Native Method)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.open(FileInputStream.java:195)
> >>>>>> taskmanager_1 | at java.io.FileInputStream.<init>(FileInputStream.java:138)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
> >>>>>> taskmanager_1 | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
> >>>>>> taskmanager_1 | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
> >>>>>> taskmanager_1 | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >>>>>> taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>>>> taskmanager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>>>> taskmanager_1 | at java.lang.Thread.run(Thread.java:748)
> >>>>>> taskmanager_1 | 2020-09-28 12:31:42,688 INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
> >>>>>>
> >>>>>> My best guess is that the container is unable to run apache/beam_java_sdk:2.24.0 (see the two lines that say "INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4"), and that as a result there is a FileNotFoundException related to the Java SDK.
> >>>>>>
> >>>>>> What am I doing wrong? How can I fix this error?
> >>>>>>
> >>>>>> Best regards,
> >>>>>> Carolyn
> >>>>>>
> >>>>>> PS: Apologies if there are duplicate posts. I haven't posted to this user list before and wasn't sure whether my message wasn't appearing on the list because I wasn't subscribed yet.
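
Kyle's suggestion in the thread was to start the Flink job server with `--artifacts-dir` pointing at a location that both the job server and the Flink taskmanagers can read. The log above shows the failing local path `/tmp/beam-artifact-staging`. The sketch below only builds that `docker run` command line. The helper name `job_server_command` and the HDFS address `hdfs://namenode:8020/beam/artifacts` are hypothetical. Whether an `hdfs://` path actually works is an assumption: it depends on the job server image having Beam's Hadoop file system support and HDFS configuration available, and this sketch does not check that.

```python
import shlex
from typing import List


# Hypothetical helper: builds the `docker run` argv for the Beam Flink job
# server from the thread, adding --artifacts-dir so staged artifacts land on
# a file system that the Flink taskmanagers can also read.
def job_server_command(
    artifacts_dir: str,
    flink_master: str = "localhost:8081",
    image: str = "apache/beam_flink1.10_job_server:latest",
) -> List[str]:
    # A bare path or file:// URL is local to the job server container. In the
    # docker-compose setup from the thread, the taskmanager could not see the
    # job server's /tmp, hence the FileNotFoundException. This sketch rejects it.
    if "://" not in artifacts_dir or artifacts_dir.startswith("file://"):
        raise ValueError(
            "artifacts_dir should be a shared file system URL, e.g. gs:// or hdfs://"
        )
    return [
        "docker", "run", "--net=host", image,
        f"--flink-master={flink_master}",
        f"--artifacts-dir={artifacts_dir}",
    ]


if __name__ == "__main__":
    # Hypothetical HDFS namenode address; substitute your own cluster's.
    cmd = job_server_command("hdfs://namenode:8020/beam/artifacts")
    print(shlex.join(cmd))  # shell-safe rendering (Python 3.8+)
```

The printed command is meant to replace the `docker run ... --flink-master=localhost:8081` step in Carolyn's instructions. A `gs://` bucket, as Kyle suggested, would be passed the same way.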

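Kyle gave two pieces of advice in the thread: try `--environment_type=LOOPBACK` for debugging, and do not use LOOPBACK with cross-language transforms. Both can be captured in a small helper that assembles the Python SDK flags for submitting to the Flink job server. This is a sketch, not part of Beam. The function name `portable_flink_flags` and its `uses_cross_language` parameter are hypothetical, and `localhost:8099` is assumed to be the job server's default job endpoint. The flags themselves (`--runner=PortableRunner`, `--job_endpoint`, `--environment_type`, `--streaming`) are standard Python SDK pipeline options.

```python
from typing import List

# Environment types accepted by the Python SDK's --environment_type option.
ENVIRONMENT_TYPES = {"DOCKER", "PROCESS", "EXTERNAL", "LOOPBACK"}


# Hypothetical helper: builds Python SDK flags for a pipeline submitted to a
# separately started Flink job server (PortableRunner).
def portable_flink_flags(
    job_endpoint: str = "localhost:8099",  # assumed default job server port
    environment_type: str = "DOCKER",
    uses_cross_language: bool = False,
    streaming: bool = True,
) -> List[str]:
    env = environment_type.upper()
    if env not in ENVIRONMENT_TYPES:
        raise ValueError(f"unknown environment_type: {environment_type!r}")
    if env == "LOOPBACK" and uses_cross_language:
        # Per the thread, LOOPBACK does not help cross-language pipelines: the
        # Java KafkaIO part still runs in its own apache/beam_java_sdk
        # environment and still needs the staged expansion-service jar.
        raise ValueError("LOOPBACK does not work with cross-language transforms")
    flags = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={env}",
    ]
    if streaming:
        # Kafka is an unbounded source; this sketch requests streaming explicitly.
        flags.append("--streaming")
    return flags


if __name__ == "__main__":
    # Debugging a Python-only pipeline while bypassing Docker file system issues.
    print(portable_flink_flags(environment_type="LOOPBACK", streaming=False))
```

The returned list can be passed to `PipelineOptions(flags)` from `apache_beam.options.pipeline_options`. Rejecting LOOPBACK combined with cross-language transforms encodes Kyle's caveat, so the combination fails before any job is submitted.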