Thanks Kyle,

Is there any way of doing this with HDFS, or something else I can
set up and test without relying on Google services?

Kind regards,
Carolyn

Carolyn Langen
Research Engineer
Stationsplein 45 | 3013 AK | Rotterdam
T. +31 (0)6 2445 1380
www.almende.com

On Tue, Sep 29, 2020 at 7:39 PM Kyle Weaver <[email protected]> wrote:
>
> > @Kyle, BEAM-5440 mentions a workaround ("For local testing, users may want 
> > to mount a host directory."). Do you happen to know how I might do this?
>
> Unfortunately, that workaround ("For local testing, users may want to mount a 
> host directory.") is the target of that ticket. It would require a change in 
> Beam that still hasn't been merged.
>
> The best way is probably to set the artifact staging directory to point to a
> distributed file system instead of a local file system, by setting
> `--artifacts-dir=gs://<your bucket>/<your directory>` when starting the
> flink_job_server.
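
A minimal sketch of the invocation described above, written as a small Python helper that assembles the `docker run` command. The image name and `--flink-master` value come from the commands quoted further down in this thread, and `--artifacts-dir` is the flag named above. The bucket and directory in the usage line are hypothetical placeholders, and the container would presumably also need credentials for the target file system, which this sketch does not cover.

```python
import shlex

# Image name and --flink-master default are taken from the commands in this thread.
JOB_SERVER_IMAGE = "apache/beam_flink1.10_job_server:latest"


def job_server_command(artifacts_dir, flink_master="localhost:8081"):
    """Build the `docker run` argv for the Flink job server.

    `artifacts_dir` should be a location that both the job server and the
    workers can read, for example a gs:// path.
    """
    return [
        "docker", "run", "--net=host", JOB_SERVER_IMAGE,
        f"--flink-master={flink_master}",
        f"--artifacts-dir={artifacts_dir}",
    ]


if __name__ == "__main__":
    # "my-bucket" and "beam-artifacts" are hypothetical placeholders.
    print(shlex.join(job_server_command("gs://my-bucket/beam-artifacts")))
```

The helper only builds the argument list; it could be handed to `subprocess.run` or pasted into a shell after printing.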
>
> On Tue, Sep 29, 2020 at 2:17 AM Carolyn Langen <[email protected]> wrote:
>>
>> Hi both,
>>
>> @Brian, I've tried to adapt the example, but it didn't work. Similar to this 
>> thread, I get java.lang.UnsupportedOperationException.
>>
>> @Kyle, BEAM-5440 mentions a workaround ("For local testing, users may want 
>> to mount a host directory."). Do you happen to know how I might do this?
>>
>> For context: The main reason that I want to get a Kafka example working is 
>> to be able to develop an MQTT xlang io transform. It seems to me that a 
>> working unbounded source example is a prerequisite to that. I could 
>> potentially use the Google Pub/Sub example as a starting point since that
>> seems to be working right now (right? I haven't tried it yet since I haven't
>> worked with Dataflow yet). Will any of the issues with xlang Kafka also be
>> an issue when writing an MQTT transform?
>>
>>
>> On Mon, Sep 28, 2020 at 8:34 PM Kyle Weaver <[email protected]> wrote:
>>>
>>> Sorry, I didn't read closely. LOOPBACK won't work if you're doing
>>> cross-language transforms. Instead, you can try using a distributed
>>> filesystem like GCS (since unfortunately there will be no easy local
>>> workaround until BEAM-5440 is resolved).
>>>
>>> On Mon, Sep 28, 2020 at 11:32 AM Kyle Weaver <[email protected]> wrote:
>>>>
>>>> > This looks to me like an issue with artifact staging. It looks like the 
>>>> > worker is trying to start the apache/beam_java_sdk:2.24.0 environment, 
>>>> > but can't find the jar that we staged that contains the code for the 
>>>> > Java KafkaIO.
>>>>
>>>> Yeah. This kind of error most often happens when the job server and Beam 
>>>> worker are assumed to share the same filesystem, but don't (cf [1]). For 
>>>> debugging purposes, you might try to set `--environment_type=LOOPBACK`, 
>>>> which would circumvent any Docker-related filesystem issues.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-5440
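
A rough sketch of the pipeline options this suggestion implies, for debugging only. As the follow-up reply in this thread points out, LOOPBACK does not work with cross-language transforms, so this would apply only to a pipeline without them. The `--job_endpoint` value of `localhost:8099` is an assumption about where the job server listens and is not stated in this thread.

```python
def loopback_options(job_endpoint="localhost:8099"):
    """Return pipeline flags that run the SDK harness in the submitting process.

    `job_endpoint` is an assumed address for the job server; adjust it to
    wherever the job server is actually reachable.
    """
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        "--environment_type=LOOPBACK",
    ]


if __name__ == "__main__":
    # These flags could be passed on the command line of the pipeline script
    # or handed to the pipeline options object when constructing the pipeline.
    print(" ".join(loopback_options()))
```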
>>>>
>>>> On Mon, Sep 28, 2020 at 10:43 AM Brian Hulette <[email protected]> wrote:
>>>>>
>>>>> This looks to me like an issue with artifact staging. It looks like the 
>>>>> worker is trying to start the apache/beam_java_sdk:2.24.0 environment, 
>>>>> but can't find the jar that we staged that contains the code for the Java 
>>>>> KafkaIO.
>>>>>
>>>>> It looks like the example you're following was last updated last summer, 
>>>>> and xlang has undergone a lot of development since then. I'd recommend 
>>>>> trying to follow the example in the Beam repo instead [1]. It includes 
>>>>> directions for running on Dataflow, but if you just pass 
>>>>> --runner=FlinkRunner instead of DataflowRunner, it will start up a local 
>>>>> flink cluster for you and run the pipeline on it. Then you can also omit 
>>>>> all the GCP specific flags (--region, --num_workers, etc).
>>>>>
>>>>> Brian
>>>>>
>>>>> [1] 
>>>>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
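
The flag change described above can be sketched as a small helper that rewrites a Dataflow-style argument list for a local Flink run. Only the two flags named in the message (`--region`, `--num_workers`) are dropped; the "etc." implies there are more, which are deliberately not guessed here. The helper name and the sample argument values are hypothetical, and only the `--flag=value` form is handled.

```python
# Flags named in the thread as GCP-specific. Others may exist; they are not
# guessed here.
GCP_ONLY_FLAGS = ("--region", "--num_workers")


def to_flink_args(dataflow_args):
    """Swap the runner flag for FlinkRunner and drop the GCP-only flags.

    Only `--flag=value` style arguments are recognised. If no --runner flag
    is present in the input, none is added.
    """
    flink_args = []
    for arg in dataflow_args:
        name = arg.split("=", 1)[0]
        if name == "--runner":
            flink_args.append("--runner=FlinkRunner")
        elif name not in GCP_ONLY_FLAGS:
            flink_args.append(arg)
    return flink_args


if __name__ == "__main__":
    # Sample values are illustrative only.
    sample = [
        "--runner=DataflowRunner",
        "--region=us-central1",
        "--num_workers=2",
        "--bootstrap_servers=localhost:9092",
    ]
    print(to_flink_args(sample))
```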
>>>>>
>>>>> On Mon, Sep 28, 2020 at 6:28 AM Carolyn Langen <[email protected]> 
>>>>> wrote:
>>>>>>
>>>>>> Greetings all,
>>>>>>
>>>>>> I am trying to get an example working that runs a simple Python-based
>>>>>> pipeline, which should listen to Kafka through the Flink PortableRunner.
>>>>>> This is the repository I'm working from (forked from an older example;
>>>>>> I'm trying to update it to use more recent versions of Beam and Flink):
>>>>>> https://github.com/cdlangen/demo-beam-summit-2018. I run the example
>>>>>> with the following steps:
>>>>>>
>>>>>> cd docker
>>>>>> docker-compose up -d
>>>>>> docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081
>>>>>> python ../wordcount.py
>>>>>>
>>>>>> At some point I get an exception which seems to originate from the
>>>>>> taskmanager. An excerpt of the logs:
>>>>>> taskmanager_1       | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
>>>>>> taskmanager_1       | at java.io.FileInputStream.open0(Native Method)
>>>>>> taskmanager_1       | at java.io.FileInputStream.open(FileInputStream.java:195)
>>>>>> taskmanager_1       | at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>>>>> taskmanager_1       | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
>>>>>> taskmanager_1       | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
>>>>>> taskmanager_1       | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>>>>> taskmanager_1       | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
>>>>>> taskmanager_1       | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
>>>>>> taskmanager_1       | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>>>> taskmanager_1       | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> taskmanager_1       | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> taskmanager_1       | at java.lang.Thread.run(Thread.java:748)
>>>>>> taskmanager_1       | 2020-09-28 12:31:37,631 INFO  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory  - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
>>>>>> taskmanager_1       | java.io.FileNotFoundException: /tmp/beam-artifact-staging/ea9f367a84774c16c071a73e1e20b0e388d272444db1e0bf7dc893dcd74361ad/1-external_1beam:env:docker-beam-sdks-java-io-expansion-service-2.24.0-nAJcoAXzVni_-O7VAaHy6713XSUPw (No such file or directory)
>>>>>> taskmanager_1       | at java.io.FileInputStream.open0(Native Method)
>>>>>> taskmanager_1       | at java.io.FileInputStream.open(FileInputStream.java:195)
>>>>>> taskmanager_1       | at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>>>>> taskmanager_1       | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:124)
>>>>>> taskmanager_1       | at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:83)
>>>>>> taskmanager_1       | at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
>>>>>> taskmanager_1       | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
>>>>>> taskmanager_1       | at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
>>>>>> taskmanager_1       | at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>>>> taskmanager_1       | at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>>>> taskmanager_1       | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> taskmanager_1       | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> taskmanager_1       | at java.lang.Thread.run(Thread.java:748)
>>>>>> taskmanager_1       | 2020-09-28 12:31:42,688 INFO  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory  - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 for worker id 1-4
>>>>>>
>>>>>> My best guess of what is happening is that the container is unable to 
>>>>>> run apache/beam_java_sdk:2.24.0 (refer to the two lines which say "INFO  
>>>>>> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory 
>>>>>>  - Still waiting for startup of environment apache/beam_java_sdk:2.24.0 
>>>>>> for worker id 1-4"), and as a result there is a FileNotFoundException 
>>>>>> related to the Java SDK.
>>>>>>
>>>>>> What am I doing wrong? How can I fix this error?
>>>>>>
>>>>>> Best regards,
>>>>>> Carolyn
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> PS- apologies if there are duplicate posts. I haven't posted to this 
>>>>>> user list before and wasn't sure if my message wasn't appearing on the 
>>>>>> list because I wasn't subscribed yet.
