Kyle,

Thanks a lot for the pointers. I got interested to run my beam pipeline on 
FlinkRunner and got a local Flink cluster setup, tested a sample code to work 
fine.

I started the Beam job runner going:
docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master 
$IP:8081 --job-host $IP  --job-port 8099

Submitted a beam pipeline, which when run with LocalRunner works totally fine. 
The last stage of the pipeline code looks as follows:
. . .
. . .
. . .
    output= (
        {
            'Mean Open': mean_open,
            'Mean Close': mean_close
        } |
        beam.CoGroupByKey() |
        beam.io.WriteToText(args.output)
    )

So, we are ending the pipeline with a io.WriteToText()

Now, when I supply a filename, whether residing in local disk (/tmp) or network 
mounted disk(e.g /nas2), I get the following error:
python test-beam.py –input data/sp500.csv –output /tmp/result.txt

WARNING:root:Make sure that locally built Python SDK docker image has Python 
3.6 interpreter.
ERROR:root:java.lang.RuntimeException: Error received from SDK harness for 
instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 
1095, in _finalize_write
    writer = sink.open_writer(init_result, str(uuid.uuid4()))
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", 
line 140, in _f
    return fnc(self, *args, **kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 
191, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 
395, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/textio.py", line 
397, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", 
line 140, in _f
    return fnc(self, *args, **kwargs)
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 
134, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", 
line 217, in create
    return filesystem.create(path, mime_type, compression_type)
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", 
line 155, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File 
"/usr/local/lib/python3.6/site-packages/apache_beam/io/localfilesystem.py", 
line 137, in _path_open
    raw_file = open(path, mode)
FileNotFoundError: [Errno 2] No such file or directory: 
'/tmp/beam-temp-result.txt-43eab4947dd811eab6a2002590f97cb6/dbc67656-ad7a-4b8b-97f1-6a223bb7afde.result.txt'


It appears that the filesystem in the client side is not the same as the 
environment that Flink creates to run the Beam pipeline (I think Flink does a 
docker run of the python sdk to run the Beam pipeline? In that case, how would 
the container know where to write the file?)

Please help me debug. The Flink monitoring dashboard shows the several stages 
of the job, Map, Reduce and what not… In the end, the status is FAILED.

-Buvana

From: Kyle Weaver <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, April 13, 2020 at 11:57 AM
To: "[email protected]" <[email protected]>
Subject: Re: SparkRunner on k8s

Hi Buvana,

Running Beam Python on Spark on Kubernetes is more complicated, because Beam 
has its own solution for running Python code [1]. Unfortunately there's no 
guide that I know of for Spark yet, however we do have instructions for Flink 
[2]. Beam's Flink and Spark runners, and I assume GCP's (unofficial) Flink and 
Spark [3] operators, are probably similar enough that it shouldn't be too hard 
to port the YAML from the Flink operator to the Spark operator. I filed an 
issue for it [4], but I probably won't have the bandwidth to work on it myself 
for a while.

- Kyle

[1] https://beam.apache.org/roadmap/portability/
[2] 
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md
[3] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/870

On Sat, Apr 11, 2020 at 4:33 PM Ramanan, Buvana (Nokia - US/Murray Hill) 
<[email protected]<mailto:[email protected]>> 
wrote:
Thank you, Rahul for your very useful response. Can you please extend your 
response by commenting on the procedure for Beam python pipeline?

From: rahul patwari 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Friday, April 10, 2020 at 10:57 PM
To: user <[email protected]<mailto:[email protected]>>
Subject: Re: SparkRunner on k8s

Hi Buvana,

You can submit a Beam Pipeline to Spark on k8s like any other Spark Pipeline 
using the spark-submit script.

Create an Uber Jar of your Beam code and provide it as the primary resource to 
spark-submit. Provide the k8s master and the container image to use as 
arguments to spark-submit.
Refer https://spark.apache.org/docs/latest/running-on-kubernetes.html to know 
more about how to run Spark on k8s.

The Beam pipeline will be translated to a Spark Pipeline using Spark APIs in 
Runtime.

Regards,
Rahul

On Sat, Apr 11, 2020 at 4:38 AM Ramanan, Buvana (Nokia - US/Murray Hill) 
<[email protected]<mailto:[email protected]>> 
wrote:
Hello,

I newly joined this group and I went through the archive to see if any 
discussion exists on submitting Beam pipelines to a SparkRunner on k8s.

I run my Spark jobs on a k8s cluster in the cluster mode. Would like to deploy 
my beam pipeline on a SparkRunner with k8s underneath.

The Beam documentation:
https://beam.apache.org/documentation/runners/spark/
does not discuss about k8s (though there is mention of Mesos and YARN).

Can someone please point me to relevant material in this regard? Or, provide 
the steps for running my beam pipeline in this configuration?

Thank you,
Regards,
Buvana

Reply via email to