Running Beam on Flink

2020-02-06 Thread Xander Song
I am having difficulty following the Python guide for running Beam on Flink.
I created a virtual environment with Apache Beam installed, then I started
up the JobService Docker container with:

docker run --net=host apachebeam/flink1.9_job_server:latest


I receive the following message confirming that the container is running.


[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
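The log reports three services and their ports. One quick way to confirm they are reachable from the terminal that runs the pipeline is a plain TCP probe of each port. This is a minimal sketch using only the Python 3 standard library; the helper names `is_listening` and `probe_services` are hypothetical, and a successful TCP connect only shows that something is listening on the port, not that it is a healthy gRPC server.

```python
import socket

# Ports taken from the JobServerDriver log above.
JOB_SERVER_PORTS = {
    "ArtifactStagingService": 8098,
    "ExpansionService": 8097,
    "JobService": 8099,
}


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (socket.timeout is a subclass) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def probe_services(host="localhost", ports=JOB_SERVER_PORTS, timeout=2.0):
    """Hypothetical helper: map each service name to whether its port accepts connections."""
    return {name: is_listening(host, port, timeout) for name, port in ports.items()}


if __name__ == "__main__":
    for name, ok in probe_services().items():
        print("%-24s %s" % (name, "reachable" if ok else "NOT reachable"))
```

Running it in the same terminal (and virtual environment) as the pipeline script checks connectivity from the client's point of view, which is what matters for `--job_endpoint`.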


In another terminal, I execute a Beam script called
test_beam_local_flink.py based on the example.


from __future__ import print_function

import apache_beam
from apache_beam.options.pipeline_options import PipelineOptions

data = [1, 2, 3]

# Submit to the JobService on port 8099; LOOPBACK runs the SDK worker in this process.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"])

with apache_beam.Pipeline(options=options) as p:
    video_collection = (
        p | apache_beam.Create(data)
          | apache_beam.Map(lambda x: x + 1)
          | apache_beam.Map(lambda x: print(x))
    )

print('Done')

After a wait, I get the following traceback.

/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/__init__.py:84: UserWarning: You are using Apache Beam with Python 2. New releases of Apache Beam will soon support Python 3 only.
  'You are using Apache Beam with Python 2. '
Traceback (most recent call last):
  File "test_beam_local_flink.py", line 18, in
    | apache_beam.Map(lambda x: print(x))
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 220, in run_pipeline
    job_service = self.create_job_service(options)
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 136, in create_job_service
    return server.start()
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/apache_beam/runners/portability/job_server.py", line 59, in start
    grpc.channel_ready_future(channel).result(timeout=self._timeout)
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py", line 140, in result
    self._block(timeout)
  File "/Users/xander/Projects/flink-test/env2/lib/python2.7/site-packages/grpc/_utilities.py", line 86, in _block
    raise grpc.FutureTimeoutError()
grpc.FutureTimeoutError
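The last frames show where the wait happens: the runner opens a gRPC channel to the job endpoint and blocks on `grpc.channel_ready_future(channel).result(timeout=...)`, which raises `grpc.FutureTimeoutError` if the channel never becomes ready in time. The sketch below reproduces that wait-until-deadline pattern with the Python 3 standard library so the endpoint can be tested outside Beam. It is an approximation: it only checks that a TCP connection can be opened, a weaker condition than gRPC channel readiness, and the `wait_for_endpoint` name and its default timeouts are hypothetical.

```python
import socket
import time


def wait_for_endpoint(host, port, timeout=10.0, interval=0.5):
    """Poll host:port until a TCP connect succeeds or the deadline passes.

    Loosely mirrors grpc.channel_ready_future(channel).result(timeout=...):
    returns None on success and raises TimeoutError once the deadline is reached.
    """
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                "%s:%d not reachable within %.1fs" % (host, port, timeout))
        try:
            # Never spend longer on a single attempt than the time that is left.
            with socket.create_connection((host, port),
                                          timeout=min(interval, remaining)):
                return
        except OSError:
            # Refused or timed out: back off briefly, then retry.
            time.sleep(min(interval, max(0.0, deadline - time.monotonic())))
```

For example, `wait_for_endpoint("localhost", 8099, timeout=60)` run from the pipeline's terminal either returns quietly or raises `TimeoutError`, which separates "the endpoint is unreachable from here" from problems inside the pipeline itself.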



Any help is greatly appreciated.


Re: Stability of Timer.withOutputTimestamp

2020-02-06 Thread Steve Niemitz
Cool, thank you. I meant stable as in "my pipeline will produce correct
results"; API changes are fine with me.

I'm also still curious about the second question, regarding validation of
the firing time vs. the output time.

On Wed, Feb 5, 2020 at 11:20 PM Kenneth Knowles  wrote:

> It is definitely too new to be stable in the strict sense of guaranteeing
> no changes at all, not even tiny ones, to the API or runtime compatibility.
>
> However, in my opinion it is so fundamental (and overdue) it will
> certainly exist in some form.
>
> Feel free to use it if you are OK with the possibility of minor
> compile-time adjustments and you do not require Dataflow pipeline update
> compatibility.
>
> Kenn
>
> On Wed, Feb 5, 2020 at 10:31 AM Luke Cwik  wrote:
>
>> +Reuven Lax 
>>
>> On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz  wrote:
>>
>>> Also, as a follow-up, I'm curious about this commit:
>>>
>>> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>>>
>>> My use case is that I want to set a timer to fire after the max
>>> timestamp of a window, but hold the watermark to the max timestamp until it
>>> fires, essentially delaying the window closing by some amount of event
>>> time.  Before that revert commit, it seems like that would have been
>>> possible, but now it would fail (since the target is after the window's
>>> maxTimestamp).
>>>
>>> What was the reason this was reverted, and are there plans to un-revert
>>> it?
>>>
>>> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz 
>>> wrote:
>>>
>>>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I
>>>> didn't see any mention of it in the release notes.
>>>>
>>>> Is this feature considered stable (specifically on Dataflow)?

>>>
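The use case in the quoted message (fire a timer after the window's max timestamp while holding the watermark at that max timestamp) can be made concrete with a small, self-contained model. This is not Beam's API or its actual validation code: `ToyWindow`, `set_timer`, `output_watermark`, and the `reject_after_max` flag are hypothetical, and the only validation rule encoded is the one described in the thread, that a timer target after the window's maxTimestamp is rejected after the revert. The hold follows the general meaning of a watermark hold: the output watermark cannot advance past the earliest outstanding hold.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyWindow:
    """Hypothetical stand-in for a window; timestamps are plain ints (e.g. millis)."""
    max_timestamp: int


def set_timer(window, firing_ts, output_ts, reject_after_max=True):
    """Toy model of setting an event-time timer with an output timestamp.

    reject_after_max=True models the behavior described in the thread after
    the revert: a firing time after window.max_timestamp is rejected.
    reject_after_max=False models the pre-revert behavior the poster relied on.
    Returns output_ts, which this toy treats as a watermark hold.
    """
    if reject_after_max and firing_ts > window.max_timestamp:
        raise ValueError(
            "timer target %d is after window max timestamp %d"
            % (firing_ts, window.max_timestamp))
    return output_ts


def output_watermark(input_watermark, holds):
    """The output watermark cannot advance past the earliest outstanding hold."""
    return min([input_watermark, *holds])
```

With `window = ToyWindow(max_timestamp=999)`, `set_timer(window, 1999, 999, reject_after_max=False)` returns the hold `999`, and `output_watermark(5000, [999])` stays at `999` until that hold is removed from `holds`; with the default `reject_after_max=True`, the same call raises `ValueError`, which corresponds to the failure the poster describes.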