I was wondering if there was an error earlier in the logs, e.g. at startup, about this missing parameter (given that it was linked in, I'd assume it at least tried to load).
As for the other question, if the Python worker is doing the read, then yes, it needs access to HDFS as well.

On Wed, May 29, 2019 at 1:54 PM 青雉(祁明良) <[email protected]> wrote:

> > Was there any indication in the logs that the hadoop file system
> > attempted to load but failed?
>
> Nope, same message “No filesystem found for scheme hdfs” when
> HADOOP_CONF_DIR is not set.
>
> I guess I have hit the last problem. When I load input data from HDFS, the
> Python SDK worker fails. It complains that pipeline_options in
> hadoopfilesystem.py is empty. I thought that HDFS was only accessed by Flink
> and the data was then serialized from the JVM to the Python SDK worker. Does
> the Python SDK worker also need to access HDFS?
>
> Submission script
> --------
> python word_count.py --input hdfs://algo-emr/k8s_flink/LICENSE.txt
> --output out --runner=PortableRunner --job_endpoint=localhost:8099
> --environment_type PROCESS --environment_config
> "{\"command\":\"/opt/apache/beam/boot\"}" --hdfs_host 10.53.48.6
> --hdfs_port 4008 --hdfs_user data
>
> Error log
> ---------
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 157, in _execute
>     response = task()
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 190, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 312, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 331, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 554, in process_bundle
>     ].process_encoded(data.data)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 140, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 245, in apache_beam.runners.worker.operations.Operation.output
>     def output(self, windowed_value, output_index=0):
>   File "apache_beam/runners/worker/operations.py", line 246, in apache_beam.runners.worker.operations.Operation.output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>     self.consumer.process(windowed_value)
>   File "apache_beam/runners/worker/operations.py", line 560, in apache_beam.runners.worker.operations.DoOperation.process
>     with self.scoped_process_state:
>   File "apache_beam/runners/worker/operations.py", line 561, in apache_beam.runners.worker.operations.DoOperation.process
>     delayed_application = self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 740, in apache_beam.runners.common.DoFnRunner.receive
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 746, in apache_beam.runners.common.DoFnRunner.process
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 800, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>     raise_with_traceback(new_exn)
>   File "apache_beam/runners/common.py", line 744, in apache_beam.runners.common.DoFnRunner.process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 423, in apache_beam.runners.common.SimpleInvoker.invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 860, in split_source
>     total_size = source.estimate_size()
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 137, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 193, in estimate_size
>     match_result = FileSystems.match([pattern])[0]
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 186, in match
>     filesystem = FileSystems.get_filesystem(patterns[0])
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 98, in get_filesystem
>     return systems[0](pipeline_options=options)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/hadoopfilesystem.py", line 110, in __init__
>     raise ValueError('pipeline_options is not set')
> ValueError: pipeline_options is not set [while running 'read/Read/Split']
>
> On 29 May 2019, at 3:44 PM, Robert Bradshaw <[email protected]> wrote:
>
> Glad you were able to figure it out!
>
> Agree the error message was suboptimal. Was there any indication in
> the logs that the hadoop file system attempted to load but failed?
>
> On Wed, May 29, 2019 at 4:41 AM 青雉(祁明良) <[email protected]> wrote:
>
> Thanks guys, I got it. It was because the Flink taskmanager docker was
> missing the HADOOP_CONF_DIR environment variable.
> Maybe we could improve the error message in the future :)
>
> Best,
> Mingliang
>
> On 29 May 2019, at 3:12 AM, Lukasz Cwik <[email protected]> wrote:
>
> Are you losing the META-INF/ ServiceLoader entries related to binding the
> FileSystem via the FileSystemRegistrar when building the uber jar[1]?
> It does look like the Flink JobServer driver is registering the file
> systems[2].
>
> 1: https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806c859e6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java#L33
> 2: https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
>
> On Tue, May 28, 2019 at 11:39 AM 青雉(祁明良) <[email protected]> wrote:
>
> Yes, I did (2). Since the job server successfully created the artifact
> directory, I think I did it correctly. And somehow this dependency is not
> submitted to the task manager.
> Maybe I can also try out (1), but adding an additional jar to the Flink
> classpath does not sound like a perfect solution.
>
> On Wed, May 29, 2019 at 1:01 AM +0800, "Maximilian Michels" <[email protected]> wrote:
>
> Hi Mingliang,
>
> Oh I see. You will also have to add the Jars to the TaskManager then.
>
> You have these options:
>
> 1. Include them directly in the TaskManager classpath
> 2. Include them as dependencies to the JobServer, which will cause them
>    to be attached to Flink's JobGraph.
>
> Do I understand correctly that you already did (2)?
>
> Cheers,
> Max
>
> On 28.05.19 18:33, 青雉(祁明良) wrote:
>
> Yes Max, I did add these Hadoop jars. The error message from the task
> manager was about the missing HDFS file system class from the
> beam-sdks-java-io-hadoop-file-system module, which I also shadowed into
> the job server.
> I see the artifact directory is successfully created at HDFS by the job
> server, but it fails at the task manager when reading.
>
> Best,
> Mingliang
>
> On Tue, May 28, 2019 at 11:47 PM +0800, "Maximilian Michels" wrote:
>
> Recent versions of Flink do not bundle Hadoop anymore, but they are
> still "Hadoop compatible". You just need to include the Hadoop jars in
> the classpath.
>
> Beam's Hadoop does not bundle Hadoop either, it just provides Beam file
> system abstractions which are similar to Flink "Hadoop compatibility".
>
> You probably want to add this to the job server:
> shadow library.java.hadoop_client
> shadow library.java.hadoop_common
>
> Cheers,
> Max
>
> On 28.05.19 15:41, 青雉(祁明良) wrote:
>
> Thanks Robert, I had one, “qmlmoon”
>
> Looks like I have the jobserver working now. I just added a shadow
> dependency of /beam-sdks-java-io-hadoop-file-system/ to
> /beam-runners-flink_2.11-job-server/ and rebuilt the job server, but the
> Flink taskmanager also complains about the same issue while the job is
> running.
>
> So how is the Flink taskmanager finding this HDFS filesystem dependency?
> -------
> 2019-05-28 13:15:57,695 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - GetManifest for hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> 2019-05-28 13:15:57,696 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Loading manifest for retrieval token hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> 2019-05-28 13:15:57,698 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - GetManifest for hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST failed
> org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>     at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
>     at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>     at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
>     at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
>     at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.getManifest(BeamFileSystemArtifactRetrievalService.java:80)
>     at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:298)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> On 28 May 2019, at 9:31 PM, Robert Bradshaw wrote:
>
> > The easiest would probably be to create a project that depends on both
> > the job server and the hadoop filesystem and then build that as a fat
> > jar.
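The two errors in this thread ("No filesystem found for scheme hdfs" on the JVM side and "pipeline_options is not set" in the Python worker) are both raised when a filesystem is looked up by URL scheme. The following is only a toy model of that lookup, not Beam's actual implementation: `ToyHadoopFileSystem`, `REGISTRY`, and `get_filesystem` are hypothetical names, and the only things taken from the thread are the two error messages and the example path.

```python
from urllib.parse import urlparse


class ToyHadoopFileSystem:
    """Stand-in for an HDFS filesystem class; not the real Beam class."""

    scheme = "hdfs"

    def __init__(self, pipeline_options):
        # Mirrors the failure in the Python traceback: the class is found,
        # but it was constructed without any options (host, port, user).
        if pipeline_options is None:
            raise ValueError("pipeline_options is not set")
        self.options = pipeline_options


# Hypothetical registry: scheme -> filesystem class. In the real JVM case the
# registration depends on the jar and its service entries being on the
# classpath of the process doing the lookup.
REGISTRY = {ToyHadoopFileSystem.scheme: ToyHadoopFileSystem}


def get_filesystem(path, pipeline_options=None, registry=REGISTRY):
    """Pick a filesystem by URL scheme, failing the two ways seen above."""
    scheme = urlparse(path).scheme
    if scheme not in registry:
        # Mirrors the taskmanager failure: nothing registered for "hdfs".
        raise ValueError("No filesystem found for scheme %s" % scheme)
    return registry[scheme](pipeline_options)


if __name__ == "__main__":
    fs = get_filesystem(
        "hdfs://algo-emr/k8s_flink/LICENSE.txt",
        {"hdfs_host": "10.53.48.6", "hdfs_port": 4008, "hdfs_user": "data"},
    )
    print(type(fs).__name__)
```

The point of the sketch is that each process doing a lookup (job server, taskmanager, Python worker) needs both the registration and the options, which is why fixing one process did not fix the others.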
> This communication may contain privileged or other confidential
> information of Red. If you have received it in error, please advise the
> sender by reply e-mail and immediately delete the message and any
> attachments without copying or disclosing the contents. Thank you.
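Since the taskmanager failure in this thread came down to HADOOP_CONF_DIR missing from the container environment, a small preflight check run inside the container might surface it earlier than the "No filesystem found for scheme hdfs" message does. This is a rough sketch only: `check_hadoop_conf_dir` is a hypothetical helper, not part of Beam or Flink, and treating the presence of core-site.xml as the sign of a usable config directory is an assumption.

```python
import os


def check_hadoop_conf_dir(environ=None):
    """Return a list of problems with HADOOP_CONF_DIR; empty if it looks usable.

    Hypothetical helper for illustration only; not part of Beam or Flink.
    """
    environ = os.environ if environ is None else environ
    conf_dir = environ.get("HADOOP_CONF_DIR")
    if not conf_dir:
        return ["HADOOP_CONF_DIR is not set"]
    if not os.path.isdir(conf_dir):
        return ["HADOOP_CONF_DIR is not a directory: %s" % conf_dir]
    # Assumption: a usable Hadoop config directory contains core-site.xml.
    if not os.path.isfile(os.path.join(conf_dir, "core-site.xml")):
        return ["core-site.xml not found in %s" % conf_dir]
    return []


if __name__ == "__main__":
    for problem in check_hadoop_conf_dir():
        print("WARNING: " + problem)
```

Running it as a script inside the taskmanager container prints one warning per problem and nothing when the directory looks fine.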
