Re: Question about --environment_type argument

2019-05-30 Thread Chamikara Jayalath
I believe FileSystems are initialized for the Python SDK here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L98 For this to work, the runner, at initialization, should set the pipeline options object, for example by invoking the following function. https://github.com/ap
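The lookup described above can be sketched in plain Python. This is an illustrative model, not Beam's actual API: filesystems register under a URL scheme, and resolution fails with exactly the thread's error when nothing registered the scheme (e.g. because pipeline options were never set at worker startup).

```python
# Illustrative sketch of scheme-based filesystem resolution (names are
# hypothetical, not the real Beam classes).
from urllib.parse import urlparse

_REGISTRY = {}

def register_filesystem(scheme, fs_cls):
    _REGISTRY[scheme] = fs_cls

def get_filesystem(path):
    # Fall back to the local filesystem for scheme-less paths.
    scheme = urlparse(path).scheme or 'file'
    if scheme not in _REGISTRY:
        raise ValueError("No filesystem found for scheme %r" % scheme)
    return _REGISTRY[scheme]()

class LocalFileSystem:
    pass

register_filesystem('file', LocalFileSystem)

# 'hdfs' was never registered, so resolving an hdfs:// path raises,
# mirroring the "No filesystem found for scheme hdfs" error in this thread.
```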

Re: Question about --environment_type argument

2019-05-29 Thread Robert Bradshaw
I was wondering if there was an error earlier in the logs, e.g. at startup, about this missing parameter (given that it was linked in, I'd assume it at least tried to load). As for the other question, if the Python worker is doing the read, then yes, it needs access to HDFS as well. On Wed, May 2

Re: Question about --environment_type argument

2019-05-29 Thread Maximilian Michels
The environment for your Python function also needs to configure access for HDFS. Unfortunately, this is separate from Java. I haven't used HDFS with Python myself, but it looks like you have to configure these options in HadoopFileSystemOptions: -hdfs_host -hdfs_port -hdfs_user Now, I don'
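The three flags above are passed on the Python pipeline's command line. A minimal sketch, using stdlib argparse as a stand-in for Beam's PipelineOptions parsing (the flag names come from the thread; host/port/user values are illustrative):

```python
# Sketch: how --hdfs_host/--hdfs_port/--hdfs_user would be parsed from
# pipeline arguments (argparse used illustratively in place of Beam's
# PipelineOptions machinery).
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--hdfs_host')
parser.add_argument('--hdfs_port', type=int)
parser.add_argument('--hdfs_user')

args = parser.parse_args([
    '--hdfs_host=namenode.example.com',  # illustrative hostname
    '--hdfs_port=50070',                 # illustrative WebHDFS port
    '--hdfs_user=beam',                  # illustrative user
])
```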

Re: Question about --environment_type argument

2019-05-29 Thread 青雉(祁明良)
Was there any indication in the logs that the hadoop file system attempted to load but failed? Nope, same message “No filesystem found for scheme hdfs” when HADOOP_CONF_DIR not set. I guess I met the last problem. When I load input data from HDFS, the python sdk worker fails. It complains about

Re: Question about --environment_type argument

2019-05-29 Thread Robert Bradshaw
Glad you were able to figure it out! Agree the error message was suboptimal. Was there any indication in the logs that the hadoop file system attempted to load but failed? On Wed, May 29, 2019 at 4:41 AM 青雉(祁明良) wrote: > > Thanks guys, I got it. It was because Flink taskmanager docker missing >

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
Thanks guys, I got it. It was because the Flink taskmanager docker was missing the HADOOP_CONF_DIR environment variable. Maybe we could improve the error message in the future:) Best, Mingliang On 29 May 2019, at 3:12 AM, Lukasz Cwik mailto:lc...@google.com>> wrote: Are you losing the META-INF/ ServiceLoader entri

Re: Question about --environment_type argument

2019-05-28 Thread Lukasz Cwik
Are you losing the META-INF/ ServiceLoader entries related to binding the FileSystem via the FileSystemRegistrar when building the uber jar[1]? It does look like the Flink JobServer driver is registering the file systems[2]. 1: https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806
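The uber-jar pitfall mentioned above can be modeled in a few lines: each jar ships a `META-INF/services/<interface>` file listing its implementations, and a naive "last file wins" merge drops earlier registrations, while a service-file-aware merge concatenates them. Jar contents here are toy dicts; the registrar names are illustrative.

```python
# Why a naive uber jar loses ServiceLoader registrations: service files
# from different jars share the same path, so plain overwriting keeps
# only one jar's entries.
def naive_merge(*jars):
    merged = {}
    for jar in jars:
        merged.update(jar)  # later jars overwrite earlier service files
    return merged

def service_aware_merge(*jars):
    merged = {}
    for jar in jars:
        for path, impls in jar.items():
            merged.setdefault(path, []).extend(impls)  # concatenate entries
    return merged

SERVICE_FILE = 'META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar'
core_jar = {SERVICE_FILE: ['LocalFileSystemRegistrar']}
hdfs_jar = {SERVICE_FILE: ['HadoopFileSystemRegistrar']}
```

This is the behavior shade/shadow plugins fix with their service-file merging options.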

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
Yes, I did (2). Since the job server successfully created the artifact directory, I think I did it correctly. And somehow this dependency is not submitted to the task manager. Maybe I can also try out (1), but adding additional jars to the Flink classpath doesn't sound like a perfect solution. Get Outlook for iOS

Re: Question about --environment_type argument

2019-05-28 Thread Maximilian Michels
Hi Mingliang, Oh I see. You will also have to add the Jars to the TaskManager then. You have these options: 1. Include them directly in the TaskManager classpath 2. Include them as dependencies of the JobServer, which will cause them to be attached to Flink's JobGraph. Do I understand correc

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
Yes Max, I did add these Hadoop jars. The error message from the task manager was about a missing HDFS file system class from the beam-sdks-java-io-hadoop-file-system module, which I also shadowed into the job server. I see the artifact directory is successfully created at HDFS by the job server, but fails at ta

Re: Question about --environment_type argument

2019-05-28 Thread Maximilian Michels
Recent versions of Flink do not bundle Hadoop anymore, but they are still "Hadoop compatible". You just need to include the Hadoop jars in the classpath. Beam's Hadoop does not bundle Hadoop either, it just provides Beam file system abstractions which are similar to Flink "Hadoop compatibilit

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
Thanks Robert, I had one, “qmlmoon” Looks like I had the jobserver working now, I just added a shadow dependency of beam-sdks-java-io-hadoop-file-system to beam-runners-flink_2.11-job-server and rebuilt the job server, but the Flink taskmanager also complains about the same issue during job running.

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
On Tue, May 28, 2019 at 2:21 PM 青雉(祁明良) wrote: > > https://github.com/apache/beam/pull/8700 > Please help to create a JIRA and format the PR message. Thanks! I created https://issues.apache.org/jira/browse/BEAM-7441 . If you create an account I can add you as a contributor and assign you to that

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
https://github.com/apache/beam/pull/8700 Please help to create a JIRA and format the PR message. Filesystems are registered using the java service provider interfaces. Here the HDFS filesystem needs to be built into the job server (or at least on the classpath when it's invoked). I tried to put

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
On Tue, May 28, 2019 at 1:18 PM 青雉(祁明良) wrote: > > I added some log to the beam code and found this. The error message is > definitely much clear but swallowed here > https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ar

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
I added some logging to the beam code and found this. The error message is definitely much clearer but swallowed here https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L229 T

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
IIRC, the default artifact directory is local, not HDFS, which would of course not be readable on the workers. Good point about missing hdfs parameters on the job server. Looks like by default, it gets these from the environment? https://github.com/apache/beam/blob/release-2.12.0/sdks/java/io/hado
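The environment-based lookup mentioned above (and the HADOOP_CONF_DIR fix later in the thread) can be sketched as follows. This is a simplified stdlib model of the discovery order, with the helper name being hypothetical; Beam's Java HadoopFileSystemOptions does a comparable environment check:

```python
# Sketch of environment-based Hadoop configuration discovery: check the
# conventional variables in order and return any directories found.
# (Function name is illustrative, not a Beam API.)
import os

def hadoop_conf_dirs(environ=os.environ):
    dirs = []
    for var in ('HADOOP_CONF_DIR', 'YARN_CONF_DIR'):
        value = environ.get(var)
        if value:
            dirs.append(value)
    return dirs
```

If neither variable is set in the container (as happened to the Flink taskmanager docker image in this thread), the lookup comes back empty and HDFS access silently lacks configuration.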

Re: Question about --environment_type argument

2019-05-28 Thread 青雉(祁明良)
Yes, it is built from the release-2.12.0 branch. There was an NPE message at BeamFileSystemArtifactStagingService.java:239, but it shows only at the first submission. Plus, I wonder why there were --hdfs host / port / user arguments for the Python submission script, but not for the job server. If I l

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
Thanks for the report. Is this with 2.12.0? If so, https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L293 seems a strange place to get a NullPointerException. Is there per

Re: Question about --environment_type argument

2019-05-27 Thread 青雉(祁明良)
Hi Robert, When I set the --artifacts-dir to an hdfs location, I got an NPE. The url is accessible via the hadoop client. --- ./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir

Re: Question about --environment_type argument

2019-05-27 Thread Robert Bradshaw
On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) wrote: > > Just now I try to use the PROCESS environment type, the Flink taskmanager > complains about "/tmp/beam-artifact-staging/job_xxx" not found. And I found > this directory is only created on the machine with beam job endpoint. I guess > maybe I sh

Re: Question about --environment_type argument

2019-05-27 Thread 青雉(祁明良)
Just now I tried to use the PROCESS environment type; the Flink taskmanager complains about "/tmp/beam-artifact-staging/job_xxx" not found. And I found this directory is only created on the machine with the beam job endpoint. I guess maybe I should set the artifact-dir to an hdfs location, but no luck

Re: Question about --environment_type argument

2019-05-27 Thread Maximilian Michels
That's a tough question because we haven't done extensive benchmarks. At an earlier stage we had determined a 5-10% overhead for the language portability. I can only say that this depends heavily on the type of work load. Also there are some optimizations with regards to state and timers which

Re: Question about --environment_type argument

2019-05-27 Thread 青雉(祁明良)
Thanks Max, it is clear to me now. BTW, I would like to ask about the performance of the Python runner on Flink. As I remember, when Flink first introduced Python support (maybe around 2015), it was 5-10x slower than Scala. For now, what is the performance difference of Scala / Python with Beam on Fli

Re: Question about --environment_type argument

2019-05-27 Thread Maximilian Michels
Hi Mingliang, The environment is created for each TaskManager. For docker, will it create one docker per flink taskmanager? Yes. For process, does it mean start a python process to run the user code? And it seems "command" should be set in the environment config, but what should it
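The three environment types discussed in this thread are selected by pipeline flags. A hedged sketch of typical flag sets (the `/opt/apache/beam/boot` path is illustrative; for PROCESS, the thread confirms a "command" key must be set in the environment config):

```python
# Illustrative portable-runner flag sets for the environment types
# discussed above (values are examples, not definitive defaults).
import json

docker_flags = ['--environment_type=DOCKER']

# For PROCESS, the environment config is a JSON object whose "command"
# points at an SDK worker boot executable on each TaskManager host:
process_flags = [
    '--environment_type=PROCESS',
    '--environment_config=' + json.dumps({'command': '/opt/apache/beam/boot'}),
]

loopback_flags = ['--environment_type=LOOPBACK']
```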

Question about --environment_type argument

2019-05-26 Thread 青雉(祁明良)
Hi All, I'm currently trying the Python portable runner with Flink. I see there are 3 kinds of environment_type available, "docker/process/external(loopback)", when submitting a job. But I didn't find any material explaining more. 1. For docker, will it create one docker per flink taskmanager? 2. For