I believe FileSystems are initialized for Python SDK here:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L98
For this to work, the runner, at initialization, should set the pipeline
options object, for example by invoking the following function.
https://github.com/ap
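As a rough illustration of that flow (hypothetical names, not Beam's actual implementation): filesystems are registered per scheme from the pipeline options, so if the runner never passes the options, the hdfs scheme is simply absent and lookups fail with exactly this error.

```python
# Hypothetical sketch of options-driven filesystem initialization.
# Without the pipeline options set, the "hdfs" scheme is never registered.

class HadoopFileSystem:
    def __init__(self, options):
        # Connection details come from the pipeline options.
        self.host = options.get("hdfs_host")
        self.port = options.get("hdfs_port")
        self.user = options.get("hdfs_user")

_REGISTRY = {}

def init_filesystems(options):
    """A runner would call something like this once at startup."""
    _REGISTRY["file"] = object()  # stand-in for the local filesystem
    if "hdfs_host" in options:
        _REGISTRY["hdfs"] = HadoopFileSystem(options)

def get_filesystem(path):
    scheme = path.split("://", 1)[0] if "://" in path else "file"
    if scheme not in _REGISTRY:
        raise ValueError("No filesystem found for scheme %s" % scheme)
    return _REGISTRY[scheme]
```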
I was wondering if there was an error earlier in the logs, e.g. at startup,
about this missing parameter (given that it was linked in, I'd assume it at
least tried to load).
As for the other question, if the Python worker is doing the read, then
yes, it needs access to HDFS as well.
On Wed, May 2
The environment for your Python function also needs to configure access for
HDFS. Unfortunately, this is separate from Java. I haven't used HDFS with
Python myself, but it looks like you have to configure these options in
HadoopFileSystemOptions:
-hdfs_host
-hdfs_port
-hdfs_user
Now, I don'
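For illustration, a minimal argparse sketch of how those three flags would be parsed at submission time (the host, port, and user values are placeholders, not settings from this thread):

```python
import argparse

# Sketch of parsing the three HadoopFileSystemOptions flags mentioned above.
parser = argparse.ArgumentParser()
parser.add_argument("--hdfs_host")
parser.add_argument("--hdfs_port", type=int)
parser.add_argument("--hdfs_user")

opts = parser.parse_args([
    "--hdfs_host=namenode.example.com",  # placeholder namenode address
    "--hdfs_port=9870",
    "--hdfs_user=beam",
])
```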
Was there any indication in
the logs that the hadoop file system attempted to load but failed?
Nope, same message “No filesystem found for scheme hdfs” when HADOOP_CONF_DIR
is not set.
I guess I hit one last problem. When I load input data from HDFS, the Python
SDK worker fails. It complains about
Glad you were able to figure it out!
Agree the error message was suboptimal. Was there any indication in
the logs that the hadoop file system attempted to load but failed?
On Wed, May 29, 2019 at 4:41 AM 青雉(祁明良) wrote:
>
> Thanks guys, I got it. It was because Flink taskmanager docker missing
>
Thanks guys, I got it. It was because the Flink taskmanager docker image was
missing the HADOOP_CONF_DIR environment variable.
Maybe we could improve the error message in the future:)
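A clearer failure could be as simple as checking the variable up front. A minimal sketch of such a check (a hypothetical helper, not Beam code):

```python
import os

def require_hadoop_conf_dir(environ=None):
    """Fail fast with a readable message instead of a late NPE or
    'No filesystem found for scheme hdfs' error."""
    environ = os.environ if environ is None else environ
    conf_dir = environ.get("HADOOP_CONF_DIR")
    if conf_dir is None:
        raise RuntimeError(
            "HADOOP_CONF_DIR is not set; hdfs:// paths cannot be resolved")
    return conf_dir
```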
Best,
Mingliang
On 29 May 2019, at 3:12 AM, Lukasz Cwik wrote:
Are you losing the META-INF/ ServiceLoader entries related to binding the
FileSystem via the FileSystemRegistrar when building the uber jar[1]?
It does look like the Flink JobServer driver is registering the file
systems[2].
1:
https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806
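The failure mode being asked about can be sketched in miniature (registrar names are illustrative, not the real Beam service files): each jar contributes a META-INF/services file for the same interface, and a shade step that overwrites instead of concatenating silently drops registrars.

```python
# Each jar ships META-INF/services/<interface> listing its implementations.
core_jar = {"FileSystemRegistrar": ["LocalFileSystemRegistrar"]}
hdfs_jar = {"FileSystemRegistrar": ["HadoopFileSystemRegistrar"]}

def naive_merge(jars):
    """What a shade step does without a service-file merger: last jar wins."""
    merged = {}
    for jar in jars:
        merged.update(jar)
    return merged

def service_merge(jars):
    """What a ServiceLoader-aware merge does: concatenate the entries."""
    merged = {}
    for jar in jars:
        for interface, impls in jar.items():
            merged.setdefault(interface, []).extend(impls)
    return merged
```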
Yes, I did (2). Since the job server successfully
created the artifact directory, I think I did it correctly. And somehow this
dependency is not submitted to the task manager.
Maybe I can also try out (1), but adding additional jars to the Flink
classpath doesn't sound like a perfect solution.
Get Outlook for iOS
Hi Mingliang,
Oh I see. You will also have to add the Jars to the TaskManager then.
You have these options:
1. Include them directly in the TaskManager classpath
2. Include them as dependencies to the JobServer, which will cause them
to be attached to Flink's JobGraph.
Do I understand correc
Yes Max, I did add these Hadoop jars. The error
message from task manager was about missing HDFS file system class from
beam-sdks-java-io-hadoop-file-system module, which I also shadowed into job
server.
I see the artifact directory is successfully created on HDFS by the job server, but
fails at ta
Recent versions of Flink do not bundle Hadoop anymore, but they are
still "Hadoop compatible". You just need to include the Hadoop jars in
the classpath.
Beam's Hadoop does not bundle Hadoop either, it just provides Beam file
system abstractions which are similar to Flink's "Hadoop compatibilit
Thanks Robert, I had one, “qmlmoon”
Looks like I have the job server working now. I just added a shadow dependency of
beam-sdks-java-io-hadoop-file-system to beam-runners-flink_2.11-job-server and
rebuilt the job server, but the Flink taskmanager also complains about the same
issue during job running.
On Tue, May 28, 2019 at 2:21 PM 青雉(祁明良) wrote:
>
> https://github.com/apache/beam/pull/8700
> Please help to create a JIRA and format the PR message.
Thanks! I created https://issues.apache.org/jira/browse/BEAM-7441 . If
you create an account I can add you as a contributor and assign you to
that
https://github.com/apache/beam/pull/8700
Please help to create a JIRA and format the PR message.
Filesystems are registered using the java service provider interfaces.
Here the HDFS filesystem needs to be built into the job server (or at
least on the classpath when it's invoked).
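A rough Python analogue of that service-provider lookup (hypothetical registrar classes; the real mechanism is Java's ServiceLoader): the job server collects filesystems from every registrar it can see, so a registrar missing from the classpath simply contributes nothing.

```python
class LocalFileSystemRegistrar:  # stand-in for a Java FileSystemRegistrar
    def filesystems(self):
        return {"file": "LocalFileSystem"}

class HadoopFileSystemRegistrar:
    def filesystems(self):
        return {"hdfs": "HadoopFileSystem"}

def load_filesystems(registrars):
    """Union of schemes from every registrar found on the 'classpath'."""
    schemes = {}
    for registrar in registrars:
        schemes.update(registrar().filesystems())
    return schemes
```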
I tried to put
On Tue, May 28, 2019 at 1:18 PM 青雉(祁明良) wrote:
>
> I added some log to the beam code and found this. The error message is
> definitely much clear but swallowed here
> https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ar
I added some log to the beam code and found this. The error message is
definitely much clearer but is swallowed here
https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L229
T
IIRC, the default artifact directory is local, not HDFS, which would
of course not be readable on the workers.
Good point about missing hdfs parameters on the job server. Looks like
by default, it gets these from the environment?
https://github.com/apache/beam/blob/release-2.12.0/sdks/java/io/hado
Yes, it is built from release-2.12.0 branch. There was an NPE message at
BeamFileSystemArtifactStagingService.java:239, but it shows only at the first
submission.
Plus, I wonder why there are --hdfs_host / port / user arguments for the python
submission script, but not for the job server. If I l
Thanks for the report. Is this with 2.12.0? If so,
https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L293
seems a strange place to get a NullPointerException. Is there per
Hi Robert,
When I set the --artifacts-dir to an HDFS location, I got an NPE. The URL
is accessible via the hadoop client.
---
./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server
--flink-master-url test-mqi-job1-hl:8081 --artifacts-dir
On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) wrote:
>
> Just now I try to use the PROCESS environment type, the Flink taskmanager
> complains about "/tmp/beam-artifact-staging/job_xxx" not found. And I found
> this directory is only created on the machine with beam job endpoint. I guess
> maybe I sh
Just now I try to use the PROCESS environment type, the Flink taskmanager
complains about "/tmp/beam-artifact-staging/job_xxx" not found. And I found
this directory is only created on the machine with beam job endpoint. I guess
maybe I should set the artifact-dir to an hdfs location, but no luck
That's a tough question because we haven't done extensive benchmarks. At
an earlier stage we had determined a 5-10% overhead for the language
portability. I can only say that this depends heavily on the type of
work load. Also there are some optimizations with regards to state and
timers which
Thanks max, it is clear to me now.
BTW, I would like to ask about the performance of the Python runner on Flink. As I
remember, when Flink first introduced Python support (maybe around 2015), it was
5-10x slower than Scala. For now, what is the performance difference of Scala /
Python with Beam on Fli
Hi Mingliang,
The environment is created for each TaskManager.
For docker, will it create one docker per flink taskmanager?
Yes.
For process, does it mean starting a Python process to run the user code? And it seems
"command" should be set in the environment config, but what should it
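For context, a submission with the PROCESS environment type looks roughly like this; the boot-script path in "command" is a placeholder, so treat the exact values as an assumption, not a verified configuration.

```python
# Pipeline options for a PROCESS-environment submission. The "command" value
# is a placeholder path to the SDK harness boot script on each TaskManager.
pipeline_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=PROCESS",
    '--environment_config={"command": "/opt/apache/beam/boot"}',
]
```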
Hi All,
I'm currently trying the python portable runner with Flink. I see there are 3 kinds
of environment_type available "docker/process/external(loopback)" when submitting a
job. But I didn't find any material explaining more.
1. For docker, will it create one docker per flink taskmanager?
2. For