Re: real real-time beam

2019-12-06 Thread Aaron Dixon
> Aaron - do you have the information you need to implement your sink?
> My impression is that you have quite a good grasp of the issues even
> before you asked.

Yes, I do - thank you. I really appreciate the thorough help from
everyone.


Re: real real-time beam

2019-12-04 Thread Jan Lukavský

Hi Kenn,

On 12/4/19 5:38 AM, Kenneth Knowles wrote:
> Jan - let's try to defrag the threads on your time sorting proposal.
> This thread may have useful ideas but I want to focus on helping Aaron
> in this thread. You can link to this thread from other threads or from
> a design doc. Does this seem OK to you?


sure. :-)

I actually think the best thread to continue the discussion would be 
[1]. The reason why this discussion probably got fragmented is that the 
other threads seem to die out without any conclusion. :-(


Jan

[1] 
https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E





Re: real real-time beam

2019-12-03 Thread Kenneth Knowles
Jan - let's try to defrag the threads on your time sorting proposal. This
thread may have useful ideas but I want to focus on helping Aaron in this
thread. You can link to this thread from other threads or from a design
doc. Does this seem OK to you?

Aaron - do you have the information you need to implement your sink? My
impression is that you have quite a good grasp of the issues even before
you asked.

Kenn


Re: real real-time beam

2019-11-27 Thread Jan Lukavský
> Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in
> terms of the explicit ordering on panes. And I do think we need to
> have an explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner
> could probably break a lot of things.


Thanks for this insight. I didn't know about the relation between
trigger firing (event) time - which is always non-decreasing - and the
resulting timestamp of the output pane - which can be affected by the
timestamp combiner and decrease in the cases you describe. What
actually correlates with the pane index at all times is the processing
time of the trigger firings. Would you say that the "annotation that
would guarantee ordering of panes" could be viewed as a time ordering
annotation with an additional time domain (event time, processing
time)? Could these two then be viewed as a single one with some
distinguishing parameter?


@RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)

?

Event time should probably be made the default, because that is
information that is accessible with every WindowedValue, while the
pane index is available only after a GBK (or generally might be
available after every keyed sequential operation, but is missing after
a source, for instance).


Jan


Re: real real-time beam

2019-11-26 Thread Kenneth Knowles
On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský wrote:

> > I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it was
> required. It has nothing to do with event time ordering, only trigger
> firing ordering.
>
> I cannot agree with the last sentence (and I'm really not doing this on
> purpose :-)). Panes generally arrive out of order, as mentioned several
> times in the discussions linked from this thread. If we want to ensure
> "trigger firing ordering", we can use the pane index, that is correct. But
> - that is actually equivalent to sorting by event time, because pane index
> order will be (nearly) the same as event time order. This is due to the
> fact, that pane index and event time correlate (both are monotonic).
>
Trigger firings can have decreasing event timestamps w/ the minimum
timestamp combiner*. I do think the issue at hand is best analyzed in terms
of the explicit ordering on panes. And I do think we need to have an
explicit guarantee or annotation strong enough to describe a
correct-under-all-allowed runners sink. Today an antagonistic runner could
probably break a lot of things.

Kenn

*In fact, they can decrease via the "maximum" timestamp combiner too,
because timestamp combiners actually apply only to the elements in
that particular pane. This is weird, and maybe a design bug, but good
to know about.
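
For reference, the timestamp combiner is configured on the windowing
strategy; a minimal sketch in Java (the window size and element types
are illustrative, not from this thread):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// EARLIEST ("minimum") stamps each output pane with the earliest
// element in that pane, so a later firing over later-arriving-but-
// earlier elements can carry a smaller timestamp than an earlier
// firing.
Window<KV<String, Long>> windowing =
    Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(5)))
        .withTimestampCombiner(TimestampCombiner.EARLIEST);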



Re: real real-time beam

2019-11-26 Thread Jan Lukavský
> I will not try to formalize this notion in this email. But I will
> note that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it
> was required. It has nothing to do with event time ordering, only
> trigger firing ordering.


I cannot agree with the last sentence (and I'm really not doing this
on purpose :-)). Panes generally arrive out of order, as mentioned
several times in the discussions linked from this thread. If we want
to ensure "trigger firing ordering", we can use the pane index, that
is correct. But that is actually equivalent to sorting by event time,
because pane index order will be (nearly) the same as event time
order. This is due to the fact that pane index and event time
correlate (both are monotonic). The pane index "only" solves the issue
of preserving ordering even in cases where there are multiple firings
within the same timestamp (regardless of granularity). This was
mentioned in the initial discussion about event time ordering, and is
part of the design doc - users should be allowed to provide a UDF for
extracting a time-correlated ordering field (which means the ability
to choose a preferred, or authoritative, observer which assigns
unambiguous ordering to events). Examples of this might include Kafka
offsets, or any queue index for that matter. This is not yet
implemented, but could (should) be in the future.


The only case where these two things are (somewhat) different is the
case mentioned by @Steve - if the output is a stateless ParDo, which
will get fused. But that is only because the processing is
single-threaded per key, and therefore the ordering is implied by
timer ordering (and careful here - many runners don't have this
ordering 100% correct as of now; this problem luckily appears only
when there are multiple timers per key). Moreover, if there is a
failure, the output might (would) go back in time anyway. If there is
a shuffle operation after GBK/Combine, the ordering is no longer
guaranteed and must be explicitly taken care of.


Last note, I must agree with @Rui that all these discussions are very
much related to retractions (precisely, the ability to implement
them).


Jan


Re: real real-time beam

2019-11-25 Thread Kenneth Knowles
Hi Aaron,

Another insightful observation.

Whenever an aggregation (GBK / Combine per key) has a trigger firing, there
is a per-key sequence number attached. It is included in metadata known as
"PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially referred
to as the "pane index". You can also make use of the "on time index" if you
like. The best way to access this metadata is to add a parameter of type
PaneInfo to your DoFn's @ProcessElement method. This works for stateful or
stateless DoFn.
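
For example, a minimal sketch (the element types and the write call
are illustrative placeholders):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

class PaneAwareSink extends DoFn<KV<String, Long>, Void> {
  @ProcessElement
  public void process(@Element KV<String, Long> element, PaneInfo pane) {
    // Per-key, per-window sequence number of this trigger firing.
    long paneIndex = pane.getIndex();
    // The "on time index"; -1 for early (speculative) panes.
    long onTimeIndex = pane.getNonSpeculativeIndex();
    // writeToStore(element.getKey(), element.getValue(), paneIndex);
    // (hypothetical sink call)
  }
}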

Most of Beam's IO connectors do not explicitly enforce that outputs
occur in pane index order but instead rely on the hope that the runner
delivers panes in order to the sink. IMO this is dangerous but it has
not yet caused a known issue. In practice, each "input key to output
key 'path'" through a pipeline's logic does preserve order for all
existing runners AFAIK, and it is the formalization that is missing.
It is related to an observation by +Rui Wang that processing
retractions requires the same key-to-key ordering.

I will not try to formalize this notion in this email. But I will note that
since it is universally assured, it would be zero cost and significantly
safer to formalize it and add an annotation noting it was required. It has
nothing to do with event time ordering, only trigger firing ordering.

Kenn

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557



Re: real real-time beam

2019-11-25 Thread Pablo Estrada
The blog posts on stateful and timely computation with Beam should help
clarify a lot about how to use state and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html

You'll see there how there's an implicit per-single-element grouping for
each key, so state and timers should support your use case very well.

Best
-P.



Re: real real-time beam

2019-11-25 Thread Steve Niemitz
If you have a pipeline that looks like Input -> GroupByKey -> ParDo, while
it is not guaranteed, in practice the sink will observe the trigger firings
in order (per key), since it'll be fused to the output of the GBK operation
(in all runners I know of).

There have been a couple threads about trigger ordering as well on the list
recently that might have more information:
https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E




Re: real real-time beam

2019-11-25 Thread Aaron Dixon
@Jan @Pablo Thank you

@Pablo In this case it's a single global windowed Combine/perKey, triggered
per element. Keys are few (client accounts) so they can live forever.

It looks like just by virtue of using a stateful ParDo I could get this
final execution to be "serialized" per key. (Then I could simply do the
compare-and-swap using Beam's state mechanism to keep track of the "latest
trigger timestamp" instead of having to orchestrate compare-and-swap in the
target store :thinking:.)
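
Something like this sketch, I suppose (writeToCache is a placeholder
for the actual sink call):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class LatestOnlyWriter extends DoFn<KV<String, Long>, Void> {
  @StateId("latestTs")
  private final StateSpec<ValueState<Long>> latestTsSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext ctx,
      @StateId("latestTs") ValueState<Long> latestTs) {
    long eventTs = ctx.timestamp().getMillis();
    Long seen = latestTs.read();
    if (seen == null || eventTs > seen) {  // only move forward, never regress
      latestTs.write(eventTs);
      // writeToCache(ctx.element().getKey(), ctx.element().getValue());
      // (placeholder)
    }
  }
}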





Re: real real-time beam

2019-11-25 Thread Jan Lukavský
One addition, to make the list of options exhaustive, there is
probably one more option:

 c) create a ParDo keyed by the primary key of your sink, cache the
last write in there and compare it locally, without the need to query
the database

It would still need some timer to clear values after watermark +
allowed lateness, because otherwise you would have to cache your whole
database on workers. But because you don't need actual ordering - you
just need the most recent value (if I got it right) - this might be an
option.
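
A rough sketch of what I mean (the allowed lateness and the database
call are placeholders):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class CachingWriter extends DoFn<KV<String, Long>, Void> {
  private static final Duration ALLOWED_LATENESS = Duration.standardHours(1);

  @StateId("lastWritten")
  private final StateSpec<ValueState<Long>> lastWrittenSpec = StateSpecs.value();

  @TimerId("cleanup")
  private final TimerSpec cleanupSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
      @StateId("lastWritten") ValueState<Long> lastWritten,
      @TimerId("cleanup") Timer cleanup) {
    Long last = lastWritten.read();
    Long value = ctx.element().getValue();
    if (last == null || !last.equals(value)) {
      // writeToDatabase(ctx.element().getKey(), value);  // placeholder
      lastWritten.write(value);
    }
    // Free the cached value once the watermark passes timestamp +
    // allowed lateness, so we don't cache the whole database on workers.
    cleanup.set(ctx.timestamp().plus(ALLOWED_LATENESS));
  }

  @OnTimer("cleanup")
  public void onCleanup(@StateId("lastWritten") ValueState<Long> lastWritten) {
    lastWritten.clear();
  }
}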


Jan



Re: real real-time beam

2019-11-25 Thread Jan Lukavský

Hi Aaron,

maybe someone else will give another option, but if I understand 
correctly what you want to solve, then you essentially have to do either:


 a) use the compare & swap mechanism in the sink you described

 b) use a buffer to hold elements inside the outputting ParDo and
only output them when the watermark passes (using a timer) - see the
sketch below.


There is actually an ongoing discussion about how to make option b) 
user-friendly and part of Beam itself, but currently there is no 
out-of-the-box solution for that.
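
To illustrate option b), a simplified sketch - note it reassigns
output timestamps to the flush time, late data would still arrive out
of order, and the element types are illustrative:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

class SortOnWatermark extends DoFn<KV<String, Long>, KV<String, Long>> {
  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>>
      bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
    flush.set(ctx.timestamp());  // fire once the watermark passes this element
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    List<TimestampedValue<KV<String, Long>>> ready = new ArrayList<>();
    List<TimestampedValue<KV<String, Long>>> pending = new ArrayList<>();
    Instant earliestPending = null;
    for (TimestampedValue<KV<String, Long>> tv : buffer.read()) {
      if (tv.getTimestamp().isAfter(ctx.timestamp())) {
        pending.add(tv);  // still ahead of the watermark, keep buffered
        if (earliestPending == null
            || tv.getTimestamp().isBefore(earliestPending)) {
          earliestPending = tv.getTimestamp();
        }
      } else {
        ready.add(tv);
      }
    }
    ready.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<KV<String, Long>> tv : ready) {
      ctx.output(tv.getValue());  // timestamp reassigned to the firing time
    }
    buffer.clear();
    pending.forEach(buffer::add);
    if (earliestPending != null) {
      flush.set(earliestPending);  // re-arm for the remaining elements
    }
  }
}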


Jan



Re: real real-time beam

2019-11-25 Thread Pablo Estrada
If I understand correctly - your pipeline has some kind of windowing, and
on every trigger downstream of the combiner, the pipeline updates a cache
with a single, non-windowed value. Is that correct?

What are your keys for this pipeline? You could work this out with, as
you noted, a timer that fires periodically and keeps some state
holding the value that you want to write to the cache.
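
For example, in Java, a rough sketch (the interval and the cache call
are placeholders):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class PeriodicCacheUpdater extends DoFn<KV<String, Long>, Void> {
  @StateId("latest")
  private final StateSpec<ValueState<Long>> latestSpec = StateSpecs.value();

  @TimerId("tick")
  private final TimerSpec tickSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
      @StateId("latest") ValueState<Long> latest,
      @TimerId("tick") Timer tick) {
    latest.write(ctx.element().getValue());
    // Re-arm a processing-time timer; it fires ~10s after the most
    // recent element for this key (placeholder interval).
    tick.offset(Duration.standardSeconds(10)).setRelative();
  }

  @OnTimer("tick")
  public void onTick(@StateId("latest") ValueState<Long> latest) {
    Long value = latest.read();
    // if (value != null) { updateCache(value); }  // placeholder call
  }
}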

Is this a Python or Java pipeline? What is the runner?
Best
-P.



real real-time beam

2019-11-25 Thread Aaron Dixon
Suppose I trigger a Combine per-element (in a high-volume stream) and use a
ParDo as a sink.

I assume there is no guarantee about the order that my ParDo will see these
triggers, especially as it processes in parallel, anyway.

That said, my sink writes to a db or cache and I would not like the cache
to ever regress its value to something "before" what it has already written.

Is the best way to solve this problem to always write the event-time in the
cache and do a compare-and-swap only updating the sink if the triggered
value in-hand is later than the target value?
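
I.e., something like this sketch (the Store client and its checkAndPut
are hypothetical):

// Hypothetical store client: get() returns the stored entry, and
// checkAndPut() atomically replaces `expected` with `update`,
// returning false if the row changed since it was read.
interface Store {
  Entry get(String key);
  boolean checkAndPut(String key, Entry expected, Entry update);
}

final class Entry {
  final long eventTsMillis;
  final byte[] value;
  Entry(long eventTsMillis, byte[] value) {
    this.eventTsMillis = eventTsMillis;
    this.value = value;
  }
}

void writeIfNewer(Store store, String key, long eventTsMillis, byte[] value) {
  while (true) {
    Entry current = store.get(key);
    if (current != null && current.eventTsMillis >= eventTsMillis) {
      return;  // a same-or-later value is already written; don't regress
    }
    if (store.checkAndPut(key, current, new Entry(eventTsMillis, value))) {
      return;  // swap succeeded
    }
    // lost a race with a concurrent writer; re-read and retry
  }
}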

Or is there a better way to guarantee that my ParDo sink will process
elements in order? (E.g., if I can give up per-event real-time, then a
delay-based trigger would probably be sufficient, I imagine.)

Thanks for advice!