Re: The state of external transforms in Beam
Sent https://github.com/apache/beam/pull/10054 to update the roadmap.

Thanks,
Cham

On Mon, Nov 4, 2019 at 10:24 AM Chamikara Jayalath wrote:

> Makes sense.
>
> I can look into expanding on what we have at the following location and
> adding links to some of the existing work as a first step.
> https://beam.apache.org/roadmap/connectors-multi-sdk/
>
> Created https://issues.apache.org/jira/browse/BEAM-8553
>
> We also need more detailed documentation for cross-language transforms but
> that can be separate (and hopefully with help from tech writers who have
> been helping with Beam documentation in general).
>
> Thanks,
> Cham
Re: The state of external transforms in Beam
Makes sense.

I can look into expanding on what we have at the following location and
adding links to some of the existing work as a first step.
https://beam.apache.org/roadmap/connectors-multi-sdk/

Created https://issues.apache.org/jira/browse/BEAM-8553

We also need more detailed documentation for cross-language transforms but
that can be separate (and hopefully with help from tech writers who have
been helping with Beam documentation in general).

Thanks,
Cham

On Sun, Nov 3, 2019 at 7:16 PM Thomas Weise wrote:

> This thread was very helpful to find more detail in
> https://jira.apache.org/jira/browse/BEAM-7870
>
> It would be great to have cross-language current state mentioned as top
> level entry on https://beam.apache.org/roadmap/
Re: The state of external transforms in Beam
This thread was very helpful to find more detail in
https://jira.apache.org/jira/browse/BEAM-7870

It would be great to have cross-language current state mentioned as top
level entry on https://beam.apache.org/roadmap/

On Mon, Sep 16, 2019 at 6:07 PM Chamikara Jayalath wrote:

> Thanks for the nice write up Chad.
Re: The state of external transforms in Beam
Thanks for the nice write up Chad.

On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw wrote:

> Thanks for bringing this up again. My thoughts on the open questions below.
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova wrote:
> > That commit solves 2 problems:
> >
> > 1. Adds the pubsub Java deps so that they’re available in our portable
> >    pipeline
> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
> >    available as a standard coder. This is required because both
> >    PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along
> >    these PubsubMessage objects, but only “standard” (i.e. portable)
> >    coders can be used, so we have to hack it to make PubsubMessage appear
> >    as a standard coder.
> >
> > More details:
> >
> > - There’s a similar magic commit required for Kafka external transforms
> > - The Jira issue for this problem is here:
> >   https://jira.apache.org/jira/browse/BEAM-7870
> > - For problem #2 above there seems to be some consensus forming around
> >   using Avro or schema/row coders to send compound types in a portable
> >   way. Here’s the PR for making row coders portable:
> >   https://github.com/apache/beam/pull/9188
>
> +1. Note that this doesn't mean that the IO itself must produce rows;
> part of the Schema work in Java is to make it easy to automatically
> convert from various Java classes to schemas transparently, so this
> same logic that would allow one to apply an SQL filter directly to a
> Kafka/PubSub read would allow cross-language. Even if that doesn't
> work, we need not uglify the Java API; we can have an
> option/alternative transform that appends the convert-to-Row DoFn for
> easier use by external (though the goal of the former work is to make
> this step unnecessary).

Updating all IO connectors / transforms to have a version that
produces/consumes a PCollection<Row> is infeasible, so I agree that we need
an automatic conversion to/from PCollection<Row>, possibly by injecting
PTransforms during ExternalTransform expansion.

> > I don’t really have any ideas for problem #1
>
> The crux of the issue here is that the jobs API was not designed with
> cross-language in mind, and so the artifact API ties artifacts to jobs
> rather than to environments. To solve this we need to augment the
> notion of environment to allow the specification of additional
> dependencies (e.g. jar files in this specific case, or better as
> maven/pypi/... dependencies (with version ranges) such that
> environment merging and dependency resolution can be sanely done), and
> a way for the expansion service to provide such dependencies.
>
> Max wrote up a summary of the prior discussions at
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>
> In the short term, one can build a custom docker image that has all
> the requisite dependencies installed.
>
> This touches on a related but separable issue that one may want to run
> some of these transforms "natively" in the same process as the runner
> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
> (Similarly with subprocess.) Exactly how that works with environment
> specifications is also a bit TBD, but my proposal has been that these
> are best viewed as runner-specific substitutions of standard
> environments.

We need a permanent solution for this but for now we have a temporary
solution where additional jar files can be specified through an experiment
when running a Python pipeline:
https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55

Thanks,
Cham

> > So the portability expansion system works, and now it’s time to sand off
> > some of the rough corners. I’d love to hear others’ thoughts on how to
> > resolve some of these remaining issues.
>
> +1
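
As a rough illustration of the temporary workaround mentioned in this message (passing an additional jar through an experiment when running a Python pipeline), the command-line flags might be assembled as below. This is a sketch under assumptions, not Beam's documented interface: the experiment name jar_packages is an assumption based on the linked test file, PortableRunner is just one plausible runner choice, and the jar path and both helper function names are hypothetical.

```python
# Sketch only: builds argv-style flags for a Python pipeline so that one
# extra jar is named via an experiment. Nothing here imports or calls Beam.
# Assumptions: the experiment is called "jar_packages" (per the linked test),
# and the flags would be handed to the pipeline's options parser.


def jar_packages_experiment(jar_path):
    """Return the experiment string naming one additional jar to stage."""
    if not jar_path:
        raise ValueError("jar_path must be a non-empty path")
    return "jar_packages=" + jar_path


def pipeline_args(jar_path, runner="PortableRunner"):
    """Return the list of flags for a portable pipeline run (hypothetical helper)."""
    return [
        "--runner=" + runner,
        "--experiments=" + jar_packages_experiment(jar_path),
    ]


if __name__ == "__main__":
    # Hypothetical jar location; replace with the jar your expansion needs.
    for flag in pipeline_args("/tmp/expansion-service.jar"):
        print(flag)
```

The resulting list is meant to be passed wherever the pipeline's options are parsed; the longer-term proposal in the thread (attaching dependencies to environments) would make a flag like this unnecessary.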
Re: The state of external transforms in Beam
Thanks for bringing this up again. My thoughts on the open questions below.

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova wrote:
> That commit solves 2 problems:
>
> 1. Adds the pubsub Java deps so that they’re available in our portable
>    pipeline
> 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
>    available as a standard coder. This is required because both
>    PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these
>    PubsubMessage objects, but only “standard” (i.e. portable) coders can be
>    used, so we have to hack it to make PubsubMessage appear as a standard
>    coder.
>
> More details:
>
> - There’s a similar magic commit required for Kafka external transforms
> - The Jira issue for this problem is here:
>   https://jira.apache.org/jira/browse/BEAM-7870
> - For problem #2 above there seems to be some consensus forming around
>   using Avro or schema/row coders to send compound types in a portable way.
>   Here’s the PR for making row coders portable:
>   https://github.com/apache/beam/pull/9188

+1. Note that this doesn't mean that the IO itself must produce rows;
part of the Schema work in Java is to make it easy to automatically
convert from various Java classes to schemas transparently, so this
same logic that would allow one to apply an SQL filter directly to a
Kafka/PubSub read would allow cross-language. Even if that doesn't
work, we need not uglify the Java API; we can have an
option/alternative transform that appends the convert-to-Row DoFn for
easier use by external (though the goal of the former work is to make
this step unnecessary).

> I don’t really have any ideas for problem #1

The crux of the issue here is that the jobs API was not designed with
cross-language in mind, and so the artifact API ties artifacts to jobs
rather than to environments. To solve this we need to augment the
notion of environment to allow the specification of additional
dependencies (e.g. jar files in this specific case, or better as
maven/pypi/... dependencies (with version ranges) such that
environment merging and dependency resolution can be sanely done), and
a way for the expansion service to provide such dependencies.

Max wrote up a summary of the prior discussions at
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

In the short term, one can build a custom docker image that has all
the requisite dependencies installed.

This touches on a related but separable issue that one may want to run
some of these transforms "natively" in the same process as the runner
(e.g. a Java IO in the Flink Java Runner) rather than via docker.
(Similarly with subprocess.) Exactly how that works with environment
specifications is also a bit TBD, but my proposal has been that these
are best viewed as runner-specific substitutions of standard
environments.

> So the portability expansion system works, and now it’s time to sand off
> some of the rough corners. I’d love to hear others’ thoughts on how to
> resolve some of these remaining issues.

+1

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova wrote:
>
> Hi all,
> There was some interest in this topic at the Beam Summit this week (btw,
> great job to everyone involved!), so I thought I’d try to summarize the
> current state of things.
> First, let me explain the idea behind external transforms for the
> uninitiated.
>
> Problem:
>
> - There’s a transform that you want to use, but it’s not available in your
>   desired language. IO connectors are a good example: there are many
>   available in the Java SDK, but not so much in Python or Go.
>
> Solution:
>
> 1. Create a stub transform in your desired language (e.g. Python) whose
>    primary role is to serialize the parameters passed to that transform
> 2. When you run your portable pipeline, just prior to it being sent to the
>    Job Service for execution, your stub transform’s payload is first sent
>    to the “Expansion Service” that’s running in the native language (Java),
>    where the payload is used to construct an instance of the native
>    transform, which is then expanded and converted to a protobuf and sent
>    back to the calling process (Python).
> 3. The protobuf representation of the expanded transform gets integrated
>    back into the pipeline that you’re submitting
> 4. Steps 2-3 are repeated for each external transform in your pipeline
> 5. Then the whole pipeline gets sent to the Job Service to be invoked on
>    Flink/Spark/etc
>
> Now on to my journey to get PubsubIO working in python on Flink.
>
> The first issue I encountered was that there was a lot of boilerplate
> involved in serializing the stub python transform’s parameters so they can
> be sent to the expansion service.
>
> I created a PR to make this simpler, which has just been merged to master:
> https://github.com/apache/beam/pull/9098
>
> With this feature in place, if you’re using python 3.7 you can use a
> dataclass and the typing module to crea