Re: Callbacks/other functions run after a PDone/output transform

2017-12-21 Thread Eugene Kirpichov
Yeah. And I don't think there's a good way to define what sequencing even
means, if the sink is returning results in windows that aren't gonna have a
final pane.

On Thu, Dec 21, 2017, 2:00 AM Reuven Lax  wrote:

> This is only for "final pane" waiting, correct? So someone who writes a
> sink in the global window probably would not want to use this.
>
> On Wed, Dec 20, 2017 at 9:57 PM, Eugene Kirpichov 
> wrote:
>
>> PR is out https://github.com/apache/beam/pull/4301
>>
>> This should allow us to have useful sequencing for sinks like BigtableIO
>> / BigQueryIO.
>>
>> Adding a couple of interested parties:
>> - Steve, would you be interested in using this in
>> https://github.com/apache/beam/pull/3997 ?
>> - Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
>> in particular, this works properly when the input fires multiple
>> times.
>>
>> On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov 
>> wrote:
>>
>>> I figured out the Never.ever() approach and it seems to work. Will
>>> finish this up and send a PR at some point. Woohoo, thanks Kenn! Seems like
>>> this will be quite a useful transform.
>>>
>>> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov 
>>> wrote:
>>>
 I'm a bit confused by all of these suggestions: they sound plausible at
 a high level, but I'm having a hard time making any one of them concrete.

So suppose we want to create a transform Wait.on(PCollection<?>
signal): PCollection<T> -> PCollection<T>.
 a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
 "a", but buffers panes of "a" in any given window until the final pane of
 "sig" in the same window is fired (or, if it's never fired, until the
 window closes? could use a deadletter for that maybe).

 This transform I suppose would need to have a keyed and unkeyed version.

 The keyed version would support merging window fns, and would require
 "a" and "sig" to be keyed by the same key, and would work using a CoGbk -
 followed by a stateful ParDo? Or is there a way to get away without a
 stateful ParDo here? (not all runners support it)

 The unkeyed version would not support merging window fns. Reuven, can
 you elaborate how your combiner idea would work here - in particular, what
 do you mean by "triggering only on the final pane"? Do you mean filter
 non-final panes before entering the combiner? I wonder if that'll work,
 probably worth a shot. And Kenn, can you elaborate on "re-trigger on
 the side input with a Never.ever() trigger"?

 Thanks.

 On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax  wrote:

> This is an interesting point.
>
> In the past, we've often just thought about sequencing some action to
> take place after the sink, in which case you can simply use the sink 
> output
> as a main input. However if you want to run a transform with another
> PCollection as a main input, this doesn't work. And as you've discovered,
> triggered side inputs are defined to be non-deterministic, and there's no
> way to make things line up.
>
> What you're describing only makes sense if you're blocking against the
> final pane (since otherwise there's no reasonable way to match up somePC
> panes with the sink panes). There are multiple ways you can do this: one
> would be to CGBK the two PCollections together, and trigger the new
> transform only on the final pane. Another would be to add a combiner that
> returns a Void, triggering only on the final pane, and then make this
> singleton Void a side input. You could also do something explicit with the
> state API.
>
> Reuven
>
> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
>> So this appears not as easy as anticipated (surprise!)
>>
>> Suppose we have a PCollection "donePanes" with an element per
>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>> data has been written; this pane is: final / non-final".
>>
>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>> happens only after the final pane has been written.
>>
>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to
>> happen when c emits a *final* pane.
>>
>> Unfortunately, using
>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't 
>> do
>> the trick: the side input becomes ready the moment *the first *pane
>> of data has been written.
>>
>> But neither does
>> ParDo.of(fn).withSideInputs(donePanes.apply(...filter only final
>> panes...).apply(View.asSingleton())). It also becomes ready the moment 
>> *the
>> first* pane has been written, you just get an exception if you
>> access 

Re: Callbacks/other functions run after a PDone/output transform

2017-12-20 Thread Eugene Kirpichov
PR is out https://github.com/apache/beam/pull/4301

This should allow us to have useful sequencing for sinks like BigtableIO /
BigQueryIO.

Adding a couple of interested parties:
- Steve, would you be interested in using this in
https://github.com/apache/beam/pull/3997 ?
- Mairbek: this should help in https://github.com/apache/beam/pull/4264 -
in particular, this works properly when the input fires multiple
times.

On Tue, Dec 19, 2017 at 5:20 PM Eugene Kirpichov 
wrote:

> I figured out the Never.ever() approach and it seems to work. Will finish
> this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this
> will be quite a useful transform.
>
> On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov 
> wrote:
>
>> I'm a bit confused by all of these suggestions: they sound plausible at a
>> high level, but I'm having a hard time making any one of them concrete.
>>
>> So suppose we want to create a transform Wait.on(PCollection<?> signal):
>> PCollection<T> -> PCollection<T>.
>> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
>> "a", but buffers panes of "a" in any given window until the final pane of
>> "sig" in the same window is fired (or, if it's never fired, until the
>> window closes? could use a deadletter for that maybe).
>>
>> This transform I suppose would need to have a keyed and unkeyed version.
>>
>> The keyed version would support merging window fns, and would require "a"
>> and "sig" to be keyed by the same key, and would work using a CoGbk -
>> followed by a stateful ParDo? Or is there a way to get away without a
>> stateful ParDo here? (not all runners support it)
>>
>> The unkeyed version would not support merging window fns. Reuven, can you
>> elaborate how your combiner idea would work here - in particular, what do
>> you mean by "triggering only on the final pane"? Do you mean filter
>> non-final panes before entering the combiner? I wonder if that'll work,
>> probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
>> side input with a Never.ever() trigger"?
>>
>> Thanks.
>>
>> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax  wrote:
>>
>>> This is an interesting point.
>>>
>>> In the past, we've often just thought about sequencing some action to
>>> take place after the sink, in which case you can simply use the sink output
>>> as a main input. However if you want to run a transform with another
>>> PCollection as a main input, this doesn't work. And as you've discovered,
>>> triggered side inputs are defined to be non-deterministic, and there's no
>>> way to make things line up.
>>>
>>> What you're describing only makes sense if you're blocking against the
>>> final pane (since otherwise there's no reasonable way to match up somePC
>>> panes with the sink panes). There are multiple ways you can do this: one
>>> would be to CGBK the two PCollections together, and trigger the new
>>> transform only on the final pane. Another would be to add a combiner that
>>> returns a Void, triggering only on the final pane, and then make this
>>> singleton Void a side input. You could also do something explicit with the
>>> state API.
>>>
>>> Reuven
>>>
>>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov 
>>> wrote:
>>>
 So this appears not as easy as anticipated (surprise!)

 Suppose we have a PCollection "donePanes" with an element per
 fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
 data has been written; this pane is: final / non-final".

 Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
 happens only after the final pane has been written.

 In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
 when c emits a *final* pane.

 Unfortunately, using
 ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
 the trick: the side input becomes ready the moment *the first *pane of
 data has been written.

 But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
 only final panes...).apply(View.asSingleton())). It also becomes ready the
 moment *the first* pane has been written, you just get an exception if
 you access the side input before the *final* pane was written.

 I can't think of a pure-Beam solution to this: either "donePanes" will
 be used as a main input to something (and then everything else can only be
 a side input, which is not general enough), or it will be used as a side
 input (and then we can't achieve "trigger only after the final pane 
 fires").

 It seems that we need a way to control the side input pushback, and
 configure whether a view becomes ready when its first pane has fired or
when its last pane has fired. I could see this being a property on the View
 transform itself. In terms of implementation - I tried to figure out how

Re: Callbacks/other functions run after a PDone/output transform

2017-12-19 Thread Eugene Kirpichov
I figured out the Never.ever() approach and it seems to work. Will finish
this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this
will be quite a useful transform.

On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov 
wrote:

> I'm a bit confused by all of these suggestions: they sound plausible at a
> high level, but I'm having a hard time making any one of them concrete.
>
> So suppose we want to create a transform Wait.on(PCollection<?> signal):
> PCollection<T> -> PCollection<T>.
> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
> "a", but buffers panes of "a" in any given window until the final pane of
> "sig" in the same window is fired (or, if it's never fired, until the
> window closes? could use a deadletter for that maybe).
>
> This transform I suppose would need to have a keyed and unkeyed version.
>
> The keyed version would support merging window fns, and would require "a"
> and "sig" to be keyed by the same key, and would work using a CoGbk -
> followed by a stateful ParDo? Or is there a way to get away without a
> stateful ParDo here? (not all runners support it)
>
> The unkeyed version would not support merging window fns. Reuven, can you
> elaborate how your combiner idea would work here - in particular, what do
> you mean by "triggering only on the final pane"? Do you mean filter
> non-final panes before entering the combiner? I wonder if that'll work,
> probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
> side input with a Never.ever() trigger"?
>
> Thanks.
>
> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax  wrote:
>
>> This is an interesting point.
>>
>> In the past, we've often just thought about sequencing some action to take
>> place after the sink, in which case you can simply use the sink output as a
>> main input. However if you want to run a transform with another PCollection
>> as a main input, this doesn't work. And as you've discovered, triggered
>> side inputs are defined to be non-deterministic, and there's no way to make
>> things line up.
>>
>> What you're describing only makes sense if you're blocking against the
>> final pane (since otherwise there's no reasonable way to match up somePC
>> panes with the sink panes). There are multiple ways you can do this: one
>> would be to CGBK the two PCollections together, and trigger the new
>> transform only on the final pane. Another would be to add a combiner that
>> returns a Void, triggering only on the final pane, and then make this
>> singleton Void a side input. You could also do something explicit with the
>> state API.
>>
>> Reuven
>>
>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov 
>> wrote:
>>
>>> So this appears not as easy as anticipated (surprise!)
>>>
>>> Suppose we have a PCollection "donePanes" with an element per
>>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>>> data has been written; this pane is: final / non-final".
>>>
>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>>> happens only after the final pane has been written.
>>>
>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
>>> when c emits a *final* pane.
>>>
>>> Unfortunately, using
>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
>>> the trick: the side input becomes ready the moment *the first *pane of
>>> data has been written.
>>>
>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>>> only final panes...).apply(View.asSingleton())). It also becomes ready the
>>> moment *the first* pane has been written, you just get an exception if
>>> you access the side input before the *final* pane was written.
>>>
>>> I can't think of a pure-Beam solution to this: either "donePanes" will
>>> be used as a main input to something (and then everything else can only be
>>> a side input, which is not general enough), or it will be used as a side
>>> input (and then we can't achieve "trigger only after the final pane fires").
>>>
>>> It seems that we need a way to control the side input pushback, and
>>> configure whether a view becomes ready when its first pane has fired or
>>> when its last pane has fired. I could see this being a property on the View
>>> transform itself. In terms of implementation - I tried to figure out how
>>> side input readiness is determined, in the direct runner and Dataflow
>>> runner, and I'm completely lost and would appreciate some help.
>>>
>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax  wrote:
>>>
 This sounds great!

 On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers 
 wrote:

> This would be absolutely great! It seems somewhat similar to the
> changes that were made to the BigQuery sink to support WriteResult (
> 

Re: Callbacks/other functions run after a PDone/output transform

2017-12-18 Thread Eugene Kirpichov
I'm a bit confused by all of these suggestions: they sound plausible at a
high level, but I'm having a hard time making any one of them concrete.

So suppose we want to create a transform Wait.on(PCollection<?> signal):
PCollection<T> -> PCollection<T>.
a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
"a", but buffers panes of "a" in any given window until the final pane of
"sig" in the same window is fired (or, if it's never fired, until the
window closes? could use a deadletter for that maybe).
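To make the proposed semantics concrete, here is a toy single-window model in plain Java (not Beam code; WaitOnModel and its method names are illustrative, not part of any SDK): main-input elements are buffered until the signal's final pane fires, after which both the buffered elements and any later arrivals pass through.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed Wait.on semantics for a single window:
// elements of the main input are buffered until the signal's final pane
// fires; after that, everything is released and pass-through.
class WaitOnModel {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean signalFinal = false;

    void mainElement(String e) {
        if (signalFinal) {
            output.add(e);         // final signal pane already seen: pass through
        } else {
            buffer.add(e);         // hold until the final signal pane
        }
    }

    void signalPane(boolean isFinal) {
        if (isFinal) {
            signalFinal = true;
            output.addAll(buffer); // flush everything buffered so far
            buffer.clear();
        }                          // non-final panes of the signal are ignored
    }

    List<String> output() { return output; }
}
```

In the real transform the same buffering would of course happen per window (and, for the keyed version, per key) rather than globally.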

I suppose this transform would need keyed and unkeyed versions.

The keyed version would support merging window fns, and would require "a"
and "sig" to be keyed by the same key, and would work using a CoGbk -
followed by a stateful ParDo? Or is there a way to get away without a
stateful ParDo here? (not all runners support it)

The unkeyed version would not support merging window fns. Reuven, can you
elaborate how your combiner idea would work here - in particular, what do
you mean by "triggering only on the final pane"? Do you mean filter
non-final panes before entering the combiner? I wonder if that'll work,
probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
side input with a Never.ever() trigger"?

Thanks.

On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax  wrote:

> This is an interesting point.
>
> In the past, we've often just thought about sequencing some action to take
> place after the sink, in which case you can simply use the sink output as a
> main input. However if you want to run a transform with another PCollection
> as a main input, this doesn't work. And as you've discovered, triggered
> side inputs are defined to be non-deterministic, and there's no way to make
> things line up.
>
> What you're describing only makes sense if you're blocking against the
> final pane (since otherwise there's no reasonable way to match up somePC
> panes with the sink panes). There are multiple ways you can do this: one
> would be to CGBK the two PCollections together, and trigger the new
> transform only on the final pane. Another would be to add a combiner that
> returns a Void, triggering only on the final pane, and then make this
> singleton Void a side input. You could also do something explicit with the
> state API.
>
> Reuven
>
> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov 
> wrote:
>
>> So this appears not as easy as anticipated (surprise!)
>>
>> Suppose we have a PCollection "donePanes" with an element per
>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>> data has been written; this pane is: final / non-final".
>>
>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>> happens only after the final pane has been written.
>>
>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
>> when c emits a *final* pane.
>>
>> Unfortunately, using
>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
>> the trick: the side input becomes ready the moment *the first *pane of
>> data has been written.
>>
>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>> only final panes...).apply(View.asSingleton())). It also becomes ready the
>> moment *the first* pane has been written, you just get an exception if
>> you access the side input before the *final* pane was written.
>>
>> I can't think of a pure-Beam solution to this: either "donePanes" will be
>> used as a main input to something (and then everything else can only be a
>> side input, which is not general enough), or it will be used as a side
>> input (and then we can't achieve "trigger only after the final pane fires").
>>
>> It seems that we need a way to control the side input pushback, and
>> configure whether a view becomes ready when its first pane has fired or
>> when its last pane has fired. I could see this being a property on the View
>> transform itself. In terms of implementation - I tried to figure out how
>> side input readiness is determined, in the direct runner and Dataflow
>> runner, and I'm completely lost and would appreciate some help.
>>
>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax  wrote:
>>
>>> This sounds great!
>>>
>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers 
>>> wrote:
>>>
 This would be absolutely great! It seems somewhat similar to the
 changes that were made to the BigQuery sink to support WriteResult (
 https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
 ).

 I find it helpful to think about the different things that may come
 after a sink. For instance:

 1. It might be helpful to have a collection of failing input elements.
 The type of failed elements is pretty straightforward -- just the input
 elements. This allows handling such failures by directing them 

Re: Callbacks/other functions run after a PDone/output transform

2017-12-15 Thread Kenneth Knowles
I can think of a couple mechanisms that you might use. You could drop to a
low level and use a stateful ParDo, storing the contents of panes to state
and emitting them all when the final pane arrives. You could also
re-trigger on the side input with a Never.ever() trigger, which will only
fire on GC time - and with terminating triggers eliminated, this is the
only final pane. And if you only want synchronization, you could just use a
simple ParDo to filter out all non-final panes.

Kenn
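Kenn's last option - a simple ParDo that drops non-final panes - can be sketched as a toy model in plain Java (Paned is an illustrative stand-in for an element tagged with its pane's isLast bit, as Beam's PaneInfo.isLast() exposes; none of these names are real Beam API):

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy model of "a simple ParDo to filter out all non-final panes":
// each element carries whether its pane is the final one for its window,
// and only final-pane elements are kept.
class FinalPaneFilter {
    record Paned<T>(T value, boolean isLastPane) {}

    static <T> List<T> keepFinalPanes(List<Paned<T>> in) {
        return in.stream()
                 .filter(Paned::isLastPane)   // drop speculative/early panes
                 .map(Paned::value)
                 .collect(Collectors.toList());
    }
}
```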

On Fri, Dec 15, 2017 at 7:01 PM, Ben Chambers  wrote:

> Would it make more sense for the side input watermark and details about
> the pane to be made available to the dofn which can then decide how to
> handle it? Then if a dofn only wants the final pane, it is analogous to
> triggering-is-for-sinks to push that back and only produce the views the
> dofn wants.
>
> I think exposing it and letting the dofn figure it out has similarities
> to how the input punctuation for each input might be exposed in other
> systems.
>
> On Fri, Dec 15, 2017, 5:31 PM Eugene Kirpichov 
> wrote:
>
>> So this appears not as easy as anticipated (surprise!)
>>
>> Suppose we have a PCollection "donePanes" with an element per
>> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
>> data has been written; this pane is: final / non-final".
>>
>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>> happens only after the final pane has been written.
>>
>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
>> when c emits a *final* pane.
>>
>> Unfortunately, using 
>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton()))
>> doesn't do the trick: the side input becomes ready the moment *the first
>> *pane of data has been written.
>>
>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>> only final panes...).apply(View.asSingleton())). It also becomes ready
>> the moment *the first* pane has been written, you just get an exception
>> if you access the side input before the *final* pane was written.
>>
>> I can't think of a pure-Beam solution to this: either "donePanes" will be
>> used as a main input to something (and then everything else can only be a
>> side input, which is not general enough), or it will be used as a side
>> input (and then we can't achieve "trigger only after the final pane fires").
>>
>> It seems that we need a way to control the side input pushback, and
>> configure whether a view becomes ready when its first pane has fired or
>> when its last pane has fired. I could see this being a property on the View
>> transform itself. In terms of implementation - I tried to figure out how
>> side input readiness is determined, in the direct runner and Dataflow
>> runner, and I'm completely lost and would appreciate some help.
>>
>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax  wrote:
>>
>>> This sounds great!
>>>
>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers 
>>> wrote:
>>>
 This would be absolutely great! It seems somewhat similar to the
 changes that were made to the BigQuery sink to support WriteResult (
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java).

 I find it helpful to think about the different things that may come
 after a sink. For instance:

 1. It might be helpful to have a collection of failing input elements.
 The type of failed elements is pretty straightforward -- just the input
 elements. This allows handling such failures by directing them elsewhere or
 performing additional processing.

>>>
>>> BigQueryIO already does this as you point out.
>>>
>>
 2. For a sink that produces a series of files, it might be useful to
 have a collection of the file names that have been completely written. This
 allows performing additional handling on these completed segments.

>>>
>>> In fact we already do this for FileBasedSinks. See
>>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>>
>>
 3. For a sink that updates some destination, it would be reasonable to
 have a collection that provides (periodically) output indicating how
 complete the information written to that destination is. For instance, this
might be something like "<destination> has all of the elements up
to <timestamp>" complete. This allows tracking how much information
 has been completely written out.

>>>
>>> Interesting. Maybe tough to do since sinks often don't have that
>>> knowledge.
>>>
>>
>>>

 I think those concepts map to the more detailed description Eugene
 provided, but I find it helpful to focus on what information comes out of
 the sink and how it might be used.


Re: Callbacks/other functions run after a PDone/output transform

2017-12-15 Thread Eugene Kirpichov
So this appears not as easy as anticipated (surprise!)

Suppose we have a PCollection "donePanes" with an element per
fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
data has been written; this pane is: final / non-final".

Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
happens only after the final pane has been written.

In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
when c emits a *final* pane.

Unfortunately, using
ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
the trick: the side input becomes ready the moment *the first *pane of data
has been written.

But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter only
final panes...).apply(View.asSingleton())). It also becomes ready the
moment *the first* pane has been written, you just get an exception if you
access the side input before the *final* pane was written.

I can't think of a pure-Beam solution to this: either "donePanes" will be
used as a main input to something (and then everything else can only be a
side input, which is not general enough), or it will be used as a side
input (and then we can't achieve "trigger only after the final pane fires").

It seems that we need a way to control the side input pushback, and
configure whether a view becomes ready when its first pane has fired or
when its last pane has fired. I could see this being a property on the View
transform itself. In terms of implementation - I tried to figure out how
side input readiness is determined, in the direct runner and Dataflow
runner, and I'm completely lost and would appreciate some help.
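The readiness behavior described above can be modeled in plain Java (illustrative code, not Beam internals): the view counts as ready once any pane has fired, even when a filter dropped that pane's contents, so reading it before the final pane just throws; the readyOnFinalPane flag is a hypothetical stand-in for the proposed View property.

```java
import java.util.NoSuchElementException;

// Toy model of singleton side-input readiness. By default the view is
// "ready" as soon as the first pane fires, whether or not a filter let
// that pane's value through - so an early read throws instead of blocking.
class SingletonViewModel {
    private boolean anyPaneFired = false;
    private boolean finalPaneFired = false;
    private String value;               // set only by panes that survive the filter
    private final boolean readyOnFinalPane;

    SingletonViewModel(boolean readyOnFinalPane) {
        this.readyOnFinalPane = readyOnFinalPane;
    }

    void pane(String v, boolean isFinal, boolean survivesFilter) {
        anyPaneFired = true;
        if (isFinal) finalPaneFired = true;
        if (survivesFilter) value = v;
    }

    boolean ready() {
        return readyOnFinalPane ? finalPaneFired : anyPaneFired;
    }

    String get() {
        if (value == null) throw new NoSuchElementException("no value yet");
        return value;
    }
}
```

With readyOnFinalPane set, the consumer is simply pushed back until the final pane, which is the behavior the filter-based approach fails to achieve.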

On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax  wrote:

> This sounds great!
>
> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers  wrote:
>
>> This would be absolutely great! It seems somewhat similar to the changes
>> that were made to the BigQuery sink to support WriteResult (
>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>> ).
>>
>> I find it helpful to think about the different things that may come after
>> a sink. For instance:
>>
>> 1. It might be helpful to have a collection of failing input elements.
>> The type of failed elements is pretty straightforward -- just the input
>> elements. This allows handling such failures by directing them elsewhere or
>> performing additional processing.
>>
>
> BigQueryIO already does this as you point out.
>
>>
>> 2. For a sink that produces a series of files, it might be useful to have
>> a collection of the file names that have been completely written. This
>> allows performing additional handling on these completed segments.
>>
>
> In fact we already do this for FileBasedSinks. See
> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>
>>
>> 3. For a sink that updates some destination, it would be reasonable to
>> have a collection that provides (periodically) output indicating how
>> complete the information written to that destination is. For instance, this
>> might be something like "<destination> has all of the elements up
>> to <timestamp>" complete. This allows tracking how much information
>> has been completely written out.
>>
>
> Interesting. Maybe tough to do since sinks often don't have that knowledge.
>
>
>>
>> I think those concepts map to the more detailed description Eugene
>> provided, but I find it helpful to focus on what information comes out of
>> the sink and how it might be used.
>>
>> Were there any use cases the above miss? Any functionality that has been
>> described that doesn't map to these use cases?
>>
>> -- Ben
>>
>> On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov 
>> wrote:
>>
>>> It makes sense to consider how this maps onto existing kinds of sinks.
>>>
>>> E.g.:
>>> - Something that just makes an RPC per record, e.g. MqttIO.write(): that
>>> will emit 1 result per bundle (either a bogus value or number of records
>>> written) that will be Combine'd into 1 result per pane of input. A user can
>>> sequence against this and be notified when some intermediate amount of data
>>> has been written for a window, or (via .isFinal()) when all of it has been
>>> written.
>>> - Something that e.g. initiates an import job, such as
>>> BigQueryIO.write(), or an ElasticsearchIO write with a follow-up atomic
>>> index swap: should emit 1 result per import job, e.g. containing
>>> information about the job (e.g. its id and statistics). Role of panes is
>>> the same.
>>> - Something like above but that supports dynamic destinations: like in
>>> WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> where
>>> ResultT may be something like a list of files that were written for this
>>> pane of this destination.
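The per-bundle-to-per-pane combining implicit in the examples above can be sketched as a toy model in plain Java (BundleResult and the string window/pane keys are illustrative stand-ins, not Beam types): each bundle reports the files it wrote, tagged with its window and pane index, and results sharing the same (window, pane) key are merged into one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of combining per-bundle write results into one result per pane.
class PaneResultCombiner {
    record BundleResult(String window, int paneIndex, List<String> files) {}

    static Map<String, List<String>> combine(List<BundleResult> bundles) {
        Map<String, List<String>> perPane = new TreeMap<>();
        for (BundleResult b : bundles) {
            String key = b.window() + "/pane" + b.paneIndex();
            perPane.computeIfAbsent(key, k -> new ArrayList<>())
                   .addAll(b.files());   // merge all bundles of the same pane
        }
        return perPane;
    }
}
```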
>>>
>>> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov 

Re: Callbacks/other functions run after a PDone/output transform

2017-12-04 Thread Eugene Kirpichov
I agree that the proper API for enabling the use case "do something after
the data has been written" is to return a PCollection of objects where each
object represents the result of writing some identifiable subset of the
data. Then one can apply a ParDo to this PCollection, in order to "do
something after this subset has been written".

The challenging part here is *identifying* the subset of the data that's
been written, in a way consistent with Beam's unified batch/streaming
model, where saying "all data has been written" is not an option because
more data can arrive.

The next choice is "a window of input has been written", but then again,
late data can arrive into a window as well.

Next choice after that is "a pane of input has been written", but per
https://s.apache.org/beam-sink-triggers the term "pane of input" is moot:
triggering and panes should be something private to the sink, and the same
input can trigger different sinks differently. The hypothetical different
accumulation modes make this trickier still. I'm not sure whether we intend
to also challenge the idea that windowing is inherent to the collection, or
whether it too should be specified on a transform that processes the
collection. I think for the sake of this discussion we can assume that it's
inherent, and assume the mental model that the elements in different
windows of a PCollection are processed independently - "as if" there were
multiple pipelines processing each window.

Overall, embracing the full picture, we end up with something like this:
- The input PCollection is a composition of windows.
- If the windowing strategy is non-merging (e.g. fixed or sliding windows),
the below applies to the entire contents of the PCollection. If it's
merging (e.g. session windows), then it applies per-key, and the input
should be (perhaps implicitly) keyed in a way that the sink understands -
for example, the grouping by destination in DynamicDestinations in file and
bigquery writes.
- Each window's contents is a "changelog" - stream of elements and
retractions.
- A "sink" processes each window of the collection, deciding how to handle
elements and retractions (and whether to support retractions at all) in a
sink-specific way, and deciding *when* to perform the side effects for a
portion of the changelog (a "pane") based on the sink's triggering strategy.
- If the side effect itself is parallelized, then there'll be multiple
results for the pane - e.g. one per bundle.
- Each (sink-chosen) pane produces a set of results, e.g. a list of
filenames that have been written, or simply a number of records that was
written, or a bogus void value etc. The result will implicitly include the
window of the input it's associated with. It will also implicitly include
pane information - index of the pane in this window, and whether this is
the first or last pane.
- The partitioning into bundles is an implementation detail and not very
useful, so before presenting the pane write results to the user, the sink
will probably want to Combine the bundle results so that there ends up
being 1 value for each pane that was written. Once again note that panes
may be associated with windows of the input as a whole, but if the input is
keyed (like with DynamicDestinations) they'll be associated with per-key
subsets of windows of the input.
- This combining requires an extra, well, combining operation, so it should
be optional.
- The user will end up getting either a PCollection<ResultT> or a
PCollection<KV<KeyT, ResultT>>, for sink-specific KeyT and ResultT, where
the elements of this collection will implicitly have window and pane
information, available via the implicit BoundedWindow and PaneInfo.
- Until "sink triggering" is implemented, we'll have to embrace the fact
that trigger strategy is set on the input. But in that case the user will
have to accept that the PaneInfo of ResultT's is not necessarily directly
related to panes of the input - the sink is allowed to do internal
aggregation as an implementation detail, which may modify the triggering
strategy. Basically the user will still get sink-assigned panes.
- In most cases, one may imagine that the user is interested in being
notified of "no more data associated with this window will be written", so
the user will ignore all ResultT's except those where the pane is marked
final. If a user is interested in being notified of intermediate write
results - they'll have to embrace the fact that they cannot identify the
precise subset of input associated with the intermediate result.
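The bundle-result/pane-result distinction above can be modeled in plain Java (invented names like BundleResult/PaneResult, not Beam classes): per-bundle results are combined into one value per pane, and a consumer that only wants "no more data for this window" keeps just the panes marked final.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of per-pane write results (invented names, not Beam classes).
public class PaneResults {
    // Result of one bundle's writes: which window/pane it belongs to,
    // plus a payload (here, how many records the bundle wrote).
    public record BundleResult(
        String window, int paneIndex, boolean lastPane, long recordsWritten) {}

    // One combined result per (window, pane).
    public record PaneResult(
        String window, int paneIndex, boolean lastPane, long recordsWritten) {}

    // The optional "Combine" step: collapse bundle results to one per pane.
    public static List<PaneResult> combinePerPane(List<BundleResult> bundles) {
        Map<String, List<BundleResult>> byPane = bundles.stream()
            .collect(Collectors.groupingBy(b -> b.window() + "#" + b.paneIndex()));
        return byPane.values().stream()
            .map(g -> new PaneResult(g.get(0).window(), g.get(0).paneIndex(),
                g.get(0).lastPane(),
                g.stream().mapToLong(BundleResult::recordsWritten).sum()))
            .sorted(Comparator.comparing(PaneResult::window)
                .thenComparingInt(PaneResult::paneIndex))
            .toList();
    }

    // A consumer interested only in "no more data for this window"
    // ignores everything but the final pane.
    public static List<PaneResult> finalPanesOnly(List<PaneResult> panes) {
        return panes.stream().filter(PaneResult::lastPane).toList();
    }
}
```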

I think the really key points of the above are:
- Sinks should support windowed input. Sinks should write different windows
of input independently. If the sink can write multi-destination input, the
destination should function as a grouping key, and in that case merging
windowing should be allowed.
- Producing a PCollection of write results should be optional.
- When asked to produce results, sinks produce a PCollection of results
that may be 

Re: Callbacks/other functions run after a PDone/output transform

2017-12-04 Thread Robert Bradshaw
+1

At the very least an empty PCollection could be produced with no
promises about its contents but the ability to be followed (e.g. as a
side input), which is forward compatible with whatever actual metadata
one may decide to produce in the future.
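The "followable even if empty" idea is sequencing through a data dependency. Outside Beam, a loose stdlib analogy (not Beam code) is a Void-typed future: its result carries no data, yet a downstream step can still be chained on its completion.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Analogy only: a CompletableFuture<Void> is like an "empty PCollection" --
// it promises nothing about contents, but downstream work can depend on it.
public class SequencingAnalogy {
    public static List<String> runInOrder() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> writeDone =
            CompletableFuture.runAsync(() -> log.add("write"));
        // thenRun plays the role of consuming the empty output as a signal;
        // join waits for both steps to finish.
        writeDone.thenRun(() -> log.add("post-write step")).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder());
    }
}
```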

On Mon, Dec 4, 2017 at 11:06 AM, Kenneth Knowles  wrote:
> +dev@
>
> I am in complete agreement with Luke. Data dependencies are easy to
> understand and a good way for an IO to communicate and establish causal
> dependencies. Converting an IO from PDone to real output may spur further
> useful thoughts based on the design decisions about what sort of output is
> most useful.
>
> Kenn
>
> On Mon, Dec 4, 2017 at 10:42 AM, Lukasz Cwik  wrote:
>>
>> I think all sinks actually do have valuable information to output which
>> can be used after a write (file names, transaction/commit/row ids, table
>> names, ...). In addition to this metadata, having a PCollection of all
>> successful writes and all failed writes is useful for users so they can
>> chain an action which depends on what was or wasn't successfully written.
>> Users have requested adding retry/failure handling policies to sinks so that
>> failed writes don't jam up the pipeline.
>>
>> On Fri, Dec 1, 2017 at 2:43 PM, Chet Aldrich 
>> wrote:
>>>
>>> So I agree generally with the idea that returning a PCollection makes all
>>> of this easier so that arbitrary additional functions can be added, but
>>> what exactly would write functions return in a PCollection that would make
>>> sense? The whole idea is that we’ve written to an external source and now
>>> the collection itself is no longer needed.
>>>
>>> Currently, that’s represented with a PDone, but currently that doesn’t
>>> allow any work to occur after it. I see a couple possible ways of handling
>>> this given this conversation, and am curious which solution sounds like the
>>> best way to deal with the problem:
>>>
>>> 1. Have output transforms always return something specific (which would
>>> be the same across transforms by convention), that is in the form of a
>>> PCollection, so operations can occur after it.
>>>
>>> 2. Make either PDone or some new type that can act as a PCollection so we
>>> can run applies afterward.
>>>
>>> 3. Make output transforms provide the facility for a callback function
>>> which runs after the transform is complete.
>>>
>>> I went through these gymnastics recently when I was trying to build
>>> something that would move indices after writing to Algolia, and the solution
>>> was to co-opt code from the old Sink class that used to exist in Beam. The
>>> problem is that particular method requires the output transform in question
>>> to return a PCollection, even if it is trivial or doesn’t make sense to
>>> return one. This seems like a bad solution, but unfortunately there isn’t a
>>> notion of a transform that has no explicit output that needs to have
>>> operations occur after it.
>>>
>>> The three potential solutions above address this issue, but I would like
>>> to hear on which would be preferable (or perhaps a different proposal
>>> altogether?). Perhaps we could also start up a ticket on this, since it
>>> seems like a worthwhile feature addition. I would find it really useful, for
>>> one.
>>>
>>> Chet
>>>
>>> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik  wrote:
>>>
>> Instead of a callback fn, it's most useful if a PCollection is returned
>> containing the result of the sink so that any arbitrary additional functions
>> can be applied.
>>>
>>> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré 
>>> wrote:

 Agree, I would prefer to do the callback in the IO more than in the
 main.

 Regards
 JB

 On 12/01/2017 03:54 PM, Steve Niemitz wrote:
>
> I do something almost exactly like this, but with BigtableIO instead.
> I have a pull request open here [1] (which reminds me I need to finish 
> this
> up...).  It would really be nice for most IOs to support something like
> this.
>
> Essentially you do a GroupByKey (or some CombineFn) on the output from
> the BigtableIO, and then feed that into your function which will run when
> all writes finish.
>
> You probably want to avoid doing something in the main method because
> there's no guarantee it'll actually run (maybe the driver will die, get
> killed, machine will explode, etc).
>
> [1] https://github.com/apache/beam/pull/3997
>
> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick  > wrote:
>
> Assuming you're in Java. You could just follow on in your Main
> method.
> Checking the state of the Result.
>
> Example:
> PipelineResult result = pipeline.run();
> try {
> result.waitUntilFinish();
> if(result.getState() == PipelineResult.State.DONE) {
> 


Re: Callbacks/other functions run after a PDone/output transform

2017-12-01 Thread Chet Aldrich
So I agree generally with the idea that returning a PCollection makes all of 
this easier so that arbitrary additional functions can be added, but what 
exactly would write functions return in a PCollection that would make sense? The 
whole idea is that we’ve written to an external source and now the collection 
itself is no longer needed. 

Currently, that’s represented with a PDone, but currently that doesn’t allow 
any work to occur after it. I see a couple possible ways of handling this given 
this conversation, and am curious which solution sounds like the best way to 
deal with the problem:

1. Have output transforms always return something specific (which would be the 
same across transforms by convention), that is in the form of a PCollection, so 
operations can occur after it. 

2. Make either PDone or some new type that can act as a PCollection so we can 
run applies afterward. 

3. Make output transforms provide the facility for a callback function which 
runs after the transform is complete.
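Option 1 can be illustrated outside Beam with a plain signature change (a sketch with invented names, not a real IO): a write that used to return nothing instead returns the metadata it naturally produces, so a caller can chain follow-up work on the result.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch (invented names): a "write" that returns what it wrote instead of
// void, so a follow-up step can consume the result.
public class WriteWithResult {
    // Stand-in for a real write: pretend each record goes to a file and
    // return the file names, the kind of metadata a sink naturally has.
    public static List<String> writeRecords(List<String> records) {
        return records.stream()
            .map(r -> "/tmp/out-" + r + ".txt")
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> written = writeRecords(List.of("a", "b"));
        // Follow-up work (e.g. an index move) can now depend on the result.
        written.forEach(f -> System.out.println("notify: " + f));
    }
}
```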

I went through these gymnastics recently when I was trying to build something 
that would move indices after writing to Algolia, and the solution was to 
co-opt code from the old Sink class that used to exist in Beam. The problem is 
that particular method requires the output transform in question
to return a PCollection, even if it is trivial or doesn’t make sense to return 
one. This seems like a bad solution, but unfortunately there isn’t a notion of 
a transform that has no explicit output that needs to have operations occur 
after it. 

The three potential solutions above address this issue, but I would like to 
hear on which would be preferable (or perhaps a different proposal 
altogether?). Perhaps we could also start up a ticket on this, since it seems 
like a worthwhile feature addition. I would find it really useful, for one. 

Chet

> On Dec 1, 2017, at 12:19 PM, Lukasz Cwik  wrote:
> 
> Instead of a callback fn, it's most useful if a PCollection is returned 
> containing the result of the sink so that any arbitrary additional functions 
> can be applied.
> 
> On Fri, Dec 1, 2017 at 7:14 AM, Jean-Baptiste Onofré  > wrote:
> Agree, I would prefer to do the callback in the IO more than in the main.
> 
> Regards
> JB
> 
> On 12/01/2017 03:54 PM, Steve Niemitz wrote:
> I do something almost exactly like this, but with BigtableIO instead.  I have 
> a pull request open here [1] (which reminds me I need to finish this up...).  
> It would really be nice for most IOs to support something like this.
> 
> Essentially you do a GroupByKey (or some CombineFn) on the output from the 
> BigtableIO, and then feed that into your function which will run when all 
> writes finish.
> 
> You probably want to avoid doing something in the main method because there's 
> no guarantee it'll actually run (maybe the driver will die, get killed, 
> machine will explode, etc).
> 
> [1] https://github.com/apache/beam/pull/3997 
> 
> 
> On Fri, Dec 1, 2017 at 9:46 AM, NerdyNick wrote:
> 
> Assuming you're in Java. You could just follow on in your Main method.
> Checking the state of the Result.
> 
> Example:
> PipelineResult result = pipeline.run();
> try {
> result.waitUntilFinish();
> if(result.getState() == PipelineResult.State.DONE) {
> //DO ES work
> }
> } catch(Exception e) {
> result.cancel();
> throw e;
> }
> 
> Otherwise you could also use Oozie to construct a work flow.
> 
> On Fri, Dec 1, 2017 at 2:02 AM, Jean-Baptiste Onofré  
> >> wrote:
> 
> Hi,
> 
> yes, we had a similar question some days ago.
> 
> We can imagine to have a user callback fn fired when the sink batch is
> complete.
> 
> Let me think about that.
> 
> Regards
> JB
> 
> On 12/01/2017 09:04 AM, Philip Chan wrote:
> 
> Hey JB,
> 
> Thanks for getting back so quickly.
> I suppose in that case I would need a way of monitoring when the 
> ES
> transform completes successfully before I can proceed with doing 
> the
> swap.
> The problem with this is that I can't think of a good way to
> determine that termination state short of polling the new index to
> check the document count compared to the size of input 
> PCollection.
> That, or maybe I'd need to use an external system like you 
> mentioned
> to poll on the state of the pipeline (I'm using Google Dataflow, 
> so
> maybe there's a way to do this with some API).
> But I would have thought that there would be an easy