The configuration looks good, but the HDFS file system implementation is
not intended to be used directly.

Instead of:

> lines = p | 'ReadMyFile' >> beam.Create(

Use:

>  lines = p | 'ReadMyFile' >>
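
The replacement line above is cut off in this archive copy, so the following is only a minimal sketch of the general idea, not a reconstruction of it. It assumes the intended approach is Beam's standard text IO transforms (`beam.io.ReadFromText` and `beam.io.WriteToText`), which resolve `hdfs://` paths from the pipeline options instead of a hand-built `HadoopFileSystem` client. The helper names `hdfs_portable_flags` and `run_copy` are hypothetical, and the host and endpoint values are placeholders.

```python
def hdfs_portable_flags(hdfs_host, hdfs_port, hdfs_user, job_endpoint):
    """Build the flag list from the original message (hypothetical helper)."""
    return [
        f"--hdfs_host={hdfs_host}",
        f"--hdfs_port={hdfs_port}",
        f"--hdfs_user={hdfs_user}",
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
    ]


def run_copy(flags, input_path, output_prefix):
    """Copy a text file line by line using Beam's text IO transforms."""
    # Imported inside the function so the flag helper above can be used
    # (and checked) on a machine where Beam is not installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # The HDFS settings travel with the pipeline options; no HadoopFileSystem
    # object is constructed by hand.
    options = PipelineOptions(flags)
    with beam.Pipeline(options=options) as p:
        lines = p | "ReadMyFile" >> beam.io.ReadFromText(input_path)
        # The result is not needed here; the write runs when the pipeline exits.
        lines | "WriteMyFile" >> beam.io.WriteToText(
            output_prefix, file_name_suffix=".csv"
        )
```

With the values from the original message this would be called as `run_copy(hdfs_portable_flags("XXXXXX.143", 50070, "hdfs", "XXXXXXX.134:8099"), "hdfs://user/buvana/manifest.csv", "hdfs://user/buvana/copy_frfr")`. Whether this alone resolves the `pipeline_options is not set` error on the Flink side is not confirmed here.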


On 28.05.20 06:06, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
> Further to that:
> At the Flink Job/Task Manager end, I set up the following:
> Hadoop jars copied under $FLINK_HOME/lib,
> and made sure a pyflink script is able to read and write into the HDFS
> system.
> Should I set up or configure anything at the Job Server?
> I came across this thread, which helped to some extent, but not completely:
> *From: *"Ramanan, Buvana (Nokia - US/Murray Hill)"
> *Date: *Wednesday, May 27, 2020 at 8:30 PM
> *Subject: *Flink Runner with HDFS
> Hello,
> I am trying to read from, process and write data to HDFS with Beam
> pipelines in Python, using the Flink Runner (Beam version 2.19.0, Flink 1.9.2).
> My initial code (pasted below my sign) to make sure I can read and
> write, works fine on Local Runner. However, I get the following error
> message (pasted below my sign) at the Flink Job manager when I use
> Portable Runner with Flink Runner. I invoke the job server as:
> docker run --net=host apachebeam/flink1.9_job_server:latest
> --flink-master $IP:8081 --job-host $IP  --job-port 8099
> I am supplying the pipeline options in my python code. Yet the error
> message is regarding the missing pipeline_options. And strangely the
> python command at the client side does not return any error message and
> simply terminates.  
> Please let me know how I can fix my code and get this running.
> Thank you,
> Regards,
> Buvana
> ----- Code that reads and writes to hdfs ------
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import HadoopFileSystemOptions
> from import HadoopFileSystem
> options = PipelineOptions([
>     "--hdfs_host=XXXXXX.143",
>     "--hdfs_user=hdfs",
>     "--hdfs_port=50070",
>     "--runner=PortableRunner",
>     "--job_endpoint=XXXXXXX.134:8099"
> ])
> p = beam.Pipeline(options=options)
> HDFS_PORT = 50070
> hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
> hdfs_port=HDFS_PORT, hdfs_user='hdfs')
> hdfs_client = HadoopFileSystem(hdfs_client_options)
> input_file_hdfs = "hdfs://user/buvana/manifest.csv"
> lines = p | 'ReadMyFile' >> beam.Create(
> res = lines | "WriteMyFile" >>
>"hdfs://user/buvana/copy_frfr", ".csv")
> -------------- error message at the Flink Job Manager -------------
> 2020-05-27 20:13:10
> java.util.concurrent.ExecutionException: java.lang.RuntimeException:
> Error received from SDK harness for instruction 2: Traceback (most recent
> call last):
>   File "apache_beam/runners/", line 883,
> in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/", line 667,
> in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/", line 748,
> in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/",
> line 1435, in <lambda>
>     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
> File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/",
> line 1001, in <lambda>
>     lambda _, sink: sink.initialize_write(), self.sink)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/options/",
> line 140, in _f
>     return fnc(self, *args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/",
> line 163, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/",
> line 168, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/",
> line 147, in split
>     filesystem = FileSystems.get_filesystem(path)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/",
> line 110, in get_filesystem
>     return systems[0](pipeline_options=options)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/",
> line 115, in __init__
>     raise ValueError('pipeline_options is not set')
> ValueError: pipeline_options is not set
