The configuration looks good but the HDFS file system implementation is
not intended to be used directly.

Instead of:

> lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))

Use:

>  lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)


Best,
Max

On 28.05.20 06:06, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
> Further to that:
> 
>  
> 
> At the Flink Job/Task Manager end, I configured/setup the following:
> 
> HADOOP_CONF_DIR
> 
> HADOOP_USER_NAME
> 
> hadoop jars copied under $FLINK_HOME/lib
> 
>  
> 
> And made sure a pyflink script is able to read and write into the hdfs
> system.
> 
>  
> 
> Should I setup / configure anything at the Job Server?
> 
>  
> 
> I came across this thread, which helped to some extent, but not completely:
> 
> https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
> 
> 
>  
> 
> *From: *"Ramanan, Buvana (Nokia - US/Murray Hill)"
> <buvana.rama...@nokia-bell-labs.com>
> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
> *Date: *Wednesday, May 27, 2020 at 8:30 PM
> *To: *"user@beam.apache.org" <user@beam.apache.org>
> *Subject: *Flink Runner with HDFS
> 
>  
> 
> Hello,
> 
>  
> 
> I am trying to read from, process and write data to HDFS with beam
> pipelines in python. Using Flink Runner. Beam version 2.19.0. Flink 1.9.2
> 
>  
> 
> My initial code (pasted below my sign) to make sure I can read and
> write, works fine on Local Runner. However, I get the following error
> message (pasted below my sign) at the Flink Job manager when I use
> Portable Runner with Flink Runner I invoke Job server as:
> 
>  
> 
> docker run --net=host apachebeam/flink1.9_job_server:latest
> --flink-master $IP:8081 --job-host $IP  --job-port 8099
> 
>  
> 
> I am supplying the pipeline options in my python code. Yet the error
> message is regarding the missing pipeline_options. And strangely the
> python command at the client side does not return any error message and
> simply terminates.  
> 
>  
> 
> Please let me know how I can fix my code and get this running.
> 
>  
> 
> Thank you,
> 
> Regards,
> 
> Buvana
> 
>  
> 
> ----- Code that reads and writes to hdfs ------
> 
> import apache_beam as beam
> 
> from apache_beam.options.pipeline_options import PipelineOptions
> 
> from apache_beam.options.pipeline_options import HadoopFileSystemOptions
> 
> from apache_beam.io.hadoopfilesystem import HadoopFileSystem
> 
>  
> 
> options = PipelineOptions([
> 
>     "--hdfs_host=XXXXXX.143",
> 
>     "--hdfs_user=hdfs",
> 
>     "--hdfs_port=50070",
> 
>     "--runner=PortableRunner",
> 
>     "--job_endpoint=XXXXXXX.134:8099"
> 
> ])
> 
> p = beam.Pipeline(options=options)
> 
>  
> 
> HDFS_HOSTNAME = XXXXXX.143'
> 
> HDFS_PORT = 50070
> 
> hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
> hdfs_port=HDFS_PORT, hdfs_user='hdfs')
> 
> hdfs_client = HadoopFileSystem(hdfs_client_options)
> 
>  
> 
> input_file_hdfs = "hdfs://user/buvana/manifest.csv"
> 
>  
> 
> lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
> 
> res = lines | "WriteMyFile" >>
> beam.io.WriteToText("hdfs://user/buvana/copy_frfr", ".csv")
> 
> p.run()
> 
>  
> 
> -------------- error message at the Flink Job Manager -------------
> 
> 2020-05-2720:13:10
> 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException:
> Errorreceived fromSDKharness forinstruction 2: Traceback(most recent
> call last):
> 
>   File"apache_beam/runners/common.py", line 883,
> inapache_beam.runners.common.DoFnRunner.process
> 
>   File"apache_beam/runners/common.py", line 667,
> inapache_beam.runners.common.PerWindowInvoker.invoke_process
> 
>   File"apache_beam/runners/common.py", line 748,
> inapache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> 
>  
> File"/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py",
> line 1435, in<lambda>
> 
>     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
> 
>  
> File"/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py",
> line 1001, in<lambda>
> 
>     lambda _, sink: sink.initialize_write(), self.sink)
> 
>  
> File"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
> line 140, in_f
> 
>     returnfnc(self, *args, **kwargs)
> 
>  
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 163, ininitialize_write
> 
>     tmp_dir = self._create_temp_dir(file_path_prefix)
> 
>  
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 168, in_create_temp_dir
> 
>     base_path, last_component = FileSystems.split(file_path_prefix)
> 
>  
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
> line 147, insplit
> 
>     filesystem = FileSystems.get_filesystem(path)
> 
>  
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
> line 110, inget_filesystem
> 
>     returnsystems[0](pipeline_options=options)
> 
>  
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py",
> line 115, in__init__
> 
>     raise ValueError('pipeline_options is not set')
> 
> ValueError: pipeline_options is notset
> 
>  
> 

Reply via email to