I am having difficulty following the Python guide for running Beam on Flink
<https://beam.apache.org/documentation/runners/flink/>. I created a virtual
environment with Apache Beam installed, then I started up the JobService
Docker container with

docker run --net=host apachebeam/flink1.9_job_server:latest


I receive the following message confirming that the container is running.


[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
ArtifactStagingService started on localhost:8098

[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java
ExpansionService started on localhost:8097

[main] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver -
JobService started on localhost:8099


In another terminal, I execute a Beam script called
test_beam_local_flink.py based on the example.


from __future__ import print_function
import apache_beamfrom apache_beam.options.pipeline_options import
PipelineOptions

data = [1,2,3]

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"])
with apache_beam.Pipeline(options=options) as p:
  video_collection = (
    p | apache_beam.Create(data)
      | apache_beam.Map(lambda x: x + 1)
      | apache_beam.Map(lambda x: print(x))
  )
print('Done')

After a wait, I get the following traceback.

/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84:
UserWarning: You are using Apache Beam with Python 2. New releases of
Apache Beam will soon support Python 3 only.

  'You are using Apache Beam with Python 2. '

Traceback (most recent call last):

  File "test_beam_local_flink.py", line 18, in <module>

    | apache_beam.Map(lambda x: print(x))

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 481, in __exit__

    self.run().wait_until_finish()

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 461, in run

    self._options).run(False)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 474, in run

    return self.runner.run_pipeline(self, self._options)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 220, in run_pipeline

    job_service = self.create_job_service(options)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 136, in create_job_service

    return server.start()

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py",
line 59, in start

    grpc.channel_ready_future(channel).result(timeout=self._timeout)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
line 140, in result

    self._block(timeout)

  File 
"/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py",
line 86, in _block

    raise grpc.FutureTimeoutError()

grpc.FutureTimeoutError



Any help is greatly appreciated.

Reply via email to