Re: The state of external transforms in Beam

2019-11-08 Thread Chamikara Jayalath
Sent https://github.com/apache/beam/pull/10054 to update the roadmap.

Thanks,
Cham


Re: The state of external transforms in Beam

2019-11-04 Thread Chamikara Jayalath
Makes sense.

I can look into expanding on what we have at the following location and adding
links to some of the existing work as a first step.
https://beam.apache.org/roadmap/connectors-multi-sdk/

Created https://issues.apache.org/jira/browse/BEAM-8553

We also need more detailed documentation for cross-language transforms but
that can be separate (and hopefully with help from tech writers who have
been helping with Beam documentation in general).

Thanks,
Cham



Re: The state of external transforms in Beam

2019-11-03 Thread Thomas Weise
This thread was very helpful for finding more detail in
https://jira.apache.org/jira/browse/BEAM-7870

It would be great to have the current state of cross-language transforms
mentioned as a top-level entry on https://beam.apache.org/roadmap/



Re: The state of external transforms in Beam

2019-09-16 Thread Chamikara Jayalath
Thanks for the nice write up Chad.

On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw 
wrote:

> Thanks for bringing this up again. My thoughts on the open questions below.
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> > That commit solves 2 problems:
> >
> > Adds the pubsub Java deps so that they’re available in our portable
> pipeline
> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
> available as a standard coder. This is required because both PubsubIO.Read
> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
> objects, but only “standard” (i.e. portable) coders can be used, so we have
> to hack it to make PubsubMessage appear as a standard coder.
> >
> > More details:
> >
> > There’s a similar magic commit required for Kafka external transforms
> > The Jira issue for this problem is here:
> https://jira.apache.org/jira/browse/BEAM-7870
> > For problem #2 above there seems to be some consensus forming around
> using Avro or schema/row coders to send compound types in a portable way.
> Here’s the PR for making row coders portable
> > https://github.com/apache/beam/pull/9188
>
> +1. Note that this doesn't mean that the IO itself must produce rows;
> part of the Schema work in Java is to make it easy to automatically
> convert from various Java classes to schemas transparently, so this
> same logic that would allow one to apply an SQL filter directly to a
> Kafka/PubSub read would allow cross-language. Even if that doesn't
> work, we need not uglify the Java API; we can have an
> option/alternative transform that appends the convert-to-Row DoFn for
> easier use by external (though the goal of the former work is to make
> this step unnecessary).
>

Updating all IO connectors / transforms to have a version that
produces/consumes a PCollection<Row> is infeasible, so I agree that we need
an automatic conversion to/from PCollection<Row>, possibly by injecting
PTransforms during ExternalTransform expansion.
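
To make the idea concrete, the sketch below shows the shape of such an injected
conversion in Python. It is illustrative only: the wrapper class and the
to_row/from_row callables are made-up names, not Beam's actual expansion logic.

import apache_beam as beam


class ConvertAroundExternal(beam.PTransform):
    # Illustrative composite that injects conversion steps around an inner
    # (cross-language) transform so elements cross the boundary in a portable form.
    def __init__(self, inner, to_row, from_row):
        self._inner = inner        # the external transform being wrapped
        self._to_row = to_row      # callable: native element -> row-compatible value
        self._from_row = from_row  # callable: row-compatible value -> native element

    def expand(self, pcoll):
        return (
            pcoll
            | "ToRow" >> beam.Map(self._to_row)
            | "External" >> self._inner
            | "FromRow" >> beam.Map(self._from_row)
        )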

>
> > I don’t really have any ideas for problem #1
>
> The crux of the issue here is that the jobs API was not designed with
> cross-language in mind, and so the artifact API ties artifacts to jobs
> rather than to environments. To solve this we need to augment the
> notion of environment to allow the specification of additional
> dependencies (e.g. jar files in this specific case, or better as
> maven/pypi/... dependencies (with version ranges) such that
> environment merging and dependency resolution can be sanely done), and
> a way for the expansion service to provide such dependencies.
>
> Max wrote up a summary of the prior discussions at
>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>
> In the short term, one can build a custom docker image that has all
> the requisite dependencies installed.
>
> This touches on a related but separable issue that one may want to run
> some of these transforms "natively" in the same process as the runner
> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
> (Similarly with subprocess.) Exactly how that works with environment
> specifications is also a bit TBD, but my proposal has been that these
> are best viewed as runner-specific substitutions of standard
> environments.
>

We need a permanent solution for this, but for now we have a temporary
solution where additional jar files can be specified through an experiment
when running a Python pipeline:
https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
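
For illustration, passing such a jar through the experiments option looks roughly
like the sketch below. The experiment key ("jar_packages") and the jar path are
assumptions taken from the linked test, not a stable public API.

import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
# Assumed experiment key mirroring the linked test; the jar path is illustrative.
options.view_as(DebugOptions).experiments = [
    "jar_packages=/path/to/beam-sdks-java-io-parquet-expansion.jar",
]

with beam.Pipeline(options=options) as p:
    pass  # apply the external (Java) transform here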

Thanks,
Cham



Re: The state of external transforms in Beam

2019-09-16 Thread Robert Bradshaw
Thanks for bringing this up again. My thoughts on the open questions below.

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> That commit solves 2 problems:
>
> Adds the pubsub Java deps so that they’re available in our portable pipeline
> Makes the coder for the PubsubIO message-holder type, PubsubMessage, 
> available as a standard coder. This is required because both PubsubIO.Read 
> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage 
> objects, but only “standard” (i.e. portable) coders can be used, so we have 
> to hack it to make PubsubMessage appear as a standard coder.
>
> More details:
>
> There’s a similar magic commit required for Kafka external transforms
> The Jira issue for this problem is here: 
> https://jira.apache.org/jira/browse/BEAM-7870
> For problem #2 above there seems to be some consensus forming around using 
> Avro or schema/row coders to send compound types in a portable way. Here’s 
> the PR for making row coders portable
> https://github.com/apache/beam/pull/9188

+1. Note that this doesn't mean that the IO itself must produce rows;
part of the Schema work in Java is to make it easy to automatically
convert from various Java classes to schemas transparently, so this
same logic that would allow one to apply an SQL filter directly to a
Kafka/PubSub read would allow cross-language. Even if that doesn't
work, we need not uglify the Java API; we can have an
option/alternative transform that appends the convert-to-Row DoFn for
easier use by external (though the goal of the former work is to make
this step unnecessary).
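
As a point of reference, the Python side of the row-based interchange can look
roughly like the sketch below; the NamedTuple and its fields are illustrative
stand-ins, not the actual PubsubMessage schema.

import typing

from apache_beam import coders


class PubsubMessageRow(typing.NamedTuple):
    # Hypothetical schema'd stand-in for the compound type crossing the boundary.
    data: bytes
    message_id: str


# Register the portable RowCoder for the type so both SDKs agree on its encoding.
coders.registry.register_coder(PubsubMessageRow, coders.RowCoder)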

> I don’t really have any ideas for problem #1

The crux of the issue here is that the jobs API was not designed with
cross-language in mind, and so the artifact API ties artifacts to jobs
rather than to environments. To solve this we need to augment the
notion of environment to allow the specification of additional
dependencies (e.g. jar files in this specific case, or better as
maven/pypi/... dependencies (with version ranges) such that
environment merging and dependency resolution can be sanely done), and
a way for the expansion service to provide such dependencies.

Max wrote up a summary of the prior discussions at
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

In the short term, one can build a custom docker image that has all
the requisite dependencies installed.

This touches on a related but separable issue that one may want to run
some of these transforms "natively" in the same process as the runner
(e.g. a Java IO in the Flink Java Runner) rather than via docker.
(Similarly with subprocess.) Exactly how that works with environment
specifications is also a bit TBD, but my proposal has been that these
are best viewed as runner-specific substitutions of standard
environments.

> So the portability expansion system works, and now it’s time to sand off some 
> of the rough corners. I’d love to hear others’ thoughts on how to resolve 
> some of these remaining issues.

+1


On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
>
> Hi all,
> There was some interest in this topic at the Beam Summit this week (btw, 
> great job to everyone involved!), so I thought I’d try to summarize the 
> current state of things.
> > First, let me explain the idea behind external transforms for the
> uninitiated.
>
> Problem:
>
> there’s a transform that you want to use, but it’s not available in your 
> desired language. IO connectors are a good example: there are many available 
> in the Java SDK, but not so much in Python or Go.
>
> Solution:
>
> Create a stub transform in your desired language (e.g. Python) whose primary 
> role is to serialize the parameters passed to that transform
> When you run your portable pipeline, just prior to it being sent to the Job 
> Service for execution, your stub transform’s payload is first sent to the 
> “Expansion Service” that’s running in the native language (Java), where the 
> payload is used to construct an instance of the native transform, which is 
> then expanded and converted to a protobuf and sent back to the calling 
> process (Python).
> The protobuf representation of the expanded transform gets integrated back 
> into the pipeline that you’re submitting
> Steps 2-3 are repeated for each external transform in your pipeline
> Then the whole pipeline gets sent to the Job Service to be invoked on 
> Flink/Spark/etc
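
For readers who want to see the shape of this flow in code, a minimal Python-side
sketch follows. The URN, the configuration dict, and the expansion service address
are illustrative assumptions, and a matching Java expansion service must already
be running.

import apache_beam as beam
from apache_beam.transforms.external import ExternalTransform, ImplicitSchemaPayloadBuilder

with beam.Pipeline() as p:
    messages = p | "ReadViaJava" >> ExternalTransform(
        "beam:external:java:pubsub:read:v1",  # assumed URN registered with the expansion service
        ImplicitSchemaPayloadBuilder({"topic": "projects/my-project/topics/my-topic"}),
        "localhost:8097",  # address of the running Java expansion service
    )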
>
> 
>
> Now on to my journey to get PubsubIO working in python on Flink.
>
> The first issue I encountered was that there was a lot of boilerplate 
> involved in serializing the stub python transform’s parameters so they can be 
> sent to the expansion service.
>
> I created a PR to make this simpler, which has just been merged to master: 
> https://github.com/apache/beam/pull/9098
>
> With this feature in place, if you’re using python 3.7 you can use a 
> dataclass and the typing module to crea
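
As an illustration of the direction that PR enables, a typed configuration for a
stub transform can be expressed with the typing module roughly as in the sketch
below. NamedTupleBasedPayloadBuilder is believed to be one of the builders
introduced there; the config fields and URN are assumptions.

import typing

from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder


class PubsubReadConfig(typing.NamedTuple):
    # Hypothetical configuration schema for the stub transform's payload.
    topic: str
    with_attributes: bool


payload = NamedTupleBasedPayloadBuilder(
    PubsubReadConfig(topic="projects/my-project/topics/my-topic", with_attributes=False)
)

# The builder serializes the typed config into the payload that the stub hands to
# the expansion service, e.g.:
#   ExternalTransform("beam:external:java:pubsub:read:v1", payload, "localhost:8097")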