I figured out the Never.ever() approach and it seems to work. Will finish
this up and send a PR at some point. Woohoo, thanks Kenn! Seems like this
will be quite a useful transform.
On Mon, Dec 18, 2017 at 1:23 PM Eugene Kirpichov wrote:
> I'm a bit confused by all of these suggestions: they sound plausible at a
> high level, but I'm having a hard time making any one of them concrete.
>
> So suppose we want to create a transform Wait.on(PCollection signal):
> PCollection -> PCollection.
> a.apply(Wait.on(sig)) returns a PCollection that is mostly identical to
> "a", but buffers panes of "a" in any given window until the final pane of
> "sig" in the same window is fired (or, if it's never fired, until the
> window closes? could use a deadletter for that maybe).
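To make the proposed semantics concrete, here is a minimal, runner-free Java model of what Wait.on(sig) is described as doing. Panes of "a" are held per window and released once "sig" fires its final pane in that window. Anything still held when the window closes is handed back for a dead-letter path. The names WaitOnModel, onMainPane, onSignalPane and onWindowClosed are hypothetical, and windows are modeled as plain strings. Nothing here uses the Beam API, so treat it as a sketch of the intended behavior under those assumptions, not an implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, runner-free model of the proposed Wait.on(sig) semantics. */
class WaitOnModel {
  // Panes of "a" held back, per window, until "sig" fires its final pane there.
  private final Map<String, List<String>> buffered = new HashMap<>();
  // Windows in which "sig" has already fired its final pane.
  private final Set<String> signalDone = new HashSet<>();
  // Elements of "a" released downstream, in release order.
  private final List<String> output = new ArrayList<>();

  /** A pane of the main input "a" arrives in the given window. */
  void onMainPane(String window, String element) {
    if (signalDone.contains(window)) {
      output.add(element); // signal already complete: pass straight through
    } else {
      buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }
  }

  /** A pane of the signal "sig" fires in the given window. */
  void onSignalPane(String window, boolean isFinal) {
    if (!isFinal) {
      return; // non-final signal panes do not unblock anything
    }
    signalDone.add(window);
    List<String> held = buffered.remove(window);
    if (held != null) {
      output.addAll(held);
    }
  }

  /** Window expires without a final signal pane: return held elements for a dead letter. */
  List<String> onWindowClosed(String window) {
    List<String> held = buffered.remove(window);
    if (held == null) {
      return new ArrayList<>();
    }
    return held;
  }

  List<String> output() {
    return output;
  }

  public static void main(String[] args) {
    WaitOnModel m = new WaitOnModel();
    m.onMainPane("w1", "a1");
    m.onSignalPane("w1", false);                // early pane: a1 stays buffered
    System.out.println(m.output());             // []
    m.onMainPane("w2", "b1");
    m.onSignalPane("w1", true);                 // final pane: a1 released
    m.onMainPane("w1", "a2");                   // after the final pane: passes through
    System.out.println(m.output());             // [a1, a2]
    System.out.println(m.onWindowClosed("w2")); // [b1]
  }
}
```

The model keys everything by window only, which matches the unkeyed variant discussed below. A keyed variant would key the buffers by (key, window) instead.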
>
> I suppose this transform would need both a keyed and an unkeyed version.
>
> The keyed version would support merging window fns, would require "a"
> and "sig" to be keyed by the same key, and would work using a CoGbk
> followed by a stateful ParDo? Or is there a way to avoid a stateful
> ParDo here? (Not all runners support it.)
>
> The unkeyed version would not support merging window fns. Reuven, can you
> elaborate on how your combiner idea would work here? In particular, what do
> you mean by "triggering only on the final pane"? Do you mean filtering out
> non-final panes before entering the combiner? I wonder if that would work;
> probably worth a shot. And Kenn, can you elaborate on "re-trigger on the
> side input with a Never.ever() trigger"?
>
> Thanks.
>
> On Sun, Dec 17, 2017 at 1:28 PM Reuven Lax wrote:
>
>> This is an interesting point.
>>
>> In the past, we've often just thought about sequencing some action to take
>> place after the sink, in which case you can simply use the sink output as a
>> main input. However, if you want to run a transform with another PCollection
>> as a main input, this doesn't work. And as you've discovered, triggered
>> side inputs are defined to be non-deterministic, and there's no way to make
>> things line up.
>>
>> What you're describing only makes sense if you're blocking against the
>> final pane (since otherwise there's no reasonable way to match up somePC
>> panes with the sink panes). There are multiple ways you can do this: one
>> would be to CoGBK the two PCollections together, and trigger the new
>> transform only on the final pane. Another would be to add a combiner that
>> returns a Void, triggering only on the final pane, and then make this
>> singleton Void a side input. You could also do something explicit with the
>> state API.
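Reuven's second option can be modeled in plain Java. Eugene asks about it above, reading it as "filter out non-final panes, then combine each window down to a single Void". SignalPane is a hypothetical stand-in for Beam's pane metadata. A window's membership in the returned set stands in for the per-window Void singleton. This sketch only shows the data flow under those assumptions. It says nothing about when a runner would consider the resulting side input ready, which is exactly the problem raised further down the thread.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of "keep only final panes, then combine each window to a Void". */
class FinalPaneSignal {
  /** Minimal stand-in for a pane of the sink's output (not Beam's PaneInfo). */
  static final class SignalPane {
    final String window;
    final boolean isFinal;

    SignalPane(String window, boolean isFinal) {
      this.window = window;
      this.isFinal = isFinal;
    }
  }

  /** Drops non-final panes, then collapses each window to one marker (set membership). */
  static Set<String> windowsWithFinalPane(List<SignalPane> panes) {
    Set<String> ready = new TreeSet<>();
    for (SignalPane p : panes) {
      if (p.isFinal) {
        ready.add(p.window); // the "Void" value: only its presence per window matters
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    List<SignalPane> panes = List.of(
        new SignalPane("w1", false),
        new SignalPane("w1", true),
        new SignalPane("w2", false));
    System.out.println(windowsWithFinalPane(panes)); // [w1]
  }
}
```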
>>
>> Reuven
>>
>> On Fri, Dec 15, 2017 at 5:31 PM, Eugene Kirpichov wrote:
>>
>>> So this turns out to be less easy than anticipated (surprise!).
>>>
>>> Suppose we have a PCollection "donePanes" with one element per
>>> fully-processed pane, e.g. from the BigQuery sink, where each element says
>>> "a pane of data has been written; this pane is: final / non-final".
>>>
>>> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
>>> happens only after the final pane has been written.
>>>
>>> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
>>> when c emits a *final* pane.
>>>
>>> Unfortunately, using
>>> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
>>> the trick: the side input becomes ready the moment *the first* pane of
>>> data has been written.
>>>
>>> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
>>> only final panes...).apply(View.asSingleton())). It also becomes ready the
>>> moment *the first* pane has been written; you just get an exception if
>>> you access the side input before the *final* pane was written.
>>>
>>> I can't think of a pure-Beam solution to this: either "donePanes" will
>>> be used as a main input to something (and then everything else can only be
>>> a side input, which is not general enough), or it will be used as a side
>>> input (and then we can't achieve "trigger only after the final pane fires").
>>>
>>> It seems that we need a way to control the side input pushback, and to
>>> configure whether a view becomes ready when its first pane has fired or
>>> when its last pane has fired. I could see this being a property on the View
>>> transform itself. In terms of implementation: I tried to figure out how
>>> side input readiness is determined in the direct runner and the Dataflow
>>> runner, and I'm completely lost and would appreciate some help.
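The proposed readiness property can be sketched as a two-valued policy. ReadyWhen, FIRST_PANE and FINAL_PANE are hypothetical names, not existing Beam options. FIRST_PANE models the behavior described above, where the view becomes ready as soon as any pane fires. FINAL_PANE models the proposed alternative. This runner-free sketch rests on those assumptions and does not reflect how the direct runner or Dataflow actually track side input readiness.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Runner-free model of when main-input elements stop being pushed back on a side input. */
class SideInputReadiness {
  /** Hypothetical readiness policy for a view; not an existing Beam option. */
  enum ReadyWhen { FIRST_PANE, FINAL_PANE }

  private final ReadyWhen policy;
  // Windows in which the side input has fired any pane.
  private final Set<String> anyPane = new HashSet<>();
  // Windows in which the side input has fired its final pane.
  private final Set<String> finalPane = new HashSet<>();

  SideInputReadiness(ReadyWhen policy) {
    this.policy = policy;
  }

  /** Records that the side input fired a pane in the given window. */
  void onSidePane(String window, boolean isFinal) {
    anyPane.add(window);
    if (isFinal) {
      finalPane.add(window);
    }
  }

  /** Whether main-input elements in this window may be processed against the view. */
  boolean isReady(String window) {
    return policy == ReadyWhen.FIRST_PANE ? anyPane.contains(window) : finalPane.contains(window);
  }

  public static void main(String[] args) {
    SideInputReadiness current = new SideInputReadiness(ReadyWhen.FIRST_PANE);
    SideInputReadiness proposed = new SideInputReadiness(ReadyWhen.FINAL_PANE);
    for (SideInputReadiness r : List.of(current, proposed)) {
      r.onSidePane("w", false); // first, non-final pane of "donePanes"
    }
    System.out.println(current.isReady("w") + " " + proposed.isReady("w")); // true false
    proposed.onSidePane("w", true); // final pane written
    System.out.println(proposed.isReady("w")); // true
  }
}
```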
>>>
>>> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax wrote:
>>>
>>>> This sounds great!
>>>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers wrote:
>>>>> This would be absolutely great! It seems somewhat similar to the
>>>>> changes that were made to the BigQuery sink to support WriteResult (
>>>>>