Hi Buvana, I suspect this is a bug. Could you try running your pipeline again with these changes?
1. Remove `--spark-master-url spark://YYYYYYYY:7077` from your Docker run command.
2. Add `--environment_type=LOOPBACK` to your pipeline options.

This will help us confirm the cause of the issue.

On Thu, May 28, 2020 at 7:12 PM Ramanan, Buvana (Nokia - US/Murray Hill) <[email protected]> wrote:

> Kyle, Max, All,
>
> I am desperately trying to get Beam working on at least one of the runners, Flink or Spark. I am facing failures in both cases with a similar message.
>
> The Flink runner issue (Beam v2.19.0) was reported yesterday with a permalink:
> https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E
>
> I also came across this related discussion:
> https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
>
> I get a similar error message with the Spark runner as I got with the Flink runner (although it is now the newer version of Beam). I paste my environment details, code, and the error message below. The code runs fine on the Direct Runner.
>
> HADOOP_CONF_DIR is configured appropriately before running the Spark master and slave.
>
> I hope to make some headway soon. Please help – maybe I have to downgrade to a lower version of Beam where this issue did not exist; if so, please let me know the version number.
>
> Thank you,
> Regards,
> Buvana
>
> *Spark Runner scenario:*
>
> Beam version 2.21.0 on both the client and job server ends.
> Docker Spark Job Server:
> https://hub.docker.com/r/apache/beam_spark_job_server
>
> docker run --net=host apache/beam_spark_job_server:latest --job-host XXXXXXX --job-port 8099 --spark-master-url spark://YYYYYYYY:7077
>
> Client code:
>
> options = PipelineOptions([
>     "--hdfs_host=ZZZZZZZZZ",
>     "--hdfs_user=hdfs",
>     "--hdfs_port=50070",
>     "--runner=PortableRunner",
>     "--job_endpoint=XXXXXXXXX:8099"
> ])
> p = beam.Pipeline(options=options)
> input_file_hdfs = "hdfs://user/buvana/manifest"
> lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
> res = lines | "WriteMyFile" >> beam.io.WriteToText("hdfs://user/buvana/copy-manifest", ".csv")
> p.run()
>
> Error message at the Spark Master UI:
>
> File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
>     raise exc.with_traceback(traceback)
> File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
> File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
>     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
> File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
>     lambda _, sink: sink.initialize_write(), self.sink)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
>     return fnc(self, *args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
>     filesystem = FileSystems.get_filesystem(path)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
>     return systems[0](pipeline_options=options)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
>     raise ValueError('pipeline_options is not set')
> ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
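For reference, here is a minimal sketch of what the client-side options would look like with the suggested change applied. The hostnames are the placeholders from your message, and this assumes the job server is restarted without `--spark-master-url`; with `apache_beam` installed, the list would be passed to `PipelineOptions` exactly as in your original code.

```python
# Client-side pipeline options reflecting the two suggested changes:
# the job server is started WITHOUT --spark-master-url, and the client
# adds --environment_type=LOOPBACK so the SDK harness runs inside the
# client process instead of a separate environment.
options_args = [
    "--hdfs_host=ZZZZZZZZZ",      # placeholder from the original message
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXX:8099",
    "--environment_type=LOOPBACK",  # the newly added option
]

# With apache_beam available, these would be used as:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(options_args)
#   p = beam.Pipeline(options=options)
```

If the pipeline succeeds in LOOPBACK mode, that points at the worker environment (rather than the pipeline code) as the source of the `pipeline_options is not set` error.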
