Hi Buvana,

I suspect this is a bug. If you can try running your pipeline again with
these changes:

1. Remove `--spark-master-url spark://YYYYYYYY:7077` from your Docker run
command.
2. Add `--environment_type=LOOPBACK` to your pipeline options.

It will help us confirm the cause of the issue.

On Thu, May 28, 2020 at 7:12 PM Ramanan, Buvana (Nokia - US/Murray Hill) <
[email protected]> wrote:

> Kyle, Max, All,
>
>
>
> I am desperately trying to get Beam working on at least one of the runners
> of Flink or Spark. Facing failures in both cases with similar message.
>
>
>
> Flink runner issue (Beam v 2.19.0) was reported yesterday with a
> permalink:
> https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E
>
>
>
> Also came across this related discussion:
>
>
> https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
>
>
>
> I get a similar error message with Spark Runner as I got with the Flink
> Runner (although its now the newer version of Beam). I paste my environment
> details, code and the error message below. Code runs fine on Direct Runner.
>
>
>
> HADOOP_CONF_DIR is configured aptly before running Spark Master and Slave.
>
>
>
> I hope to make some headway soon. Please help – may be I have to downgrade
> to a lower version of Beam where this issue did not exist; if so, plmk the
> version #
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>
>
> *Spark Runner scenario:*
>
>
>
> Beam version 2.21.0 on both the client end and the Job server ends.
>
>
>
> Docker Spark Job Server:
>
> https://hub.docker.com/r/apache/beam_spark_job_server
>
> docker run --net=host apache/beam_spark_job_server:latest --job-host
> XXXXXXX --job-port 8099 --spark-master-url spark://YYYYYYYY:7077
>
>
>
> Client code:
>
>
>
> options = PipelineOptions([
>
>     "--hdfs_host=ZZZZZZZZZ",
>
>     "--hdfs_user=hdfs",
>
>     "--hdfs_port=50070",
>
>     "--runner=PortableRunner",
>
>     "--job_endpoint=XXXXXXXXX:8099"
>
> ])
>
> p = beam.Pipeline(options=options)
>
> input_file_hdfs = "hdfs://user/buvana/manifest"
>
> lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
>
> res = lines | "WriteMyFile" >>
> beam.io.WriteToText("hdfs://user/buvana/copy-manifest", ".csv")
>
> p.run()
>
>
>
> Error message at the Spark Master UI:
>
>
>
> worker/operations.py", line 195, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>
>   File "apache_beam/runners/worker/operations.py", line 670, in
> apache_beam.runners.worker.operations.DoOperation.process
>
>   File "apache_beam/runners/worker/operations.py", line 671, in
> apache_beam.runners.worker.operations.DoOperation.process
>
>   File "apache_beam/runners/common.py", line 963, in
> apache_beam.runners.common.DoFnRunner.process
>
>   File "apache_beam/runners/common.py", line 1045, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>
>   File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py",
> line 421, in raise_with_traceback
>
>     raise exc.with_traceback(traceback)
>
>   File "apache_beam/runners/common.py", line 961, in
> apache_beam.runners.common.DoFnRunner.process
>
>   File "apache_beam/runners/common.py", line 726, in
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>
>   File "apache_beam/runners/common.py", line 814, in
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>
>   File
> "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py",
> line 1501, in <lambda>
>
>     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
>
>   File
> "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py",
> line 1005, in <lambda>
>
>     lambda _, sink: sink.initialize_write(), self.sink)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
> line 135, in _f
>
>     return fnc(self, *args, **kwargs)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 167, in initialize_write
>
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
> line 172, in _create_temp_dir
>
>     base_path, last_component = FileSystems.split(file_path_prefix)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
> line 151, in split
>
>     filesystem = FileSystems.get_filesystem(path)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
> line 113, in get_filesystem
>
>     return systems[0](pipeline_options=options)
>
>   File
> "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py",
> line 114, in __init__
>
>     raise ValueError('pipeline_options is not set')
>
> ValueError: pipeline_options is not set [while running
> 'WriteMyFile/Write/WriteImpl/InitializeWrite']
>

Reply via email to