Re: [DISCUSS] Should File based IOs implement readAll() or just readFiles()

2019-02-01 Thread Chamikara Jayalath
The Python SDK doesn't have FileIO yet, so let's keep the ReadAllFromFoo
transforms currently available for various file types around until we have that.

Thanks,
Cham

On Fri, Feb 1, 2019 at 7:41 AM Jean-Baptiste Onofré  wrote:

> Hi,
>
> readFiles() should be used IMHO. We should remove readAll() to avoid
> confusion.
>
> Regards
> JB
>
> On 30/01/2019 17:25, Ismaël Mejía wrote:
> > Hello,
> >
> > A ‘recent’ pattern of use in Beam is to have in file-based IOs a
> > `readAll()` implementation that basically matches a `PCollection` of
> > file patterns and reads them, e.g. `TextIO`, `AvroIO`. `ReadAll` is
> > implemented by an expand function that matches files with FileIO and
> > then reads them using a format-specific `ReadFiles` transform, e.g.
> > TextIO.ReadFiles, AvroIO.ReadFiles. So in the end `ReadAll` in the
> > Java implementation is just a user-friendly API to hide FileIO.match
> > + ReadFiles.
> >
> > Most recent IOs do NOT implement ReadAll to encourage the more
> > composable approach of FileIO + ReadFiles, e.g. XmlIO and ParquetIO.
> >
> > Implementing ReadAll as a wrapper is relatively easy and is definitely
> > user-friendly, but it has an issue: it may be error-prone and it adds
> > more code to maintain (mostly ‘repeated’ code). However, `readAll` is a
> > more abstract pattern that applies not only to file-based IOs, so it
> > makes sense, for example, in other transforms that map a `PCollection`
> > of read requests, and it is the basis for composable SDF-style APIs like
> > the recent `HBaseIO.readAll()`.
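> >
> > For instance, the shape of that API (a sketch; HBaseIO.readAll() maps a
> > PCollection of read requests to a PCollection of results):
> >
> >   PCollection<HBaseIO.Read> readRequests = ...;
> >   PCollection<Result> results = readRequests.apply(HBaseIO.readAll());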
> >
> > So the question is should we:
> >
> > [1] Implement `readAll` in all file based IOs to be user friendly and
> > assume the (minor) maintenance cost
> >
> > or
> >
> > [2] Deprecate `readAll` from file based IOs and encourage users to use
> > FileIO + `readFiles` (less maintenance and encourage composition).
> >
> > I quickly checked the Python code base but could not tell whether the
> > FileIO match + ReadFiles pattern applies there; it would be nice to
> > hear what the Python folks think about this too.
> >
> > This discussion comes from a recent slack conversation with Łukasz
> > Gajowy, and we wanted to settle on one approach to make the IO
> > signatures consistent, so any opinions/preferences?
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Robert Bradshaw
On Fri, Feb 1, 2019 at 5:42 PM Thomas Weise  wrote:

>
> On Fri, Feb 1, 2019 at 6:17 AM Maximilian Michels  wrote:
>
>> > Max, thanks for your summary. I would like to add that we agree that
>> > the runner-specific translation via URN is a temporary solution until
>> > the wrapper transforms are written, is this correct? In any case, this
>> > alternative standard expansion approach deserves a discussion of its
>> > own, as you mention.
>>
>> Correct. Wrapping existing Beam transforms should always be preferred
>> over Runner-specific translation because the latter is not portable.
>>
>>
> From a Python user perspective, this can still be exposed as a stub,
> without having to know about the URN.
>

Yep. In the long run, I'd expect many sources to be offered as their own
easy-to-use stubs.


> Also, isn't how we expose this orthogonal to how it is being translated?
>

Yes.


> It may even be possible to switch the stub to SDF based translation once
> that is ready.
>

Yep. The expansion would change, but that's all an internal detail inside
the composite that the user doesn't care about.


>
>
>> On 01.02.19 14:25, Ismaël Mejía wrote:
>> > Thanks for the explanation Robert it makes much more sense now. (Sorry
>> > for the confusion in the mapping I mistyped the direction SDF <->
>> > Source).
>> >
>> > Status of SDF:
>> > - Support for Dynamic Work Rebalancing is WIP.
>> > - Bounded version translation is supported by all non-portable runners
>> > in a relatively naive way.
>> > - Unbounded version translation is not supported in the non-portable
>> > runners. (Let's not forget that this case may make sense too).
>> > - Portable runners translation of SDF is WIP
>> > - There is only one IO that is written based on SDF:
>> >- HBaseIO
>> > - Some other IOs should work out of the box (those based on
>> > non-splittable DoFn):
>> >- ClickhouseIO
>> >- File-based ones: TextIO, AvroIO, ParquetIO
>> >- JdbcIO
>> >- SolrIO
>> >
>> > Max, thanks for your summary. I would like to add that we agree that
>> > the runner-specific translation via URN is a temporary solution until
>> > the wrapper transforms are written, is this correct? In any case, this
>> > alternative standard expansion approach deserves a discussion of its
>> > own, as you mention.
>> >
>> > On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw 
>> wrote:
>> >>
>> >> Are you suggesting something akin to a generic
>> >>
>> >>  urn: JsonConfiguredJavaSource
>> >>  payload: some json specifying which source and which parameters
>> >>
>> >> which would expand to actually constructing and applying that source?
>> >>
>> >> (FWIW, I was imagining PubSubIO already had a translation into
>> BeamFnApi protos that fully specified it, and we use that same format to
>> translate back out.)
>> >>
>> >> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels 
>> wrote:
>> >>>
>> >>> Recapping here:
>> >>>
>> >>> We all agree that SDF is the way to go for future implementations of
>> >>> sources. It enables us to get rid of the source interfaces. However,
>> SDF
>> >>> does not solve the lack of streaming sources in Python.
>> >>>
>> >>> The expansion PR (thanks btw!) solves the problem of
>> >>> expanding/translating URNs known to an ExpansionService. That is a
>> more
>> >>> programmatic way of replacing language-specific transforms, instead of
>> >>> relying on translators directly in the Runner.
>> >>>
>> >>> What is unsolved is the configuration of sources from a foreign
>> >>> environment. In my opinion this is the most pressing issue for Python
>> >>> sources, because what is PubSubIO worth in Python if you cannot
>> >>> configure it?
>> >>>
>> >>> What about this:
>> >>>
>> >>> I think it is worth adding a JSON configuration option for all
>> existing
>> >>> Java sources. That way, we could easily configure them as part of the
>> >>> expansion request (which would contain a JSON configuration). I'll
>> >>> probably fork a thread to discuss this in more detail, but would like
>> to
>> >>> hear your thoughts.
>> >>>
>> >>> -Max
>> >>>
>> >>> On 01.02.19 13:08, Robert Bradshaw wrote:
>>  On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:
>> 
>>   Ah, I thought you meant native Flink transforms.
>> 
>>   Exactly! The translation code is already there. The main challenge
>>   is how to programmatically configure the BeamIO from Python. I suppose
>>   that is also an unsolved problem for cross-language transforms in
>>   general.
>> 
>> 
>>  This is what https://github.com/apache/beam/pull/7316 does.
>> 
>>  For a particular source, one would want to define a URN and
>>  corresponding payload, then (probably) a CompositeTransform in Python
>>  that takes the user's arguments, packages them into the payload,
>>  applies the ExternalTransform, and returns the results. How to handle
>>  arbitrary UDFs embedded in sources is still TBD.

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Thomas Weise
On Fri, Feb 1, 2019 at 6:12 AM Maximilian Michels  wrote:

> Yes, I imagine sources implementing a JsonConfigurable interface (e.g.
> on their builders):
>
> interface JsonConfigurable {
>   // Accepts either a JSON string or a Map of options.
>   void apply(String jsonConfig);
> }
>
> In Python we would create this transform:
>
> URN: JsonConfiguredSource:v1
> payload: {
>   environment: environment_id, // Java/Python/Go
>   resourceIdentifier: string,  // "org.apache.beam.io.PubSubIO"
>   configuration: json config,  // { "topic" : "my_pubsub_topic" }
> }
>
> That's more generic and could be used for other languages where we might
> have sources/sinks.
>

Looks good!

I imagine there would be a wrapper for this similar to:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py#L33


> > (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
> protos that fully specified it, and we use that same format to translate
> back out.)
>
> Not that I know of.
>
> On 01.02.19 14:02, Robert Bradshaw wrote:
> > Are you suggesting something akin to a generic
> >
> >  urn: JsonConfiguredJavaSource
> >  payload: some json specifying which source and which parameters
> >
> > which would expand to actually constructing and applying that source?
> >
> > (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
> > protos that fully specified it, and we use that same format to translate
> > back out.)
> >
> > On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels wrote:
> >
> > Recapping here:
> >
> > We all agree that SDF is the way to go for future implementations of
> > sources. It enables us to get rid of the source interfaces. However,
> > SDF
> > does not solve the lack of streaming sources in Python.
> >
> > The expansion PR (thanks btw!) solves the problem of
> > expanding/translating URNs known to an ExpansionService. That is a
> more
> > programmatic way of replacing language-specific transforms, instead of
> > relying on translators directly in the Runner.
> >
> > What is unsolved is the configuration of sources from a foreign
> > environment. In my opinion this is the most pressing issue for Python
> > sources, because what is PubSubIO worth in Python if you cannot
> > configure it?
> >
> > What about this:
> >
> > I think it is worth adding a JSON configuration option for all
> existing
> > Java sources. That way, we could easily configure them as part of the
> > expansion request (which would contain a JSON configuration). I'll
> > probably fork a thread to discuss this in more detail, but would
> > like to
> > hear your thoughts.
> >
> > -Max
> >
> > On 01.02.19 13:08, Robert Bradshaw wrote:
> >  > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:
> >  >
> >  > Ah, I thought you meant native Flink transforms.
> >  >
> >  > Exactly! The translation code is already there. The main challenge
> >  > is how to programmatically configure the BeamIO from Python. I
> >  > suppose that is also an unsolved problem for cross-language
> >  > transforms in general.
> >  >
> >  >
> >  > This is what https://github.com/apache/beam/pull/7316 does.
> >  >
> >  > For a particular source, one would want to define a URN and
> >  > corresponding payload, then (probably) a CompositeTransform in Python
> >  > that takes the user's arguments, packages them into the payload,
> >  > applies the ExternalTransform, and returns the results. How to handle
> >  > arbitrary UDFs embedded in sources is still TBD.
> >  >
> >  > For Matthias' pipeline with PubSubIO we can build something
> >  > specific, but for the general case there should be a way to
> >  > initialize a Beam IO via a configuration map provided by an
> >  > external environment.
> >  >
> >  >
> >  > I thought quite a bit about how we could represent expansions
> >  > statically (e.g. have some kind of expansion template that could be
> >  > used, at least in many cases, as data without firing up a separate
> >  > process. May be worth doing eventually, but we run into the same
> >  > issues that were discussed at
> >  > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
> >  >
> >  > If one is already using a portable runner like Flink, having the job
> >  > service process automatically also serve up an expansion service for
> >  > various URNs it knows and cares about is probably a pretty low bar.
> >  > Flink could serve up things it would rather get back untouched in a
> >  > transform with a special flink runner urn.

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Thomas Weise
On Fri, Feb 1, 2019 at 6:17 AM Maximilian Michels  wrote:

> > Max, thanks for your summary. I would like to add that we agree that
> > the runner-specific translation via URN is a temporary solution until
> > the wrapper transforms are written, is this correct? In any case, this
> > alternative standard expansion approach deserves a discussion of its
> > own, as you mention.
>
> Correct. Wrapping existing Beam transforms should always be preferred
> over Runner-specific translation because the latter is not portable.
>
>
From a Python user perspective, this can still be exposed as a stub,
without having to know about the URN.

Also, isn't how we expose this orthogonal to how it is being translated?

It may even be possible to switch the stub to SDF based translation once
that is ready.


> On 01.02.19 14:25, Ismaël Mejía wrote:
> > Thanks for the explanation Robert it makes much more sense now. (Sorry
> > for the confusion in the mapping I mistyped the direction SDF <->
> > Source).
> >
> > Status of SDF:
> > - Support for Dynamic Work Rebalancing is WIP.
> > - Bounded version translation is supported by all non-portable runners
> > in a relatively naive way.
> > - Unbounded version translation is not supported in the non-portable
> > runners. (Let's not forget that this case may make sense too).
> > - Portable runners translation of SDF is WIP
> > - There is only one IO that is written based on SDF:
> >- HBaseIO
> > - Some other IOs should work out of the box (those based on
> > non-splittable DoFn):
> >- ClickhouseIO
> >- File-based ones: TextIO, AvroIO, ParquetIO
> >- JdbcIO
> >- SolrIO
> >
> > Max, thanks for your summary. I would like to add that we agree that
> > the runner-specific translation via URN is a temporary solution until
> > the wrapper transforms are written, is this correct? In any case, this
> > alternative standard expansion approach deserves a discussion of its
> > own, as you mention.
> >
> > On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw 
> wrote:
> >>
> >> Are you suggesting something akin to a generic
> >>
> >>  urn: JsonConfiguredJavaSource
> >>  payload: some json specifying which source and which parameters
> >>
> >> which would expand to actually constructing and applying that source?
> >>
> >> (FWIW, I was imagining PubSubIO already had a translation into
> BeamFnApi protos that fully specified it, and we use that same format to
> translate back out.)
> >>
> >> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels 
> wrote:
> >>>
> >>> Recapping here:
> >>>
> >>> We all agree that SDF is the way to go for future implementations of
> >>> sources. It enables us to get rid of the source interfaces. However,
> SDF
> >>> does not solve the lack of streaming sources in Python.
> >>>
> >>> The expansion PR (thanks btw!) solves the problem of
> >>> expanding/translating URNs known to an ExpansionService. That is a more
> >>> programmatic way of replacing language-specific transforms, instead of
> >>> relying on translators directly in the Runner.
> >>>
> >>> What is unsolved is the configuration of sources from a foreign
> >>> environment. In my opinion this is the most pressing issue for Python
> >>> sources, because what is PubSubIO worth in Python if you cannot
> >>> configure it?
> >>>
> >>> What about this:
> >>>
> >>> I think it is worth adding a JSON configuration option for all existing
> >>> Java sources. That way, we could easily configure them as part of the
> >>> expansion request (which would contain a JSON configuration). I'll
> >>> probably fork a thread to discuss this in more detail, but would like
> to
> >>> hear your thoughts.
> >>>
> >>> -Max
> >>>
> >>> On 01.02.19 13:08, Robert Bradshaw wrote:
>  On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:
> 
>   Ah, I thought you meant native Flink transforms.
> 
>   Exactly! The translation code is already there. The main challenge
>   is how to programmatically configure the BeamIO from Python. I suppose
>   that is also an unsolved problem for cross-language transforms in
>   general.
> 
> 
>  This is what https://github.com/apache/beam/pull/7316 does.
> 
>  For a particular source, one would want to define a URN and
>  corresponding payload, then (probably) a CompositeTransform in Python
>  that takes the user's arguments, packages them into the payload,
>  applies the ExternalTransform, and returns the results. How to handle
>  arbitrary UDFs embedded in sources is still TBD.
> 
>   For Matthias' pipeline with PubSubIO we can build something
>   specific, but for the general case there should be a way to initialize
>   a Beam IO via a configuration map provided by an external environment.
> 
> 
>  I thought quite a bit about how we could represent expansions
>  statically (e.g. have some kind of expansion template that could be
>  used, at least in many cases, as data without firing up a separate
>  process).

Re: [DISCUSS] Should File based IOs implement readAll() or just readFiles()

2019-02-01 Thread Jean-Baptiste Onofré
Hi,

readFiles() should be used IMHO. We should remove readAll() to avoid
confusion.

Regards
JB

On 30/01/2019 17:25, Ismaël Mejía wrote:
> Hello,
> 
> A ‘recent’ pattern of use in Beam is to have in file-based IOs a
> `readAll()` implementation that basically matches a `PCollection` of
> file patterns and reads them, e.g. `TextIO`, `AvroIO`. `ReadAll` is
> implemented by an expand function that matches files with FileIO and
> then reads them using a format-specific `ReadFiles` transform, e.g.
> TextIO.ReadFiles, AvroIO.ReadFiles. So in the end `ReadAll` in the
> Java implementation is just a user-friendly API to hide FileIO.match
> + ReadFiles.
> 
> Most recent IOs do NOT implement ReadAll to encourage the more
> composable approach of FileIO + ReadFiles, e.g. XmlIO and ParquetIO.
> 
> Implementing ReadAll as a wrapper is relatively easy and is definitely
> user-friendly, but it has an issue: it may be error-prone and it adds
> more code to maintain (mostly ‘repeated’ code). However, `readAll` is a
> more abstract pattern that applies not only to file-based IOs, so it
> makes sense, for example, in other transforms that map a `PCollection`
> of read requests, and it is the basis for composable SDF-style APIs like
> the recent `HBaseIO.readAll()`.
> 
> So the question is should we:
> 
> [1] Implement `readAll` in all file based IOs to be user friendly and
> assume the (minor) maintenance cost
> 
> or
> 
> [2] Deprecate `readAll` from file based IOs and encourage users to use
> FileIO + `readFiles` (less maintenance and encourage composition).
> 
> I quickly checked the Python code base but could not tell whether the
> FileIO match + ReadFiles pattern applies there; it would be nice to
> hear what the Python folks think about this too.
> 
> This discussion comes from a recent slack conversation with Łukasz
> Gajowy, and we wanted to settle on one approach to make the IO
> signatures consistent, so any opinions/preferences?
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: [DISCUSS] Should File based IOs implement readAll() or just readFiles()

2019-02-01 Thread Łukasz Gajowy
+1 to deprecating and not implementing readAll() in transforms where
it is equivalent to matchAll() + readMatches() + readFiles().

It encourages and advertises the use of a nice, composable API, reducing
the amount of code to be maintained.

can the Python/Go SDK align with this?

+1 to that too.

On Fri, Feb 1, 2019 at 2:56 PM Ismaël Mejía wrote:

> I want to chime in that I am also +1 to deprecating readAll, is there
> anyone strongly pro readAll instead of the explicit composition?
> And more importantly, can the Python/Go SDK align with this (deprecating
> ReadAll and implementing ReadFiles)?
>
> On Thu, Jan 31, 2019 at 12:34 AM Chamikara Jayalath
>  wrote:
> >
> > Thanks for the clarification Ismaël and Eugene. +1 for deprecating
> existing FooIO.readAll() transforms in favor of FooIO.readFiles().
> >
> > On Wed, Jan 30, 2019 at 3:25 PM Eugene Kirpichov 
> wrote:
> >>
> >> TextIO.read() and AvroIO.read() indeed perform better than match() +
> readMatches() + readFiles(), due to DWR - so for these two in particular I
> would not recommend such a refactoring.
> >> However, new file-based IOs that do not support DWR should only provide
> readFiles(). Those that do, should provide read() and readFiles(). When SDF
> supports DWR, then readFiles() will be enough in all cases.
> >> In general there's no need for readAll() for new file-based IOs - it is
> always equivalent to matchAll() + readMatches() + readFiles() including
> performance-wise. It was included in TextIO/AvroIO before readFiles() was a
> thing.
> >>
> >> On Wed, Jan 30, 2019 at 2:41 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>
> >>> On Wed, Jan 30, 2019 at 2:37 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> 
> 
> 
>  On Wed, Jan 30, 2019 at 2:33 PM Ismaël Mejía 
> wrote:
> >
> > Oops, slight typo: in the first line of the previous email I meant read
> > instead of readAll
> >
> > On Wed, Jan 30, 2019 at 11:32 PM Ismaël Mejía 
> wrote:
> > >
> > > Reuven is right for the example: readAll at this moment may be faster
> > > and also supports Dynamic Work Rebalancing (DWR), but the performance
> > > of the other approach may (and must) be improved to be equal, once the
> > > internal implementation of TextIO.read moves to an SDF version instead
> > > of the FileBasedSource one, and once the runners support DWR through
> > > SDF. Of course all of this is future work. Probably Eugene can
> > > eventually chime in to give more details on practical performance from
> > > his tests in Dataflow.
> > >
> > > Really interesting topic, but I want to bring back the discussion to
> > > the subject of the thread. I think there is some confusion after
> > > Jeff's example, which should have been:
> > >
> > >   return input
> > >   .apply(TextIO.readAll());
> > >
> > > to:
> > >
> > >   return input
> > >   .apply(FileIO.match().filepattern(fileSpec))
> > >   .apply(FileIO.readMatches())
> > >   .apply(TextIO.readFiles());
> > >
> > > This is the question we are addressing: do we need a readAll
> > > transform that replaces the 3 steps or not?
> 
> 
>  Ismaël, I'm not quite sure how these two are equal. The readFiles()
> transform returns a PCollection of ReadableFile objects. Users are expected
> to read these files in a subsequent ParDo and produce a PCollection of the
> proper type. FooIO.ReadAll() transforms, on the other hand, are tailored to
> each IO connector and return a PCollection of objects of the type supported
> by that IO connector.
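>
>  For example, a sketch of that composition in Java (the file pattern and
>  the parsing DoFn are illustrative):
>
>    PCollection<String> contents = p
>        .apply(FileIO.match().filepattern("gs://bucket/*.csv"))
>        .apply(FileIO.readMatches())
>        .apply(ParDo.of(
>            new DoFn<FileIO.ReadableFile, String>() {
>              @ProcessElement
>              public void process(
>                  @Element FileIO.ReadableFile file,
>                  OutputReceiver<String> out) throws IOException {
>                // A format-specific parser would go here; this simply
>                // emits each whole file as a single string.
>                out.output(file.readFullyAsString());
>              }
>            }));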
> >>>
> >>>
> >>> I assume you meant FileIO.readFiles() here. Or did you mean
> TextIO.readFiles()? If so, that seems very similar to TextIO.readAll().
> 
> 
> 
> >
> > >
> > > On Wed, Jan 30, 2019 at 9:03 PM Robert Bradshaw <
> rober...@google.com> wrote:
> > > >
> > > > Yes, this is precisely the goal of SDF.
> > > >
> > > >
> > > > On Wed, Jan 30, 2019 at 8:41 PM Kenneth Knowles 
> wrote:
> > > > >
> > > > > So is the latter intended for splittable DoFn but not yet
> using it? The promise of SDF is precisely this composability, isn't it?
> > > > >
> > > > > Kenn
> > > > >
> > > > > On Wed, Jan 30, 2019 at 10:16 AM Jeff Klukas <
> jklu...@mozilla.com> wrote:
> > > > >>
> > > > >> Reuven - Is TextIO.read().from() a more complex case than the
> topic Ismaël is bringing up in this thread? I'm surprised to hear that the
> two examples have different performance characteristics.
> > > > >>
> > > > >> Reading through the implementation, I guess the fundamental
> difference is whether a given configuration expands to TextIO.ReadAll or to
> io.Read. AFAICT, that detail and the subsequent performance impact are not
> documented.
> > > > >>
> > > > >> If the above is correct, perhaps it's an argument for IOs to
> provide higher-level methods in cases where they can optimize performance
> compared to what a user might naively put together.

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Maximilian Michels

Max, thanks for your summary. I would like to add that we agree that
the runner-specific translation via URN is a temporary solution until
the wrapper transforms are written, is this correct? In any case, this
alternative standard expansion approach deserves a discussion of its
own, as you mention.


Correct. Wrapping existing Beam transforms should always be preferred 
over Runner-specific translation because the latter is not portable.


On 01.02.19 14:25, Ismaël Mejía wrote:

Thanks for the explanation Robert it makes much more sense now. (Sorry
for the confusion in the mapping I mistyped the direction SDF <->
Source).

Status of SDF:
- Support for Dynamic Work Rebalancing is WIP.
- Bounded version translation is supported by all non-portable runners
in a relatively naive way.
- Unbounded version translation is not supported in the non-portable
runners. (Let's not forget that this case may make sense too).
- Portable runners translation of SDF is WIP
- There is only one IO that is written based on SDF:
  - HBaseIO
- Some other IOs should work out of the box (those based on
non-splittable DoFn):
  - ClickhouseIO
  - File-based ones: TextIO, AvroIO, ParquetIO
  - JdbcIO
  - SolrIO

Max, thanks for your summary. I would like to add that we agree that
the runner-specific translation via URN is a temporary solution until
the wrapper transforms are written, is this correct? In any case, this
alternative standard expansion approach deserves a discussion of its
own, as you mention.

On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw  wrote:


Are you suggesting something akin to a generic

 urn: JsonConfiguredJavaSource
 payload: some json specifying which source and which parameters

which would expand to actually constructing and applying that source?

(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi protos 
that fully specified it, and we use that same format to translate back out.)

On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels  wrote:


Recapping here:

We all agree that SDF is the way to go for future implementations of
sources. It enables us to get rid of the source interfaces. However, SDF
does not solve the lack of streaming sources in Python.

The expansion PR (thanks btw!) solves the problem of
expanding/translating URNs known to an ExpansionService. That is a more
programmatic way of replacing language-specific transforms, instead of
relying on translators directly in the Runner.

What is unsolved is the configuration of sources from a foreign
environment. In my opinion this is the most pressing issue for Python
sources, because what is PubSubIO worth in Python if you cannot
configure it?

What about this:

I think it is worth adding a JSON configuration option for all existing
Java sources. That way, we could easily configure them as part of the
expansion request (which would contain a JSON configuration). I'll
probably fork a thread to discuss this in more detail, but would like to
hear your thoughts.

-Max

On 01.02.19 13:08, Robert Bradshaw wrote:

On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:

 Ah, I thought you meant native Flink transforms.

Exactly! The translation code is already there. The main challenge is
how to programmatically configure the BeamIO from Python. I suppose that
is also an unsolved problem for cross-language transforms in general.


This is what https://github.com/apache/beam/pull/7316 does.

For a particular source, one would want to define a URN and
corresponding payload, then (probably) a CompositeTransform in Python
that takes the user's arguments, packages them into the payload, applies
the ExternalTransform, and returns the results. How to handle arbitrary
UDFs embedded in sources is still TBD.

For Matthias' pipeline with PubSubIO we can build something specific,
but for the general case there should be a way to initialize a Beam IO
via a configuration map provided by an external environment.


I thought quite a bit about how we could represent expansions statically
(e.g. have some kind of expansion template that could be used, at least
in many cases, as data without firing up a separate process. May be
worth doing eventually, but we run into the same issues that were
discussed at
https://github.com/apache/beam/pull/7316#discussion_r249996455 ).

If one is already using a portable runner like Flink, having the job
service process automatically also serve up an expansion service for
various URNs it knows and cares about is probably a pretty low bar.
Flink could serve up things it would rather get back untouched in a
transform with a special flink runner urn.

As Ahmet mentions, SDF is a better solution. I hope it's not that far
away, but even once it comes we'll likely want the above framework to
invoke the full suite of Java IOs even after they're running on SDF
themselves.

- Robert

 On 31.01.19 17:36, Thomas Weise wrote:
 > Exactly, that's what I had in mind.

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Maximilian Michels
Yes, I imagine sources implementing a JsonConfigurable interface (e.g.
on their builders):


interface JsonConfigurable {
  // Accepts either a JSON string or a Map of options.
  void apply(String jsonConfig);
}

In Python we would create this transform:

URN: JsonConfiguredSource:v1
payload: {
  environment: environment_id, // Java/Python/Go
  resourceIdentifier: string,  // "org.apache.beam.io.PubSubIO"
  configuration: json config,  // { "topic" : "my_pubsub_topic" }
}

That's more generic and could be used for other languages where we might 
have sources/sinks.
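
A rough sketch of what an implementation on a source's builder could look
like (the PubsubReadBuilder class is hypothetical and Gson is used purely
for illustration; none of this is existing Beam API):

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

// Hypothetical builder that can be configured from the JSON payload above.
public class PubsubReadBuilder implements JsonConfigurable {
  private String topic;

  @Override
  public void apply(String jsonConfig) {
    // e.g. jsonConfig = { "topic" : "my_pubsub_topic" }
    JsonObject config = JsonParser.parseString(jsonConfig).getAsJsonObject();
    this.topic = config.get("topic").getAsString();
  }

  public PubsubIO.Read<PubsubMessage> build() {
    return PubsubIO.readMessages().fromTopic(topic);
  }
}

The expansion service would then look up the builder via the
resourceIdentifier, call apply() with the configuration, and expand the
resulting transform as usual.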



(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi protos 
that fully specified it, and we use that same format to translate back out.)


Not that I know of.

On 01.02.19 14:02, Robert Bradshaw wrote:

Are you suggesting something akin to a generic

     urn: JsonConfiguredJavaSource
     payload: some json specifying which source and which parameters

which would expand to actually constructing and applying that source?

(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi 
protos that fully specified it, and we use that same format to translate 
back out.)


On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels wrote:


Recapping here:

We all agree that SDF is the way to go for future implementations of
sources. It enables us to get rid of the source interfaces. However,
SDF
does not solve the lack of streaming sources in Python.

The expansion PR (thanks btw!) solves the problem of
expanding/translating URNs known to an ExpansionService. That is a more
programmatic way of replacing language-specific transforms, instead of
relying on translators directly in the Runner.

What is unsolved is the configuration of sources from a foreign
environment. In my opinion this is the most pressing issue for Python
sources, because what is PubSubIO worth in Python if you cannot
configure it?

What about this:

I think it is worth adding a JSON configuration option for all existing
Java sources. That way, we could easily configure them as part of the
expansion request (which would contain a JSON configuration). I'll
probably fork a thread to discuss this in more detail, but would
like to
hear your thoughts.

-Max

On 01.02.19 13:08, Robert Bradshaw wrote:
 > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:
 >
 >     Ah, I thought you meant native Flink transforms.
 >
 >     Exactly! The translation code is already there. The main challenge
 >     is how to programmatically configure the BeamIO from Python. I
 >     suppose that is also an unsolved problem for cross-language
 >     transforms in general.
 >
 >
 > This is what https://github.com/apache/beam/pull/7316 does.
 >
 > For a particular source, one would want to define a URN and
 > corresponding payload, then (probably) a CompositeTransform in Python
 > that takes the user's arguments, packages them into the payload,
 > applies the ExternalTransform, and returns the results. How to handle
 > arbitrary UDFs embedded in sources is still TBD.
 >
 >     For Matthias' pipeline with PubSubIO we can build something
 >     specific, but for the general case there should be a way to
 >     initialize a Beam IO via a configuration map provided by an
 >     external environment.
 >
 >
 > I thought quite a bit about how we could represent expansions
 > statically (e.g. have some kind of expansion template that could be
 > used, at least in many cases, as data without firing up a separate
 > process. May be worth doing eventually, but we run into the same issues
 > that were discussed at
 > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
 >
 > If one is already using a portable runner like Flink, having the job
 > service process automatically also serve up an expansion service for
 > various URNs it knows and cares about is probably a pretty low bar.
 > Flink could serve up things it would rather get back untouched in a
 > transform with a special flink runner urn.
 >
 > As Ahmet mentions, SDF is a better solution. I hope it's not that far
 > away, but even once it comes we'll likely want the above framework to
 > invoke the full suite of Java IOs even after they're running on SDF
 > themselves.
 >
 > - Robert
 >
 >     On 31.01.19 17:36, Thomas Weise wrote:
 >      > Exactly, that's what I had in mind.
 >      >
 >      > A Flink runner native transform would make the existing
 >      > unbounded sources available, similar to:
 >      >
 >      >
 >


Re: [DISCUSS] Should File based IOs implement readAll() or just readFiles()

2019-02-01 Thread Ismaël Mejía
I want to chime in that I am also +1 to deprecating readAll, is there
anyone strongly pro readAll instead of the explicit composition?
And more importantly, can the Python/Go SDK align with this (deprecating
ReadAll and implementing ReadFiles)?

On Thu, Jan 31, 2019 at 12:34 AM Chamikara Jayalath
 wrote:
>
> Thanks for the clarification Ismaël and Eugene. +1 for deprecating existing 
> FooIO.readAll() transforms in favor of FooIO.readFiles().
>
> On Wed, Jan 30, 2019 at 3:25 PM Eugene Kirpichov  wrote:
>>
>> TextIO.read() and AvroIO.read() indeed perform better than match() + 
>> readMatches() + readFiles(), due to DWR - so for these two in particular I 
>> would not recommend such a refactoring.
>> However, new file-based IOs that do not support DWR should only provide 
>> readFiles(). Those that do, should provide read() and readFiles(). When SDF 
>> supports DWR, then readFiles() will be enough in all cases.
>> In general there's no need for readAll() for new file-based IOs - it is 
>> always equivalent to matchAll() + readMatches() + readFiles() including 
>> performance-wise. It was included in TextIO/AvroIO before readFiles() was a 
>> thing.
>>
>> On Wed, Jan 30, 2019 at 2:41 PM Chamikara Jayalath  
>> wrote:
>>>
>>> On Wed, Jan 30, 2019 at 2:37 PM Chamikara Jayalath  
>>> wrote:



 On Wed, Jan 30, 2019 at 2:33 PM Ismaël Mejía  wrote:
>
> Oops, slight typo: in the first line of the previous email I meant read
> instead of readAll
>
> On Wed, Jan 30, 2019 at 11:32 PM Ismaël Mejía  wrote:
> >
> > Reuven is right for the example: readAll at this moment may be faster
> > and also supports Dynamic Work Rebalancing (DWR), but the performance
> > of the other approach may (and must) be improved to be equal, once the
> > internal implementation of TextIO.read moves to an SDF version instead
> > of the FileBasedSource one, and once the runners support DWR through
> > SDF. Of course all of this is future work. Probably Eugene can
> > eventually chime in to give more details on practical performance from
> > his tests in Dataflow.
> >
> > Really interesting topic, but I want to bring back the discussion to
> > the subject of the thread. I think there is some confusion after
> > Jeff's example, which should have been:
> >
> >   return input
> >   .apply(TextIO.readAll());
> >
> > to:
> >
> >   return input
> >   .apply(FileIO.match().filepattern(fileSpec))
> >   .apply(FileIO.readMatches())
> >   .apply(TextIO.readFiles());
> >
> > This is the question we are addressing: do we need a readAll transform
> > that replaces the 3 steps or not?


 Ismaël, I'm not quite sure how these two are equal. The readFiles() transform
 returns a PCollection of ReadableFile objects. Users are expected to read
 these files in a subsequent ParDo and produce a PCollection of the proper
 type. FooIO.ReadAll() transforms, on the other hand, are tailored to each IO
 connector and return a PCollection of objects of the type supported by that
 IO connector.
>>>
>>>
>>> I assume you meant FileIO.readFiles() here. Or did you mean
>>> TextIO.readFiles()? If so, that seems very similar to TextIO.readAll().



>
> >
> > On Wed, Jan 30, 2019 at 9:03 PM Robert Bradshaw  
> > wrote:
> > >
> > > Yes, this is precisely the goal of SDF.
> > >
> > >
> > > On Wed, Jan 30, 2019 at 8:41 PM Kenneth Knowles  
> > > wrote:
> > > >
> > > > So is the latter intended for splittable DoFn but not yet using
> > > > it? The promise of SDF is precisely this composability, isn't it?
> > > >
> > > > Kenn
> > > >
> > > > On Wed, Jan 30, 2019 at 10:16 AM Jeff Klukas  
> > > > wrote:
> > > >>
> > > >> Reuven - Is TextIO.read().from() a more complex case than the 
> > > >> topic Ismaël is bringing up in this thread? I'm surprised to hear 
> > > >> that the two examples have different performance characteristics.
> > > >>
> > > >> Reading through the implementation, I guess the fundamental 
> > > >> difference is whether a given configuration expands to 
> > > >> TextIO.ReadAll or to io.Read. AFAICT, that detail and the
> > > >> subsequent performance impact are not documented.
> > > >>
> > > >> If the above is correct, perhaps it's an argument for IOs to 
> > > >> provide higher-level methods in cases where they can optimize 
> > > >> performance compared to what a user might naively put together.
> > > >>
> > > >> On Wed, Jan 30, 2019 at 12:35 PM Reuven Lax  
> > > >> wrote:
> > > >>>
> > > >>> Jeff, what you did here is not simply a refactoring. These two 
> > > >>> are quite different, and will likely have different performance 
> > > >>> characteristics.
> > > >>>
> > > 

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Ismaël Mejía
Thanks for the explanation Robert it makes much more sense now. (Sorry
for the confusion in the mapping I mistyped the direction SDF <->
Source).

Status of SDF:
- Support for Dynamic Work Rebalancing is WIP.
- Bounded version translation is supported by all non-portable runners
in a relatively naive way.
- Unbounded version translation is not supported in the non-portable
runners. (Let's not forget that this case may make sense too).
- Portable runners translation of SDF is WIP
- There is only one IO that is written based on SDF:
  - HBaseIO
- Some other IOs should work out of the box (those based on
non-splittable DoFn):
  - ClickhouseIO
  - File-based ones: TextIO, AvroIO, ParquetIO
  - JdbcIO
  - SolrIO

Max, thanks for your summary. I would like to add that we agree that
the runner-specific translation via URN is a temporary solution until
the wrapper transforms are written, is this correct? In any case, this
alternative standard expansion approach deserves a discussion of its
own, as you mention.

On Fri, Feb 1, 2019 at 2:02 PM Robert Bradshaw  wrote:
>
> Are you suggesting something akin to a generic
>
> urn: JsonConfiguredJavaSource
> payload: some json specifying which source and which parameters
>
> which would expand to actually constructing and applying that source?
>
> (FWIW, I was imagining PubSubIO already had a translation into BeamFnApi 
> protos that fully specified it, and we use that same format to translate back 
> out.)
>
> On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels  wrote:
>>
>> Recapping here:
>>
>> We all agree that SDF is the way to go for future implementations of
>> sources. It enables us to get rid of the source interfaces. However, SDF
>> does not solve the lack of streaming sources in Python.
>>
>> The expansion PR (thanks btw!) solves the problem of
>> expanding/translating URNs known to an ExpansionService. That is a more
>> programmatic way of replacing language-specific transforms, instead of
>> relying on translators directly in the Runner.
>>
>> What is unsolved is the configuration of sources from a foreign
>> environment. In my opinion this is the most pressing issue for Python
>> sources, because what is PubSubIO worth in Python if you cannot
>> configure it?
>>
>> What about this:
>>
>> I think it is worth adding a JSON configuration option for all existing
>> Java sources. That way, we could easily configure them as part of the
>> expansion request (which would contain a JSON configuration). I'll
>> probably fork a thread to discuss this in more detail, but would like to
>> hear your thoughts.
>>
>> -Max
>>
>> On 01.02.19 13:08, Robert Bradshaw wrote:
>> > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:
>> >
>> > Ah, I thought you meant native Flink transforms.
>> >
>> > Exactly! The translation code is already there. The main challenge is
>> > how to programmatically configure the BeamIO from Python. I suppose
>> > that is also an unsolved problem for cross-language transforms in
>> > general.
>> >
>> >
>> > This is what https://github.com/apache/beam/pull/7316 does.
>> >
>> > For a particular source, one would want to define a URN and
>> > corresponding payload, then (probably) a CompositeTransform in Python
>> > that takes the user's arguments, packages them into the payload, applies
>> > the ExternalTransform, and returns the results. How to handle arbitrary
>> > UDFs embedded in sources is still TBD.
>> >
>> > For Matthias' pipeline with PubSubIO we can build something specific,
>> > but for the general case there should be a way to initialize a Beam IO
>> > via a configuration map provided by an external environment.
>> >
>> >
>> > I thought quite a bit about how we could represent expansions statically
>> > (e.g. have some kind of expansion template that could be used, at least
>> > in many cases, as data without firing up a separate process. May be
>> > worth doing eventually, but we run into the same issues that were
>> > discussed at
>> > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
>> >
>> > If one is already using a portable runner like Flink, having the job
>> > service process automatically also serve up an expansion service for
>> > various URNs it knows and cares about is probably a pretty low bar.
>> > Flink could serve up things it would rather get back untouched in a
>> > transform with a special flink runner urn.
>> >
>> > As Ahmet mentions, SDF is a better solution. I hope it's not that far
>> > away, but even once it comes we'll likely want the above framework to
>> > invoke the full suite of Java IOs even after they're running on SDF
>> > themselves.
>> >
>> > - Robert
>> >
>> > On 31.01.19 17:36, Thomas Weise wrote:
>> >  > Exactly, that's what I had in mind.
>> >  >
>> >  > A Flink runner native transform would make the existing unbounded
>> > sources
>> >  > available, similar to:
>> >  

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Robert Bradshaw
Are you suggesting something akin to a generic

urn: JsonConfiguredJavaSource
payload: some json specifying which source and which parameters

which would expand to actually constructing and applying that source?

(FWIW, I was imagining PubSubIO already had a translation into BeamFnApi
protos that fully specified it, and we use that same format to translate
back out.)

On Fri, Feb 1, 2019 at 1:44 PM Maximilian Michels  wrote:

> Recapping here:
>
> We all agree that SDF is the way to go for future implementations of
> sources. It enables us to get rid of the source interfaces. However, SDF
> does not solve the lack of streaming sources in Python.
>
> The expansion PR (thanks btw!) solves the problem of
> expanding/translating URNs known to an ExpansionService. That is a more
> > programmatic way of replacing language-specific transforms, instead of
> relying on translators directly in the Runner.
>
> What is unsolved is the configuration of sources from a foreign
> environment. In my opinion this is the most pressing issue for Python
> sources, because what is PubSubIO worth in Python if you cannot
> configure it?
>
> What about this:
>
> I think it is worth adding a JSON configuration option for all existing
> Java sources. That way, we could easily configure them as part of the
> expansion request (which would contain a JSON configuration). I'll
> probably fork a thread to discuss this in more detail, but would like to
> hear your thoughts.
>
> -Max
>
> On 01.02.19 13:08, Robert Bradshaw wrote:
> > On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:
> >
> > Ah, I thought you meant native Flink transforms.
> >
> > Exactly! The translation code is already there. The main challenge is
> > how to programmatically configure the BeamIO from Python. I suppose that
> > is also an unsolved problem for cross-language transforms in general.
> >
> >
> > This is what https://github.com/apache/beam/pull/7316 does.
> >
> > For a particular source, one would want to define a URN and
> > corresponding payload, then (probably) a CompositeTransform in Python
> > that takes the user's arguments, packages them into the payload, applies
> > the ExternalTransform, and returns the results. How to handle arbitrary
> > UDFs embedded in sources is still TBD.
> >
> > For Matthias' pipeline with PubSubIO we can build something specific,
> > but for the general case there should be a way to initialize a Beam IO
> > via a configuration map provided by an external environment.
> >
> >
> > I thought quite a bit about how we could represent expansions statically
> > (e.g. have some kind of expansion template that could be used, at least
> > in many cases, as data without firing up a separate process. May be
> > worth doing eventually, but we run into the same issues that were
> > discussed at
> > https://github.com/apache/beam/pull/7316#discussion_r249996455 ).
> >
> > If one is already using a portable runner like Flink, having the job
> > service process automatically also serve up an expansion service for
> > various URNs it knows and cares about is probably a pretty low bar.
> > Flink could serve up things it would rather get back untouched in a
> > transform with a special flink runner urn.
> >
> > As Ahmet mentions, SDF is a better solution. I hope it's not that far
> > away, but even once it comes we'll likely want the above framework to
> > invoke the full suite of Java IOs even after they're running on SDF
> > themselves.
> >
> > - Robert
> >
> > On 31.01.19 17:36, Thomas Weise wrote:
> >  > Exactly, that's what I had in mind.
> >  >
> >  > A Flink runner native transform would make the existing unbounded
> > sources
> >  > available, similar to:
> >  >
> >  >
> >
> https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> >  >
> >  >
> >  >
> >  >
> >  > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels wrote:
> >  >
> >  > Wouldn't it be even more useful for the transition period if we
> >  > enabled Beam IO to be used via Flink (like in the legacy Flink
> >  > Runner)? In this particular example, Matthias wants to use PubSubIO,
> >  > which is not even available as a native Flink transform.
> >  >
> >  > On 31.01.19 16:21, Thomas Weise wrote:
> >  >  > Until SDF is supported, we could also add Flink runner native
> >  >  > transforms for selected unbounded sources [1].
> >  >  >
> >  >  > That might be a reasonable option to unblock users that want to
> >  >  > try Python streaming on Flink.
> >  >  >
> >  >  > 

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Maximilian Michels

Recapping here:

We all agree that SDF is the way to go for future implementations of 
sources. It enables us to get rid of the source interfaces. However, SDF 
does not solve the lack of streaming sources in Python.


The expansion PR (thanks btw!) solves the problem of 
expanding/translating URNs known to an ExpansionService. That is a more 
programmatic way of replacing language-specific transforms, instead of
relying on translators directly in the Runner.


What is unsolved is the configuration of sources from a foreign 
environment. In my opinion this is the most pressing issue for Python 
sources, because what is PubSubIO worth in Python if you cannot 
configure it?


What about this:

I think it is worth adding a JSON configuration option for all existing 
Java sources. That way, we could easily configure them as part of the 
expansion request (which would contain a JSON configuration). I'll 
probably fork a thread to discuss this in more detail, but would like to 
hear your thoughts.


-Max

On 01.02.19 13:08, Robert Bradshaw wrote:
On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels wrote:


Ah, I thought you meant native Flink transforms.

Exactly! The translation code is already there. The main challenge is how
to programmatically configure the BeamIO from Python. I suppose that is
also an unsolved problem for cross-language transforms in general.


This is what https://github.com/apache/beam/pull/7316 does.

For a particular source, one would want to define a URN and 
corresponding payload, then (probably) a CompositeTransform in Python 
that takes the user's arguments, packages them into the payload, applies
the ExternalTransform, and returns the results. How to handle arbitrary 
UDFs embedded in sources is still TBD.


For Matthias' pipeline with PubSubIO we can build something specific, but
for the general case there should be a way to initialize a Beam IO via a
configuration map provided by an external environment.


I thought quite a bit about how we could represent expansions statically 
(e.g. have some kind of expansion template that could be used, at least 
in many cases, as data without firing up a separate process. May be 
worth doing eventually, but we run into the same issues that were 
discussed at 
https://github.com/apache/beam/pull/7316#discussion_r249996455 ).


If one is already using a portable runner like Flink, having the job 
service process automatically also serve up an expansion service for 
various URNs it knows and cares about is probably a pretty low bar. 
Flink could serve up things it would rather get back untouched in a 
transform with a special flink runner urn.


As Ahmet mentions, SDF is a better solution. I hope it's not that far
away, but even once it comes we'll likely want the above framework to 
invoke the full suite of Java IOs even after they're running on SDF 
themselves.


- Robert

On 31.01.19 17:36, Thomas Weise wrote:
 > Exactly, that's what I had in mind.
 >
 > A Flink runner native transform would make the existing unbounded
 > sources available, similar to:
 >
 >

https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
 >
 >
 >
 >
 > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels wrote:
 >
 >     Wouldn't it be even more useful for the transition period if we
 >     enabled Beam IO to be used via Flink (like in the legacy Flink
 >     Runner)? In this particular example, Matthias wants to use PubSubIO,
 >     which is not even available as a native Flink transform.
 >
 >     On 31.01.19 16:21, Thomas Weise wrote:
 >      > Until SDF is supported, we could also add Flink runner native
 >      > transforms for selected unbounded sources [1].
 >      >
 >      > That might be a reasonable option to unblock users that want to
 >      > try Python streaming on Flink.
 >      >
 >      > Thomas
 >      >
 >      > [1]
 >      >
 >

https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
 >      >
 >      >
 >      > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels wrote:
 >      >      > I have a hard time imagining how we can map in a
 >      >      > generic way RestrictionTrackers into the existing
 >      >      > Bounded/UnboundedSource, so I would love to hear more
 >      >      > about the details.

Re: Beam Python streaming pipeline on Flink Runner

2019-02-01 Thread Robert Bradshaw
On Thu, Jan 31, 2019 at 6:25 PM Maximilian Michels  wrote:

> Ah, I thought you meant native Flink transforms.
>
> Exactly! The translation code is already there. The main challenge is how
> to
> programmatically configure the BeamIO from Python. I suppose that is also
> an
> unsolved problem for cross-language transforms in general.
>

This is what https://github.com/apache/beam/pull/7316 does.

For a particular source, one would want to define a URN and corresponding
payload, then (probably) a CompositeTransform in Python that takes the
user's arguments, packages them into the payload, applies the
ExternalTransform, and returns the results. How to handle arbitrary UDFs
embedded in sources is still TBD.


> For Matthias' pipeline with PubSubIO we can build something specific, but
> for the general case there should be a way to initialize a Beam IO via a
> configuration map provided by an external environment.
>

I thought quite a bit about how we could represent expansions statically
(e.g. have some kind of expansion template that could be used, at least in
many cases, as data without firing up a separate process. May be worth
doing eventually, but we run into the same issues that were discussed at
https://github.com/apache/beam/pull/7316#discussion_r249996455 ).

If one is already using a portable runner like Flink, having the job
service process automatically also serve up an expansion service for
various URNs it knows and cares about is probably a pretty low bar. Flink
could serve up things it would rather get back untouched in a transform
with a special flink runner urn.

As Ahmet mentions, SDF is a better solution. I hope it's not that far away,
but even once it comes we'll likely want the above framework to invoke the
full suite of Java IOs even after they're running on SDF themselves.

- Robert



> On 31.01.19 17:36, Thomas Weise wrote:
> > Exactly, that's what I had in mind.
> >
> > A Flink runner native transform would make the existing unbounded
> > sources available, similar to:
> >
> >
> https://github.com/apache/beam/blob/2e89c1e4d35e7b5f95a622259d23d921c3d6ad1f/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L167
> >
> >
> >
> >
> > On Thu, Jan 31, 2019 at 8:18 AM Maximilian Michels wrote:
> >
> > Wouldn't it be even more useful for the transition period if we enabled
> > Beam IO to be used via Flink (like in the legacy Flink Runner)? In this
> > particular example, Matthias wants to use PubSubIO, which is not even
> > available as a native Flink transform.
> >
> > On 31.01.19 16:21, Thomas Weise wrote:
> >  > Until SDF is supported, we could also add Flink runner native
> >  > transforms for selected unbounded sources [1].
> >  >
> >  > That might be a reasonable option to unblock users that want to try
> >  > Python streaming on Flink.
> >  >
> >  > Thomas
> >  >
> >  > [1]
> >  >
> >
> https://github.com/lyft/beam/blob/release-2.10.0-lyft/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java
> >  >
> >  >
> >  > On Thu, Jan 31, 2019 at 6:51 AM Maximilian Michels wrote:
> >  >
> >  >  > I have a hard time imagining how we can map in a generic way
> >  >  > RestrictionTrackers into the existing Bounded/UnboundedSource,
> >  >  > so I would love to hear more about the details.
> >  >
> >  > Isn't it the other way around? The SDF is a generalization of
> >  > UnboundedSource. So we would wrap UnboundedSource using SDF. I'm not
> >  > saying it is trivial, but SDF offers all the functionality that
> >  > UnboundedSource needs.
> >  >
> >  > For example, the @GetInitialRestriction method would call split on
> >  > the UnboundedSource, and the restriction trackers would then be used
> >  > to process the splits.
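> >  >
> >  > A rough, heavily simplified sketch of that idea for the bounded case
> >  > (hypothetical code, not Beam's actual wrapper; imports from
> >  > org.apache.beam.sdk.{transforms,io,io.range,options} are omitted):
> >  >
> >  >   class ReadFromBoundedSourceFn<T> extends DoFn<BoundedSource<T>, T> {
> >  >
> >  >     @GetInitialRestriction
> >  >     public OffsetRange initialRestriction(
> >  >         @Element BoundedSource<T> source, PipelineOptions options)
> >  >         throws Exception {
> >  >       // One restriction unit per split of the original source.
> >  >       int numSplits = source.split(64 << 20, options).size();
> >  >       return new OffsetRange(0, numSplits);
> >  >     }
> >  >
> >  >     @ProcessElement
> >  >     public void process(
> >  >         ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
> >  >         throws Exception {
> >  >       List<? extends BoundedSource<T>> splits =
> >  >           c.element().split(64 << 20, c.getPipelineOptions());
> >  >       for (long i = tracker.currentRestriction().getFrom();
> >  >           tracker.tryClaim(i); ++i) {
> >  >         // Read one sub-source per claimed offset.
> >  >         try (BoundedSource.BoundedReader<T> reader =
> >  >             splits.get((int) i).createReader(c.getPipelineOptions())) {
> >  >           for (boolean more = reader.start(); more; more = reader.advance()) {
> >  >             c.output(reader.getCurrent());
> >  >           }
> >  >         }
> >  >       }
> >  >     }
> >  >   }
> >  >
> >  > The unbounded case would additionally need checkpointing via
> >  > CheckpointMark and watermark reporting, which is where most of the
> >  > real complexity lies.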
> >  >
> >  > On 31.01.19 15:16, Ismaël Mejía wrote:
> >  >  >> Not necessarily. This would be one way. Another way is to build
> >  >  >> an SDF wrapper for UnboundedSource. Probably the easier path for
> >  >  >> migration.
> >  >  >
> >  >  > That would be fantastic, I have heard about such a wrapper
> >  >  > multiple times, but so far there has not been any realistic
> >  >  > proposal. I have a hard time imagining how we can map, in a
> >  >  > generic way, RestrictionTrackers into the existing
> >  >  > Bounded/UnboundedSource, so I would love to hear more about
> >  >  > the details.
> >  >  >
> >  >  > On Thu, Jan 31, 2019 at 3:07 PM Maximilian Michels wrote:
> >