Hello,
I am trying to read data from HDFS, process it, and write the results back to HDFS
with Beam pipelines in Python, using the Flink Runner (Beam 2.19.0, Flink 1.9.2).
My initial code (pasted below my signature), which just makes sure I can read and
write, works fine on the local runner. However, when I use the Portable Runner with
the Flink Runner, I get the error message pasted below my signature at the Flink
Job Manager.
I start the job server as:
docker run --net=host apachebeam/flink1.9_job_server:latest \
    --flink-master $IP:8081 --job-host $IP --job-port 8099
I am supplying the pipeline options in my Python code, yet the error message
complains that pipeline_options is not set. Strangely, the python command on the
client side does not report any error; it simply terminates.
Please let me know how I can fix my code and get this running.
Thank you,
Regards,
Buvana
----- Code that reads and writes to hdfs ------
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

# Pipeline options: HDFS connection settings plus the portable runner endpoint
options = PipelineOptions([
    "--hdfs_host=XXXXXX.143",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXX.134:8099"
])
p = beam.Pipeline(options=options)

# Separate HDFS client, used on the client side to open the input file
HDFS_HOSTNAME = 'XXXXXX.143'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
                                              hdfs_port=HDFS_PORT,
                                              hdfs_user='hdfs')
hdfs_client = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://user/buvana/manifest.csv"
# beam.Create reads the whole input file here, on the client, when the pipeline is built
lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
res = lines | "WriteMyFile" >> beam.io.WriteToText(
    "hdfs://user/buvana/copy_frfr", ".csv")
p.run()
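As a side note, the HDFS host, port and user appear twice in the code above: once in the PipelineOptions flags and once in HadoopFileSystemOptions. The following is a minimal sketch of keeping them in one place. `build_pipeline_args` is a hypothetical helper of my own, not part of Beam. It only assembles the same flag strings used above.

```python
def build_pipeline_args(hdfs_host, hdfs_port, hdfs_user, job_endpoint):
    """Return command-line style flags for PipelineOptions.

    Hypothetical helper (not a Beam API): builds the HDFS settings and the
    portable-runner settings from a single set of values, so they cannot
    drift apart between the Pipeline and the HDFS client.
    """
    return [
        f"--hdfs_host={hdfs_host}",
        f"--hdfs_port={hdfs_port}",
        f"--hdfs_user={hdfs_user}",
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
    ]
```

With it, the options could be built as `options = PipelineOptions(build_pipeline_args(HDFS_HOSTNAME, HDFS_PORT, 'hdfs', 'XXXXXXX.134:8099'))`. As far as I understand, `HadoopFileSystem` accepts a `PipelineOptions` object and reads the HDFS fields from it, so the same `options` could also be passed to `HadoopFileSystem(options)`. This only removes the duplication and is not meant as a fix for the error below.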
-------------- error message at the Flink Job Manager -------------
2020-05-27 20:13:10
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1435, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1001, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 163, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 168, in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 147, in split
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 110, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 115, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set