Re: [UPDATE] Beam 2.16.0 Release Tracking

2019-09-13 Thread Mark Liu
New nightly snapshot was built and published to
http://repository.apache.org/content/groups/snapshots/org/apache/beam/.

I'm working on release branch validation and blocker cherry-picks. The
blockers are listed in JIRA.

On Wed, Sep 11, 2019 at 2:39 PM Mark Liu  wrote:

> Hi all,
>
> The release-2.16.0 branch is cut at
> commit 8f919849766269b504dd17439f698ab97c6d89ad.
> The next step would be building snapshots and verifying the release branch.
>
> Regards,
> Mark
>


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-13 Thread Xinyu Liu
Hi, Etienne,

The slides are very informative! Thanks for sharing the details about how
the Beam API is mapped onto Spark Structured Streaming. We (LinkedIn) are
also interested in trying the new SparkRunner to run Beam pipelines in batch,
and in contributing to it too. From my understanding, it seems the
functionality on the batch side is mostly complete and covers quite a large
percentage of the tests (with a few missing pieces like state and timers in
ParDo, and SDF). If so, is it possible to merge the new runner into master
sooner, so it's much easier for us to pull it in (we have an internal fork)
and contribute back?

Also curious about the schema part in the runner. It seems we can leverage
the schema-aware work in PCollection and translate from the Beam schema to
Spark's, so it can be optimized in the planner layer. It would be great to
hear about your plans for that.
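For readers following along, a minimal Python sketch of what a schema-aware
PCollection looks like on the SDK side, using the (newer) RowCoder pattern
from the Beam programming guide; the WordCount type and the pipeline itself
are hypothetical:

import typing

import apache_beam as beam

# Hypothetical schema'd element type. Registering RowCoder makes the
# PCollection schema-aware, which is the property a runner would need in
# order to hand field information to a planner layer such as Spark's.
class WordCount(typing.NamedTuple):
    word: str
    count: int

beam.coders.registry.register_coder(WordCount, beam.coders.RowCoder)

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([("a", 1), ("b", 2)])
         | beam.Map(lambda kv: WordCount(word=kv[0], count=kv[1]))
             .with_output_types(WordCount))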

Congrats on this great work!
Thanks,
Xinyu

On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:

> Hello Etienne,
>
> Your slides mentioned that streaming mode development is blocked because
> Spark lacks support for multiple aggregations in its streaming mode, but a
> design is ongoing. Do you have a link or anything else to their design
> discussion/doc?
>
>
> -Rui
>
> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot 
> wrote:
>
>> Hi Rahul,
>> Sure, and great! Thanks for proposing!
>> If you want details, here is the presentation I gave 30 minutes ago at
>> ApacheCon. You will find the video on YouTube shortly, but in the meantime,
>> here are my presentation slides.
>>
>> And here is the structured streaming branch. I'll be happy to review your
>> PRs, thanks!
>>
>> 
>> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>>
>> Best
>> Etienne
>>
>> On Wednesday, September 11, 2019 at 16:37 +0530, rahul patwari wrote:
>>
>> Hi Etienne,
>>
>> I came to know about the work going on in the Structured Streaming Spark
>> Runner from the Apache Beam Wiki - Works in Progress.
>> I have contributed to BeamSql earlier, and I am working on supporting
>> PCollectionView in BeamSql.
>>
>> I would love to understand the Runner's side of Apache Beam and
>> contribute to the Structured Streaming Spark Runner.
>>
>> Can you please point me in the right direction?
>>
>> Thanks,
>> Rahul
>>
>>


Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Kyle Weaver
> Is it one of the best guarded secrets? ;-)
Apparently so!

Filed a few related JIRAs and assigned them to myself.
[1] https://issues.apache.org/jira/browse/BEAM-8214
[2] https://issues.apache.org/jira/browse/BEAM-8232
[3] https://issues.apache.org/jira/browse/BEAM-8233

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Fri, Sep 13, 2019 at 9:57 AM Robert Bradshaw  wrote:

> Note that loopback won't fix the problem for, say, cross-language IOs.
> But, yes, it's really handy and should probably be used more.
>
> On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik  wrote:
>
>> And/or update the wiki/website with some how to's...
>>
>> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise  wrote:
>>
>>> I agree that loopback would be preferable for this purpose. I just
>>> wasn't aware this even works with the portable Flink runner. Is it one of
>>> the best guarded secrets? ;-)
>>>
>>> Kyle, can you please post the pipeline options you would use for Flink?
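A minimal sketch of the options in question, assuming a Flink job server
already running at localhost:8099 (the endpoint is an assumption); with
LOOPBACK the SDK harness runs inside the submitting process, so no Docker
image is needed:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: point the PortableRunner at a local Flink job server and
# use the loopback environment instead of Docker.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(["a", "b", "c"]) | beam.Map(print)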
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver  wrote:
>>>
 I prefer loopback because a) it writes output files to the local
 filesystem, as the user expects, and b) you don't have to pull or build
 docker images, or even have docker installed on your system -- which is one
 less point of failure.

 Kyle Weaver | Software Engineer | github.com/ibzib |
 kcwea...@google.com


 On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:

> This should become much better with 2.16 when we have the Docker
> images prebuilt.
>
> Docker is probably still the best option for Python on a JVM based
> runner in a local environment that does not have a development setup.
>
>
> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver 
> wrote:
>
>> +dev  I think we should probably point new
>> users of the portable Flink/Spark runners to use loopback or some other
>> non-docker environment, as Docker adds some operational complexity that
>> isn't really needed to run a word count example. For example, Yu's 
>> pipeline
>> errored here because the expected Docker container wasn't built before
>> running.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib |
>> kcwea...@google.com
>>
>>
>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
>> wrote:
>>
>>> On this note, making local files easy to read is something we'd
>>> definitely like to improve, as the current behavior is quite surprising.
>>> This could be useful not just for running with docker and the portable
>>> runner locally, but more generally when running on a distributed system
>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient 
>>> if we
>>> could automatically stage local files to be read as artifacts that 
>>> could be
>>> consumed by any worker (possibly via external directory mounting in the
>>> local docker case rather than an actual copy), and conversely copy small
>>> outputs back to the local machine (with the similar optimization for 
>>> local
>>> docker).
>>>
>>> At the very least, however, obvious messaging should be added when the
>>> local filesystem is used from within docker, which is often a
>>> (non-obvious and hard to debug) mistake.
>>>
>>>
>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik 
>>> wrote:
>>>
 When you use a local filesystem path and a docker environment,
 "/tmp" is written inside the container. You can solve this issue by:
 * Using a "remote" filesystem such as HDFS/S3/GCS/...
 * Mounting an external directory into the container so that any
 "local" writes appear outside the container
 * Using a non-docker environment such as external or process.
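A minimal sketch of the first option (the GCS bucket name here is
hypothetical; any HDFS/S3/GCS path behaves the same way, given the matching
filesystem dependencies and credentials):

import apache_beam as beam
from apache_beam.io import WriteToText

with beam.Pipeline() as p:
    (p
     | beam.Create(["a", "b", "c"])
     # Writing to a remote path keeps the output reachable even when the
     # SDK harness runs inside a Docker container.
     | WriteToText("gs://my-bucket/py-wordcount/output"))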

 On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
 wrote:

> Hello.
>
> I would like to ask for help with my sample code using the portable
> runner with Apache Flink.
> I was able to work out wordcount.py using this page:
>
> https://beam.apache.org/roadmap/portability/
>
> I got the two files below under /tmp.
>
> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56
> py-wordcount-direct-1-of-2
> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56
> py-wordcount-direct-0-of-2
>
> Then I wrote the sample code with the below steps.
>
> 1. Installed apache_beam using pip3, separate from the source code
> directory.
> 2. Wrote the sample code as below and named it
> "test-portable-runner.py". Placed it in a separate directory from the
> source code.
>
> ---
> (python) ywatanabe@debian-09-00:~$ ls -ltr
> total 16
> drwxr-xr-x 18 

Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Robert Bradshaw
Note that loopback won't fix the problem for, say, cross-language IOs. But,
yes, it's really handy and should probably be used more.

On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik  wrote:

> And/or update the wiki/website with some how to's...
>
> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise  wrote:
>
>> I agree that loopback would be preferable for this purpose. I just wasn't
>> aware this even works with the portable Flink runner. Is it one of the best
>> guarded secrets? ;-)
>>
>> Kyle, can you please post the pipeline options you would use for Flink?
>>
>>
>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver  wrote:
>>
>>> I prefer loopback because a) it writes output files to the local
>>> filesystem, as the user expects, and b) you don't have to pull or build
>>> docker images, or even have docker installed on your system -- which is one
>>> less point of failure.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:
>>>
 This should become much better with 2.16 when we have the Docker images
 prebuilt.

 Docker is probably still the best option for Python on a JVM based
 runner in a local environment that does not have a development setup.


 On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver 
 wrote:

> +dev  I think we should probably point new users
> of the portable Flink/Spark runners to use loopback or some other
> non-docker environment, as Docker adds some operational complexity that
> isn't really needed to run a word count example. For example, Yu's 
> pipeline
> errored here because the expected Docker container wasn't built before
> running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib |
> kcwea...@google.com
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if 
>> we
>> could automatically stage local files to be read as artifacts that could 
>> be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with the similar optimization for 
>> local
>> docker).
>>
>> At the very least, however, obvious messaging should be added when the
>> local filesystem is used from within docker, which is often a
>> (non-obvious and hard to debug) mistake.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik 
>> wrote:
>>
>>> When you use a local filesystem path and a docker environment,
>>> "/tmp" is written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any
>>> "local" writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>>> wrote:
>>>
 Hello.

 I would like to ask for help with my sample code using the portable
 runner with Apache Flink.
 I was able to work out wordcount.py using this page:

 https://beam.apache.org/roadmap/portability/

 I got the two files below under /tmp.

 -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56
 py-wordcount-direct-1-of-2
 -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56
 py-wordcount-direct-0-of-2

 Then I wrote the sample code with the below steps.

 1. Installed apache_beam using pip3, separate from the source code
 directory.
 2. Wrote the sample code as below and named it
 "test-portable-runner.py". Placed it in a separate directory from the
 source code.

 ---
 (python) ywatanabe@debian-09-00:~$ ls -ltr
 total 16
 drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
 code directory)
 -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
 test-portable-runner.py

 ---
 3. Executed the code with "python3 test-portable-runner.py"


 ==
 #!/usr/bin/env python3

 import 

[PROPOSAL] Async ParDo API for Apache Beam

2019-09-13 Thread Bharath Kumara Subramanian
Hi all,

I have put up a proposal to add an async ParDo API.

For context, please refer to the archive conversation:
https://www.mail-archive.com/dev@beam.apache.org/msg12101.html

Thanks,
Bharath


Re: [DISCUSSION] ParDo Async Java API

2019-09-13 Thread Bharath Kumara Subramanian
Let me start a separate proposal thread and link this conversation.
Sorry about that.

On Fri, Sep 13, 2019 at 9:31 AM Bharath Kumara Subramanian <
codin.mart...@gmail.com> wrote:

> I have put together a design document that consolidates our discussion in
> this thread.
> Please let me know your thoughts.
>
> Thanks,
> Bharath
>
>
>
> On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu  wrote:
>
>> I put the asks and email discussions in this JIRA to track the Async API:
>> https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the
>> SamzaRunner side is willing to take a stab at this. He will come up with
>> some design doc based on our discussions. Will update the thread once it's
>> ready. Really appreciate all the suggestions and feedback here.
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw 
>> wrote:
>>
>>> That's a good point that this "IO" time should be tracked differently.
>>>
>>> For a single level, a wrapper/utility that correctly and completely
>>> (and transparently) implements the "naive" bit I sketched above under
>>> the hood may be sufficient and implementable purely in user-space, and
>>> quite useful.
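A user-space sketch of that naive wrapper, written in Python for
illustration (the thread itself is about a Java API); fn stands in for any
caller-supplied blocking call:

import concurrent.futures

import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue

class NaiveAsyncDoFn(beam.DoFn):
    """Fans each element out to a thread pool in process() and drains every
    pending future in finish_bundle(), re-attaching timestamps/windows."""

    def __init__(self, fn, max_workers=16):
        self._fn = fn  # the blocking per-element call to run asynchronously
        self._max_workers = max_workers

    def start_bundle(self):
        # A pool per bundle keeps the sketch simple; a real utility would
        # likely share one pool per DoFn instance.
        self._pool = concurrent.futures.ThreadPoolExecutor(self._max_workers)
        self._pending = []

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam):
        # Submit without blocking the bundle thread; remember where the
        # input lived so the output lands in the same window.
        self._pending.append(
            (self._pool.submit(self._fn, element), timestamp, window))

    def finish_bundle(self):
        for future, timestamp, window in self._pending:
            yield WindowedValue(future.result(), timestamp, [window])
        self._pool.shutdown()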
>>>
>>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner  wrote:
>>> >
>>> > Makes sense to me. We should make it easier to write DoFn's in this
>>> pattern that has emerged as common among I/O connectors.
>>> >
>>> > Enabling asynchronous task chaining across a fusion tree is more
>>> complicated but not necessary for this scenario.
>>> >
>>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz 
>>> wrote:
>>> >>
>>> >> It's also important to note that in many (most?) IO frameworks (gRPC,
>>> finagle, etc), asynchronous IO is typically completely non-blocking, so
>>> there generally won't be a large number of threads waiting for IO to
>>> complete.  (netty uses a small pool of threads for the Event Loop Group for
>>> example).
>>> >>
>>> >> But in general I agree with Reuven, runners should not count threads
>>> in use in other thread pools for IO for the purpose of autoscaling (or most
>>> kinds of accounting).
>>> >>
>>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax  wrote:
>>> >>>
>>> >>> As Steve said, the main rationale for this is so that asynchronous
>>> IOs (or in general, asynchronous remote calls) can be made. To some degree
>>> this addresses Scott's concern: the asynchronous threads should be, for the
>>> most part, simply waiting for IOs to complete; the reason to do the waiting
>>> asynchronously is so that the main threadpool does not become blocked,
>>> causing the pipeline to become IO bound. A runner like Dataflow should not
>>> be tracking these threads for the purpose of autoscaling, as adding more
>>> workers will (usually) not cause these calls to complete any faster.
>>> >>>
>>> >>> Reuven
>>> >>>
>>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz 
>>> wrote:
>>> 
>>>  I think I agree with a lot of what you said here, I'm just going to
>>> restate my initial use-case to try to make it more clear as well.
>>> 
>>>  From my usage of beam, I feel like the big benefit of async DoFns
>>> would be to allow batched IO to be implemented more simply inside a DoFn.
>>> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
>>> operations in ProcessElement and wait for them to complete in FinishBundle
>>> ([1][2], etc).  From my experience, things like error handling, emitting
>>> outputs as the result of an asynchronous operation completing (in the
>>> correct window, with the correct timestamp, etc) get pretty tricky, and it
>>> would be great for the SDK to provide support natively for it.
>>> 
>>>  It's also probably good to point out that really only DoFns that do
>>> IO should be asynchronous, normal CPU bound DoFns have no reason to be
>>> asynchronous.
>>> 
>>>  A really good example of this is an IO I had written recently for
>>> Bigtable, it takes an input PCollection of ByteStrings representing row
>>> keys, and returns a PCollection of the row data from bigtable.  Naively
>>> this could be implemented by simply blocking on the Bigtable read inside
>>> the ParDo, however this would limit throughput substantially (even assuming
>>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
>>> ParDo).  My implementation batches many reads together (as they arrive at
>>> the DoFn), executes them once the batch is big enough (or some time
>>> passes), and then emits them once the batch read completes.  Emitting them
>>> in the correct window and handling errors gets tricky, so this is certainly
>>> something I'd love the framework itself to handle.
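A condensed Python sketch of that batch-and-flush shape; lookup_rows is a
hypothetical stand-in for one batched backend call (e.g. a multi-row
Bigtable read), and error handling is omitted:

import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue

def lookup_rows(keys):
    # Hypothetical batched backend read; returns one result per key.
    return [("row-data-for", k) for k in keys]

class BatchingReadFn(beam.DoFn):
    """Buffers elements in process() and issues one batched read per flush,
    emitting results with their original timestamps and windows."""

    MAX_BATCH = 100

    def start_bundle(self):
        self._batch = []

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam):
        self._batch.append((element, timestamp, window))
        if len(self._batch) >= self.MAX_BATCH:
            yield from self._flush()

    def finish_bundle(self):
        yield from self._flush()

    def _flush(self):
        if not self._batch:
            return
        results = lookup_rows([e for e, _, _ in self._batch])
        for (element, timestamp, window), row in zip(self._batch, results):
            # Re-attach the original timestamp/window so outputs land
            # where the inputs did.
            yield WindowedValue(row, timestamp, [window])
        self._batch = []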
>>> 
>>>  I also don't see a big benefit of making a DoFn receive a future,
>>> if all a user is ever supposed to do is attach a continuation to it, that
>>> could just as easily be done by the runner itself, basically just invoking
>>> 

Re: [DISCUSSION] ParDo Async Java API

2019-09-13 Thread Bharath Kumara Subramanian
I have put together a design document that consolidates our discussion in
this thread.
Please let me know your thoughts.

Thanks,
Bharath



On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu  wrote:

> I put the asks and email discussions in this JIRA to track the Async API:
> https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the SamzaRunner
> side is willing to take a stab at this. He will come up with some design
> doc based on our discussions. Will update the thread once it's ready.
> Really appreciate all the suggestions and feedback here.
>
> Thanks,
> Xinyu
>
> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw 
> wrote:
>
>> That's a good point that this "IO" time should be tracked differently.
>>
>> For a single level, a wrapper/utility that correctly and completely
>> (and transparently) implements the "naive" bit I sketched above under
>> the hood may be sufficient and implementable purely in user-space, and
>> quite useful.
>>
>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner  wrote:
>> >
>> > Makes sense to me. We should make it easier to write DoFn's in this
>> pattern that has emerged as common among I/O connectors.
>> >
>> > Enabling asynchronous task chaining across a fusion tree is more
>> complicated but not necessary for this scenario.
>> >
>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz 
>> wrote:
>> >>
>> >> It's also important to note that in many (most?) IO frameworks (gRPC,
>> finagle, etc), asynchronous IO is typically completely non-blocking, so
>> there generally won't be a large number of threads waiting for IO to
>> complete.  (netty uses a small pool of threads for the Event Loop Group for
>> example).
>> >>
>> >> But in general I agree with Reuven, runners should not count threads
>> in use in other thread pools for IO for the purpose of autoscaling (or most
>> kinds of accounting).
>> >>
>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax  wrote:
>> >>>
>> >>> As Steve said, the main rationale for this is so that asynchronous
>> IOs (or in general, asynchronous remote calls) can be made. To some degree
>> this addresses Scott's concern: the asynchronous threads should be, for the
>> most part, simply waiting for IOs to complete; the reason to do the waiting
>> asynchronously is so that the main threadpool does not become blocked,
>> causing the pipeline to become IO bound. A runner like Dataflow should not
>> be tracking these threads for the purpose of autoscaling, as adding more
>> workers will (usually) not cause these calls to complete any faster.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz 
>> wrote:
>> 
>>  I think I agree with a lot of what you said here, I'm just going to
>> restate my initial use-case to try to make it more clear as well.
>> 
>>  From my usage of beam, I feel like the big benefit of async DoFns
>> would be to allow batched IO to be implemented more simply inside a DoFn.
>> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
>> operations in ProcessElement and wait for them to complete in FinishBundle
>> ([1][2], etc).  From my experience, things like error handling, emitting
>> outputs as the result of an asynchronous operation completing (in the
>> correct window, with the correct timestamp, etc) get pretty tricky, and it
>> would be great for the SDK to provide support natively for it.
>> 
>>  It's also probably good to point out that really only DoFns that do
>> IO should be asynchronous, normal CPU bound DoFns have no reason to be
>> asynchronous.
>> 
>>  A really good example of this is an IO I had written recently for
>> Bigtable, it takes an input PCollection of ByteStrings representing row
>> keys, and returns a PCollection of the row data from bigtable.  Naively
>> this could be implemented by simply blocking on the Bigtable read inside
>> the ParDo, however this would limit throughput substantially (even assuming
>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
>> ParDo).  My implementation batches many reads together (as they arrive at
>> the DoFn), executes them once the batch is big enough (or some time
>> passes), and then emits them once the batch read completes.  Emitting them
>> in the correct window and handling errors gets tricky, so this is certainly
>> something I'd love the framework itself to handle.
>> 
>>  I also don't see a big benefit of making a DoFn receive a future, if
>> all a user is ever supposed to do is attach a continuation to it, that
>> could just as easily be done by the runner itself, basically just invoking
>> the entire ParDo as a continuation on the future (which then assumes the
>> runner is even representing these tasks as futures internally).
>> 
>>  Making the DoFn itself actually return a future could be an option,
>> even if the language itself doesn't support something like 

Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Lukasz Cwik
And/or update the wiki/website with some how to's...

On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise  wrote:

> I agree that loopback would be preferable for this purpose. I just wasn't
> aware this even works with the portable Flink runner. Is it one of the best
> guarded secrets? ;-)
>
> Kyle, can you please post the pipeline options you would use for Flink?
>
>
> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver  wrote:
>
>> I prefer loopback because a) it writes output files to the local
>> filesystem, as the user expects, and b) you don't have to pull or build
>> docker images, or even have docker installed on your system -- which is one
>> less point of failure.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:
>>
>>> This should become much better with 2.16 when we have the Docker images
>>> prebuilt.
>>>
>>> Docker is probably still the best option for Python on a JVM based
>>> runner in a local environment that does not have a development setup.
>>>
>>>
>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver  wrote:
>>>
 +dev  I think we should probably point new users
 of the portable Flink/Spark runners to use loopback or some other
 non-docker environment, as Docker adds some operational complexity that
 isn't really needed to run a word count example. For example, Yu's pipeline
 errored here because the expected Docker container wasn't built before
 running.

 Kyle Weaver | Software Engineer | github.com/ibzib |
 kcwea...@google.com


 On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
 wrote:

> On this note, making local files easy to read is something we'd
> definitely like to improve, as the current behavior is quite surprising.
> This could be useful not just for running with docker and the portable
> runner locally, but more generally when running on a distributed system
> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if 
> we
> could automatically stage local files to be read as artifacts that could 
> be
> consumed by any worker (possibly via external directory mounting in the
> local docker case rather than an actual copy), and conversely copy small
> outputs back to the local machine (with the similar optimization for local
> docker).
>
> At the very least, however, obvious messaging should be added when the
> local filesystem is used from within docker, which is often a
> (non-obvious and hard to debug) mistake.
>
>
> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik  wrote:
>
>> When you use a local filesystem path and a docker environment, "/tmp"
>> is written inside the container. You can solve this issue by:
>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>> * Mounting an external directory into the container so that any
>> "local" writes appear outside the container
>> * Using a non-docker environment such as external or process.
>>
>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>> wrote:
>>
>>> Hello.
>>>
>>> I would like to ask for help with my sample code using the portable
>>> runner with Apache Flink.
>>> I was able to work out wordcount.py using this page:
>>>
>>> https://beam.apache.org/roadmap/portability/
>>>
>>> I got the two files below under /tmp.
>>>
>>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56
>>> py-wordcount-direct-1-of-2
>>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56
>>> py-wordcount-direct-0-of-2
>>>
>>> Then I wrote the sample code with the below steps.
>>>
>>> 1. Installed apache_beam using pip3, separate from the source code
>>> directory.
>>> 2. Wrote the sample code as below and named it
>>> "test-portable-runner.py". Placed it in a separate directory from the
>>> source code.
>>>
>>> ---
>>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>>> total 16
>>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
>>> code directory)
>>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>>> test-portable-runner.py
>>>
>>> ---
>>> 3. Executed the code with "python3 test-portable-runner.py"
>>>
>>>
>>> ==
>>> #!/usr/bin/env python3
>>>
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.io import WriteToText
>>>
>>>
>>> def printMsg(line):
>>>
>>> print("OUTPUT: {0}".format(line))
>>>
>>> return line
>>>
>>> options = PipelineOptions(["--runner=PortableRunner",
>>> 

Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Thomas Weise
I agree that loopback would be preferable for this purpose. I just wasn't
aware this even works with the portable Flink runner. Is it one of the best
guarded secrets? ;-)

Kyle, can you please post the pipeline options you would use for Flink?


On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver  wrote:

> I prefer loopback because a) it writes output files to the local
> filesystem, as the user expects, and b) you don't have to pull or build
> docker images, or even have docker installed on your system -- which is one
> less point of failure.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:
>
>> This should become much better with 2.16 when we have the Docker images
>> prebuilt.
>>
>> Docker is probably still the best option for Python on a JVM based runner
>> in a local environment that does not have a development setup.
>>
>>
>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver  wrote:
>>
>>> +dev  I think we should probably point new users
>>> of the portable Flink/Spark runners to use loopback or some other
>>> non-docker environment, as Docker adds some operational complexity that
>>> isn't really needed to run a word count example. For example, Yu's pipeline
>>> errored here because the expected Docker container wasn't built before
>>> running.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
>>> wrote:
>>>
 On this note, making local files easy to read is something we'd
 definitely like to improve, as the current behavior is quite surprising.
 This could be useful not just for running with docker and the portable
 runner locally, but more generally when running on a distributed system
 (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
 could automatically stage local files to be read as artifacts that could be
 consumed by any worker (possibly via external directory mounting in the
 local docker case rather than an actual copy), and conversely copy small
 outputs back to the local machine (with the similar optimization for local
 docker).

 At the very least, however, obvious messaging should be added when the
 local filesystem is used from within docker, which is often a (non-obvious
 and hard to debug) mistake.


 On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik  wrote:

> When you use a local filesystem path and a docker environment, "/tmp"
> is written inside the container. You can solve this issue by:
> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any
> "local" writes appear outside the container
> * Using a non-docker environment such as external or process.
>
> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
> wrote:
>
>> Hello.
>>
>> I would like to ask for help with my sample code using the portable
>> runner with Apache Flink.
>> I was able to work out wordcount.py using this page:
>>
>> https://beam.apache.org/roadmap/portability/
>>
>> I got the two files below under /tmp.
>>
>> -rw-r--r-- 1 ywatanabe ywatanabe 185 Sep 12 19:56
>> py-wordcount-direct-1-of-2
>> -rw-r--r-- 1 ywatanabe ywatanabe 190 Sep 12 19:56
>> py-wordcount-direct-0-of-2
>>
>> Then I wrote the sample code with the below steps.
>>
>> 1. Installed apache_beam using pip3, separate from the source code
>> directory.
>> 2. Wrote the sample code as below and named it
>> "test-portable-runner.py". Placed it in a separate directory from the
>> source code.
>>
>> ---
>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>> total 16
>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source
>> code directory)
>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>> test-portable-runner.py
>>
>> ---
>> 3. Executed the code with "python3 test-portable-runner.py"
>>
>>
>> ==
>> #!/usr/bin/env python3
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io import WriteToText
>>
>>
>> def printMsg(line):
>>
>> print("OUTPUT: {0}".format(line))
>>
>> return line
>>
>> options = PipelineOptions(["--runner=PortableRunner",
>> "--job_endpoint=localhost:8099", 
>> "--shutdown_sources_on_final_watermark"])
>>
>> p = beam.Pipeline(options=options)
>>
>> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>>  | beam.Map(printMsg)
>>