Re: outputWithTimestamp

2020-01-13 Thread Reuven Lax

Re: outputWithTimestamp

2020-01-13 Thread Aaron Dixon

Re: outputWithTimestamp

2020-01-12 Thread Reuven Lax

Re: outputWithTimestamp

2020-01-12 Thread Aaron Dixon
Reuven thanks -- I understand each point although I'm trying to grapple
with your concerns expressed in #3; they don't seem avoidable even w/o the
allowedSkew feature.

Considering your response I see a revision to my solution that omits using
the allowed skew configuration but as far as I can tell still has the
concerns from #3 (i.e., difficulty in reasoning about which events may be
dropped.)

My pipeline using the skew config looks like this:

(1) CustomSessionWindow
emits -> (user, login, logout) @ 
(2) ParDo
-> re-emits same tuple but w/ *login* timestamp
(requires custom allowed-skew)
(3) CalendarWindow
-> 

Instead, I can write a CustomCalendarWindow that places the tuple element
in the right window based on the *login* timestamp, avoiding the need for
the middle/skewing ParDo:

(1) CustomSessionWindow
-> (user, login, logout) @ 
(2) CustomCalendarWindow
-> <*explicitly* places element in window based on the **login** timestamp>

So the use of the ParDo was simply a way to avoid having to write a custom
window; it essentially ensures the CalendarWindow windows based on login
time.

But I don't see how your concerns in #3 are obviated by this revision.
Elements going into the calendar window may already be late. This is
something that any (multi-stage) Beam pipeline has to contend with, even
without the deprecated allowedSkew facility, no?

In other words both of these pipelines are semantically, behaviorally
identical. The former just had the benefit of not requiring a custom window
implementation.
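
A minimal sketch of the window-assignment claim above, assuming one-day
windows aligned to UTC midnight and assuming the session element is first
emitted at some later timestamp (called emittedAt here). DayWindows and its
methods are hypothetical names in plain Java, not Beam API; the sketch models
only which calendar window an element lands in, not watermark or lateness
behavior.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Plain-Java model of the two pipelines' window assignment; not Beam code. */
final class DayWindows {
  /** Start of the UTC calendar day that contains ts. */
  static Instant dayStart(Instant ts) {
    return ts.truncatedTo(ChronoUnit.DAYS);
  }

  /**
   * Pipeline 1: a ParDo moves the element timestamp from emittedAt back to
   * the login time, then a stock calendar window assigns by element timestamp.
   */
  static Instant viaRestamp(Instant login, Instant emittedAt) {
    Instant elementTimestamp = login; // replaces emittedAt; needs allowed skew
    return dayStart(elementTimestamp);
  }

  /**
   * Pipeline 2: the element keeps its emittedAt timestamp and a custom
   * calendar window reads the login time from the payload instead.
   */
  static Instant viaCustomWindow(Instant login, Instant emittedAt) {
    return dayStart(login);
  }

  public static void main(String[] args) {
    Instant login = Instant.parse("2020-01-10T23:30:00Z");
    Instant emittedAt = Instant.parse("2020-01-11T00:45:00Z");
    // Both variants pick the window that starts on the login day.
    System.out.println(viaRestamp(login, emittedAt));
    System.out.println(viaCustomWindow(login, emittedAt));
  }
}
```

In this model the two variants differ only in the element timestamp carried
downstream (login time versus emittedAt), not in the window chosen.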







Re: outputWithTimestamp

2020-01-12 Thread Reuven Lax
A few comments:

1. Yes, this already works on Dataflow (at Beam head). Flink support is
pending at pr/10534.

2. Just to make sure we're on the same page: getAllowedTimestampSkew is
_not_ about outputting behind the watermark. Rather it's about outputting a
timestamp that's less than the current input timestamp. If for example the
watermark is 12:00 and the current input element has a timestamp of 11:00
(because it's late), then you can output an element at 11:00 with no need
to set this parameter. It appears that the JavaDoc is somewhat confusing on
this method.

3. The reason for this parameter is that the watermark only correctly
tracks timestamps internal to the pipeline if your code doesn't make
timestamps travel back in time, e.g. a ParDo taking an element with a
timestamp of 12:00 and outputting another element with an earlier
timestamp. If you use getAllowedTimestampSkew, the elements you produce
might not be tracked by the watermark and will show up late (even if the
source element is on time). What's worse, there's a chance that the
elements will be older than allowedLateness and will get dropped altogether
(this can happen even if allowedTimestampSkew < maxAllowedLateness, because
the input element might already be late and you'll then output an element
that has an even earlier timestamp).

4. It sounds like you both want and don't want a watermark. You want the
watermark to not be held up by your input (so that your aggregations keep
triggering), but you then want to output old data which might prevent the
watermark from working properly, and might cause data to be dropped. Have
you considered instead using either triggers or timers to trigger your
aggregations? That way you don't need to wait for the watermark to advance
to the end of the window to trigger the aggregation, but the end-of-window
aggregation will still be correct.

Reuven
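
Points 2 and 3 above can be sketched with a small plain-Java model. This is
an illustration under stated assumptions, not the Beam implementation:
SkewModel, outputAllowed and dropped are hypothetical names, the windows are
assumed to be one hour long, and the drop rule (watermark past window end plus
allowed lateness) is a simplified reading of the lateness behavior described
in the thread.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified plain-Java model of the two checks; not the Beam implementation. */
final class SkewModel {
  /**
   * Skew check: the output timestamp is compared with the INPUT element's
   * timestamp (minus the allowed skew), never with the watermark.
   */
  static boolean outputAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
    return !outputTs.isBefore(inputTs.minus(allowedSkew));
  }

  /**
   * Assumed lateness rule: an element is dropped once the watermark has
   * passed the end of its window plus the allowed lateness.
   */
  static boolean dropped(Instant windowEnd, Instant watermark, Duration allowedLateness) {
    return watermark.isAfter(windowEnd.plus(allowedLateness));
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2020-01-12T12:15:00Z");
    Duration skew = Duration.ofHours(1);
    Duration lateness = Duration.ofHours(2);

    // Point 2: a late input at 11:00 may be re-emitted at 11:00 with zero skew.
    Instant lateInput = Instant.parse("2020-01-12T11:00:00Z");
    System.out.println(outputAllowed(lateInput, lateInput, Duration.ZERO)); // true

    // Point 3: the input at 10:30 sits in the hourly window ending 11:00 and
    // survives, but its output at 09:30 lands in the window ending 10:00 and
    // is dropped, even though skew (1h) < allowed lateness (2h).
    Instant input = Instant.parse("2020-01-12T10:30:00Z");
    Instant output = input.minus(skew);
    System.out.println(outputAllowed(input, output, skew)); // true
    System.out.println(dropped(Instant.parse("2020-01-12T11:00:00Z"), watermark, lateness)); // false
    System.out.println(dropped(Instant.parse("2020-01-12T10:00:00Z"), watermark, lateness)); // true
  }
}
```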


Re: outputWithTimestamp

2020-01-12 Thread Aaron Dixon
Reuven thanks for your insights so far. Just wanted to press a little more
on the deprecation question as I'm still (so far) convinced that my use
case is quite a straightforward justification (I'm looking for confirmation
or correction to my thinking here.) I've simplified my use case a bit if it
helps things:

Use case: "For users that login on a given calendar day, what is the
average login time?"

So I have two event types LOGIN and LOGOUT. I capture a user login session
(using custom windowing or state api, doesn't matter) and I use the default
TimestampCombiner/END_OF_WINDOW because I want my aggregations to not be
delayed.

However per my use case requirements I must window using the LOGIN time. So
I use outputWithTimestamp plus skew configuration to this end.

Since most of my users login and logout within the same calendar day, I get
my per-day aggregations right on time in real-time.

Only for the few users that logout after the day that they login will I see
actual late aggregations produced in which case I can leverage Beam's
various lateness configuration levers to trade completeness for storage,
etc.
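
The on-time versus late split described above can be sketched as follows,
assuming UTC calendar days and assuming a session only becomes available at
its logout time. LoginDayModel and its methods are hypothetical plain-Java
names, not Beam API; lateBy gives a rough idea of how much allowed lateness
the login-day window would need to still accept a given session.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Plain-Java model of when a session reaches its login-day window. */
final class LoginDayModel {
  /** End (exclusive) of the UTC calendar day that contains the login. */
  static Instant loginDayEnd(Instant login) {
    return login.truncatedTo(ChronoUnit.DAYS).plus(1, ChronoUnit.DAYS);
  }

  /**
   * How far past the end of the login day the session closes.
   * Zero means the logout happened within the login day.
   */
  static Duration lateBy(Instant login, Instant logout) {
    Duration d = Duration.between(loginDayEnd(login), logout);
    return d.isNegative() ? Duration.ZERO : d;
  }

  public static void main(String[] args) {
    // Same-day session: contributes before the login-day window ends.
    System.out.println(lateBy(
        Instant.parse("2020-01-10T09:00:00Z"),
        Instant.parse("2020-01-10T17:00:00Z")));
    // Session crossing midnight: arrives two hours after the window's end.
    System.out.println(lateBy(
        Instant.parse("2020-01-10T23:00:00Z"),
        Instant.parse("2020-01-11T02:00:00Z")));
  }
}
```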

This to me seems a *very* straightforward justification for my use of
DoFn#getAllowedTimestampSkew. Should this justify not deprecating that
facility?

I realize there are other various solutions, now and coming soon, that
involve holding the watermark -- but any solution that requires holding the
watermark means that I have to give up getting on-time aggregations at the
very end of the calendar day (window). I would much rather (and reasonably
so?) get on-time aggregations covering the majority of my users and be
happy to refine these averages when my few latent users logout in a later
day.

In some Beam documentation [1] there is the idea of "unobservably late
data". That is, I have specific elements that are output late (behind the
watermark) but, because they are guaranteed to land *within the window*,
they are promoted to be on-time. This conceptualization of things
seems very well-suited to my simple use case, but I'm definitely open to a
different way of thinking in my approach.

My main concern is that my pipeline will be leveraging a Deprecated
facility (DoFn#getAllowedTimestampSkew) but I don't see other viable
options (within Beam) yet.

(Hope I'm not pressing too hard on this question here. I think this use
case is interesting because it ...seems... to be a rather simple/distilled
justification for being able to output data behind the watermark
mid-stream.)

[1] https://beam.apache.org/blog/2016/10/20/test-stream.html



Re: outputWithTimestamp

2020-01-11 Thread Aaron Dixon
Oh nice, that will be great; will look forward to this one! Any idea if
Dataflow will support it?

On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax  wrote:

> There is now (as of last week) a way to hold back the watermark with the
> state API (though not yet in a released version of Beam). If you set a
> timer using withOutputTimestamp(t), the watermark will be held to t.
>
> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon  wrote:
>
>> Hi Reuven thanks for your quick reply
>>
>> I've tried that but the drag it puts on the watermark was too intrusive.
>> For example, even if just a single user among many decided to remain
>> logged in for a few days, the watermark holds everything else back.
>>
>> This was when using a custom session window. I've recently been using the
>> State API to do my custom session tracking to avoid issues with downward
>> merging of windows (see earlier mailing list thread) ... with the State API
>> .. I'm not able to hold the watermark back (I think) ... but in any case, I
>> prefer the behavior where the watermark moves forward with the upstream
>> events and to deal with the very few straggler users by a lateness
>> configuration.
>>
>> Does that make sense? So far to me this seems very reasonable (to want to
>> keep the watermark moving and to deal with the few late events that
>> actually fall out of the window using explicit lateness configuration.)
>>
>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax  wrote:
>>
>>> Have you looked at using
>>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>>> downstream watermark back to the beginning of the window (presumably the
>>> timestamp of the LOGIN event), so you can call .outputWithTimestamp using
>>> the CLICK GREEN timestamp without needing to set the allowed timestamp skew.
>>>
>>> Reuven
>>>
>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:
>>>
>>>> I've just built a pipeline in Beam and after exploring several options
>>>> for my use case, I've ended up relying on the deprecated
>>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>>>> quite valid use case. So I suppose this is a vote for un-deprecating this
>>>> API (or a teachable moment in which I could be pointed to a more suitable
>>>> non-deprecated approach.)
>>>>
>>>> I'll stick with a previous simplification of my use case:
>>>>
>>>> I get these events from my users:
>>>> LOGIN
>>>> CLICK GREEN BUTTON
>>>> LOGOUT
>>>>
>>>> I capture user session duration (logout time *minus* login time) and I
>>>> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
>>>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>>>
>>>> So once I calculate and emit a single user's session duration I need to
>>>> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>>>> involves, of course, outputting with a timestamp *before* the watermark.
>>>>
>>>> In most cases my users LOGOUT in the same day as the CLICK GREEN BUTTON
>>>> event, so even though I'm typically outputting a timestamp before the
>>>> watermark the CalendarDay window is not yet closed and so most user session
>>>> durations do not cause a late aggregation for that CalendarDay.
>>>>
>>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do
>>>> I have to contend with potentially late data contributing back to a prior
>>>> CalendarDay.
>>>>
>>>> In any case, I have .withAllowedLateness to allow me to make a call
>>>> here about what I'm willing to trade off (keeping windows open vs. dropping
>>>> data for users with overly long sessions), etc.
>>>>
>>>> This here seems to be a simple scenario (it is effectively my
>>>> real-world scenario) and the
>>>> .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem to cover it in a
>>>> straightforward, effective way.
>>>>
>>>> However of course I don't like building production code on deprecated
>>>> capabilities -- so advice on alternatives (or perhaps a reconsideration of
>>>> this deprecation :) ) would be appreciated.
>>>>
>>>>


Re: outputWithTimestamp

2020-01-11 Thread Reuven Lax
There is now (as of last week) a way to hold back the watermark with the
state API (though not yet in a released version of Beam). If you set a
timer using withOutputTimestamp(t), the watermark will be held to t.
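
A rough plain-Java model of what "held to t" means, assuming the simplified
rule that the output watermark is the earliest of the input watermark and all
active holds. HoldModel and outputWatermark are hypothetical names, not Beam
API, and a real runner's watermark computation involves more than this.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

/** Simplified model of watermark holds; not how any Beam runner is written. */
final class HoldModel {
  /** Output watermark = earliest of the input watermark and every active hold. */
  static Instant outputWatermark(Instant inputWatermark, List<Instant> holds) {
    Instant result = inputWatermark;
    for (Instant hold : holds) {
      if (hold.isBefore(result)) {
        result = hold;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Instant inputWatermark = Instant.parse("2020-01-11T21:00:00Z");
    // One timer per open session, each holding at that session's login time.
    List<Instant> holds = Arrays.asList(
        Instant.parse("2020-01-11T20:30:00Z"),
        Instant.parse("2020-01-08T09:00:00Z")); // a user logged in for days
    // The single long-running session pulls the output watermark back to Jan 8.
    System.out.println(outputWatermark(inputWatermark, holds));
  }
}
```

In this model a single hold from a long-running session determines the output
watermark for everything downstream, however far the input watermark has
advanced.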

On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon  wrote:

> Hi Reuven thanks for your quick reply
>
>  I've tried that but the drag it puts on the watermark was too intrusive.
> For example, -- even if just a single user among many decided to remain
> logged-in for a few days then the watermark holds everything else back.
>
> This was when using a custom session window. I've recently been using the
> State API to do my custom session tracking to avoid issues with downward
> merging of windows (see earlier mailing list thread) ... with the State API
> .. I'm not able to hold the watermark back (I think) ... but in any case, I
> prefer the behavior where the watermark moves forward with the upstream
> events and to deal with the very few straggler users by a lateness
> configuration.
>
> Does that make sense? So far to me this seems very reasonable (to want to
> keep the watermark moving and to handle the few late events that actually
> fall out of the window using an explicit lateness configuration).
>
> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax  wrote:
>
>> Have you looked at using
>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>> downstream watermark back to the beginning of the window (presumably the
>> timestamp of the LOGIN event), so you can call .outputWithTimestamp using
>> the CLICK GREEN timestamp without needing to set the allowed timestamp skew.
>>
>> Reuven
>>
>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:
>>
>>> I've just built a pipeline in Beam and after exploring several options
>>> for my use case, I've ended up relying on the deprecated
>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>>> quite valid use case. So I suppose this is a vote for un-deprecating this
>>> API (or a teachable moment in which I could be pointed to a more suitable
>>> non-deprecated approach.)
>>>
>>> I'll stick with a previous simplification of my use case:
>>>
>>> I get these events from my users:
>>> LOGIN
>>> CLICK GREEN BUTTON
>>> LOGOUT
>>>
>>> I capture user session duration (logout time *minus* login time) and I
>>> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
>>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>>
>>> So once I calculate and emit a single user's session duration I need to
>>> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>>> involves, of course, outputting with a timestamp *before* the watermark.
>>>
>>> In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON
>>> event, so even though I'm typically outputting a timestamp before the
>>> watermark, the CalendarDay window is not yet closed and so most user session
>>> durations do not affect a late aggregation for that CalendarDay.
>>>
>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
>>> have to contend with potentially late data contributing back to a prior
>>> CalendarDay.
>>>
>>> In any case, I have .withAllowedLateness to allow me to make a call here
>>> about what I'm willing to trade off (keeping windows open vs. dropping data
>>> for users with overly long sessions), etc.
>>>
>>> This here seems to be a simple scenario (it is effectively my real-world
>>> scenario) and the .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
>>> to cover it in a straightforward, effective way.
>>>
>>> However of course I don't like building production code on deprecated
>>> capabilities -- so advice on alternatives (or perhaps a reconsideration of
>>> this deprecation :) ) would be appreciated.
>>>
>>>


Re: outputWithTimestamp

2020-01-11 Thread Aaron Dixon
Hi Reuven thanks for your quick reply

I've tried that, but the drag it puts on the watermark was too intrusive.
For example, even if just a single user among many decided to remain
logged in for a few days, the watermark holds everything else back.

This was when using a custom session window. I've recently been using the
State API to do my custom session tracking to avoid issues with downward
merging of windows (see earlier mailing list thread) ... with the State API
.. I'm not able to hold the watermark back (I think) ... but in any case, I
prefer the behavior where the watermark moves forward with the upstream
events and to deal with the very few straggler users by a lateness
configuration.

Does that make sense? So far to me this seems very reasonable (to want to
keep the watermark moving and to handle the few late events that actually
fall out of the window using an explicit lateness configuration).

On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax  wrote:

> Have you looked at using
> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
> downstream watermark back to the beginning of the window (presumably the
> timestamp of the LOGIN event), so you can call .outputWithTimestamp using
> the CLICK GREEN timestamp without needing to set the allowed timestamp skew.
>
> Reuven
>
> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:
>
>> I've just built a pipeline in Beam and after exploring several options
>> for my use case, I've ended up relying on the deprecated
>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>> quite valid use case. So I suppose this is a vote for un-deprecating this
>> API (or a teachable moment in which I could be pointed to a more suitable
>> non-deprecated approach.)
>>
>> I'll stick with a previous simplification of my use case:
>>
>> I get these events from my users:
>> LOGIN
>> CLICK GREEN BUTTON
>> LOGOUT
>>
>> I capture user session duration (logout time *minus* login time) and I
>> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>
>> So once I calculate and emit a single user's session duration I need to
>> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>> involves, of course, outputting with a timestamp *before* the watermark.
>>
>> In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON
>> event, so even though I'm typically outputting a timestamp before the
>> watermark, the CalendarDay window is not yet closed and so most user session
>> durations do not affect a late aggregation for that CalendarDay.
>>
>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
>> have to contend with potentially late data contributing back to a prior
>> CalendarDay.
>>
>> In any case, I have .withAllowedLateness to allow me to make a call here
>> about what I'm willing to trade off (keeping windows open vs. dropping data
>> for users with overly long sessions), etc.
>>
>> This here seems to be a simple scenario (it is effectively my real-world
>> scenario) and the .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
>> to cover it in a straightforward, effective way.
>>
>> However of course I don't like building production code on deprecated
>> capabilities -- so advice on alternatives (or perhaps a reconsideration of
>> this deprecation :) ) would be appreciated.
>>
>>


Re: outputWithTimestamp

2020-01-11 Thread Reuven Lax
Have you looked at using withTimestampCombiner(TimestampCombiner.EARLIEST)?
This will hold the downstream watermark back to the beginning of the window
(presumably the timestamp of the LOGIN event), so you can call
.outputWithTimestamp using the CLICK GREEN timestamp without needing to set
the allowed timestamp skew.

Reuven
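
To make the skew rule in this suggestion concrete, here is a minimal plain-Java sketch of the check: an output timestamp is accepted only if it is no earlier than the current input timestamp minus the allowed skew. The class and method names are hypothetical and this is a model of the rule, not the Beam implementation. It assumes that with the EARLIEST combiner the element reaching the DoFn carries the LOGIN timestamp.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the output-timestamp skew rule. Hypothetical names; not the Beam API. */
final class TimestampSkewSketch {

  /**
   * Returns true when outputTimestamp is no earlier than
   * (inputTimestamp - allowedSkew), i.e. the output would be accepted.
   */
  static boolean isOutputTimestampAllowed(
      Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
    Instant earliestAllowed = inputTimestamp.minus(allowedSkew);
    return !outputTimestamp.isBefore(earliestAllowed);
  }

  public static void main(String[] args) {
    Instant login = Instant.parse("2020-01-10T09:00:00Z");
    Instant click = Instant.parse("2020-01-10T15:00:00Z");
    Instant logout = Instant.parse("2020-01-11T02:00:00Z");

    // Input stamped with LOGIN (earliest): CLICK is later, so zero skew suffices.
    System.out.println(isOutputTimestampAllowed(login, click, Duration.ZERO));
    // Input stamped with LOGOUT: CLICK is earlier, so zero skew is not enough.
    System.out.println(isOutputTimestampAllowed(logout, click, Duration.ZERO));
    // A skew of one day would cover the 11-hour gap between CLICK and LOGOUT.
    System.out.println(isOutputTimestampAllowed(logout, click, Duration.ofDays(1)));
  }
}
```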

On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon  wrote:

> I've just built a pipeline in Beam and after exploring several options for
> my use case, I've ended up relying on the deprecated
> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
> quite valid use case. So I suppose this is a vote for un-deprecating this
> API (or a teachable moment in which I could be pointed to a more suitable
> non-deprecated approach.)
>
> I'll stick with a previous simplification of my use case:
>
> I get these events from my users:
> LOGIN
> CLICK GREEN BUTTON
> LOGOUT
>
> I capture user session duration (logout time *minus* login time) and I
> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
> where the aggregation's timestamp is the time of the CLICK GREEN event.
>
> So once I calculate and emit a single user's session duration I need to
> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
> involves, of course, outputting with a timestamp *before* the watermark.
>
> In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON
> event, so even though I'm typically outputting a timestamp before the
> watermark, the CalendarDay window is not yet closed and so most user session
> durations do not affect a late aggregation for that CalendarDay.
>
> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
> have to contend with potentially late data contributing back to a prior
> CalendarDay.
>
> In any case, I have .withAllowedLateness to allow me to make a call here
> about what I'm willing to trade off (keeping windows open vs. dropping data
> for users with overly long sessions), etc.
>
> This here seems to be a simple scenario (it is effectively my real-world
> scenario) and the .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
> to cover it in a straightforward, effective way.
>
> However of course I don't like building production code on deprecated
> capabilities -- so advice on alternatives (or perhaps a reconsideration of
> this deprecation :) ) would be appreciated.
>
>


outputWithTimestamp

2020-01-11 Thread Aaron Dixon
I've just built a pipeline in Beam and after exploring several options for
my use case, I've ended up relying on the deprecated
.outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
quite valid use case. So I suppose this is a vote for un-deprecating this
API (or a teachable moment in which I could be pointed to a more suitable
non-deprecated approach.)

I'll stick with a previous simplification of my use case:

I get these events from my users:
LOGIN
CLICK GREEN BUTTON
LOGOUT

I capture user session duration (logout time *minus* login time) and I want
to perform a PER DAY average (i.e., my window is on CalendarDays) BUT where
the aggregation's timestamp is the time of the CLICK GREEN event.

So once I calculate and emit a single user's session duration I need to
.outputWithTimestamp using the CLICK GREEN event's timestamp. This
involves, of course, outputting with a timestamp *before* the watermark.

In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON
event, so even though I'm typically outputting a timestamp before the
watermark, the CalendarDay window is not yet closed and so most user session
durations do not affect a late aggregation for that CalendarDay.

Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
have to contend with potentially late data contributing back to a prior
CalendarDay.

In any case, I have .withAllowedLateness to allow me to make a call here
about what I'm willing to trade off (keeping windows open vs. dropping data
for users with overly long sessions), etc.

This here seems to be a simple scenario (it is effectively my real-world
scenario) and the .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
to cover it in a straightforward, effective way.

However of course I don't like building production code on deprecated
capabilities -- so advice on alternatives (or perhaps a reconsideration of
this deprecation :) ) would be appreciated.
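
The scenario above can be sketched in plain Java as a simplified model. A session duration is stamped with the CLICK GREEN time, assigned to that UTC calendar day, and classified by where the watermark stands when the LOGOUT is processed. The names (DailyWindowLatenessSketch, classify, Outcome) are hypothetical, the day boundary is assumed to be UTC midnight, and the boundary handling is deliberately coarser than what a real runner does.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Toy model of daily windows with allowed lateness. Hypothetical names; not the Beam API. */
final class DailyWindowLatenessSketch {

  enum Outcome { ON_TIME, LATE, DROPPED }

  /** End of the UTC calendar-day window that contains the given timestamp. */
  static Instant windowEnd(Instant timestamp) {
    return timestamp.truncatedTo(ChronoUnit.DAYS).plus(1, ChronoUnit.DAYS);
  }

  /**
   * Classifies an element stamped with outputTimestamp (the CLICK GREEN time),
   * given the watermark at the moment it is emitted (around LOGOUT time).
   */
  static Outcome classify(Instant outputTimestamp, Instant watermark, Duration allowedLateness) {
    Instant end = windowEnd(outputTimestamp);
    if (watermark.isBefore(end)) {
      return Outcome.ON_TIME; // the day's window has not closed yet
    }
    if (watermark.isBefore(end.plus(allowedLateness))) {
      return Outcome.LATE; // window closed, but still within allowed lateness
    }
    return Outcome.DROPPED; // window has expired, the element is discarded
  }

  public static void main(String[] args) {
    Instant click = Instant.parse("2020-01-10T15:00:00Z");
    Duration allowedLateness = Duration.ofHours(6);

    // LOGOUT on the same day as the CLICK.
    System.out.println(classify(click, Instant.parse("2020-01-10T18:00:00Z"), allowedLateness));
    // LOGOUT shortly after midnight on the next day.
    System.out.println(classify(click, Instant.parse("2020-01-11T02:00:00Z"), allowedLateness));
    // LOGOUT several days later, beyond the allowed lateness.
    System.out.println(classify(click, Instant.parse("2020-01-13T00:00:00Z"), allowedLateness));
  }
}
```

The allowedLateness value is the knob for the trade-off mentioned above: a larger value keeps each day's window open longer, a smaller one drops more of the overly long sessions.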