Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-29 Thread Leah Thomas
Thanks for the nits Matthias, I've updated the examples and language
accordingly.

Leah

On Tue, Jul 28, 2020 at 6:43 PM Matthias J. Sax  wrote:

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-28 Thread Matthias J. Sax
Thanks Leah. Overall LGTM.

A few nits:

- the first figure shows window [9,19] but the window is not aligned
properly (it should be 1ms to the right; right now, it's aligned to
window [8,18])

- in the second figure, hopping windows would create more windows, i.e.,
the first window would be [-6,14) and the last window would be [19,29),
so it's not just 10 windows but 26 windows (if I did not miscount)

- "Two new windows will be created by the late record"

late -> out-of-order
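The window count in the second nit can be checked with a small sketch. It assumes the hopping windows advance by 1ms (not stated in the nit) and uses only the first and last window starts given above, -6 and 19. With a 1ms advance every integer start in between yields exactly one window, so the count depends only on those two starts. `HoppingWindowCount` and `countWindows` are hypothetical names, not Kafka API.

```java
// Sketch: count hopping windows between a first and last window start.
// Assumption (not stated in the thread): windows advance by 1 ms.
final class HoppingWindowCount {

    // Counts the starts firstStart, firstStart + advanceMs, ... that are <= lastStart.
    static long countWindows(long firstStart, long lastStart, long advanceMs) {
        if (advanceMs <= 0 || lastStart < firstStart) {
            throw new IllegalArgumentException("need advanceMs > 0 and lastStart >= firstStart");
        }
        return (lastStart - firstStart) / advanceMs + 1;
    }

    public static void main(String[] args) {
        // First window [-6,14) and last window [19,29) from the nit above.
        System.out.println(countWindows(-6, 19, 1)); // prints 26
    }
}
```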


-Matthias



On 7/28/20 4:34 PM, Sophie Blee-Goldman wrote:

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-28 Thread Sophie Blee-Goldman
Thanks for the update Leah -- I think that all makes sense.

Cheers,
Sophie

On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas  wrote:

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-28 Thread Leah Thomas
Another minor tweak: instead of defining the window by its *size*, it will
be defined by *timeDifference*, which is the maximum time difference
between two records in the same window. Because both ends of a sliding
window are inclusive, this is a more precise way to define the window, and
it still lets users create the window they expect. This definition fits the
current examples: a record at *10* falls into the window *[5,10]* for a time
difference of *5*. That window contains any records at 5, 6, 7, 8, 9, and
10, which is 6 instances instead of 5. This semantic difference is why I've
shifted from *size* to *timeDifference*.

The new builder will be *withTimeDifferenceAndGrace*, in keeping with other
conventions.
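A minimal sketch of the inclusive semantics described above, in plain Java with no Kafka dependency. The names `SlidingWindowSketch`, `windowEndingAt`, `contains`, and `coveredTimestamps` are hypothetical; the class only models the rule stated in this message (a window with time difference `d` that ends at a record's timestamp `t` is `[t - d, t]`, inclusive on both ends) and is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an inclusive sliding window defined by a time difference.
final class SlidingWindowSketch {
    final long start; // inclusive
    final long end;   // inclusive

    private SlidingWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Window that ends at the record timestamp: [timestamp - timeDifference, timestamp].
    static SlidingWindowSketch windowEndingAt(long timestamp, long timeDifference) {
        if (timeDifference < 0) {
            throw new IllegalArgumentException("timeDifference must be >= 0");
        }
        return new SlidingWindowSketch(timestamp - timeDifference, timestamp);
    }

    // Both ends are inclusive, so a record exactly at start or end belongs to the window.
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp <= end;
    }

    // Every integer (millisecond) timestamp the window covers.
    List<Long> coveredTimestamps() {
        List<Long> result = new ArrayList<>();
        for (long t = start; t <= end; t++) {
            result.add(t);
        }
        return result;
    }

    public static void main(String[] args) {
        SlidingWindowSketch w = windowEndingAt(10, 5);
        System.out.println("[" + w.start + "," + w.end + "]"); // prints [5,10]
        System.out.println(w.coveredTimestamps());            // prints [5, 6, 7, 8, 9, 10]
        System.out.println(w.coveredTimestamps().size());     // prints 6
    }
}
```

A parameter called *size* with value 5 suggests 5 timestamps, but the inclusive window covers 6, which is the mismatch the rename to *timeDifference* avoids.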

Let me know if there are any concerns! The vote thread is open as well
here: http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser

Best,
Leah

On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas  wrote:

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-27 Thread Leah Thomas
A small tweak: to make it clearer to users that grace is required, and to
clean up some of the confusing grammar around window definitions, the main
builder method for *SlidingWindows* will be *withSizeAndGrace* instead of
*of*. This looks like it'll be the last change (for now) to the public API.
If anyone has any comments or thoughts, let me know. Otherwise, I'll take
this to vote shortly.
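One way to picture how a two-argument factory makes grace mandatory: with no single-argument `of(size)` factory and a private constructor, a caller cannot create the window definition without supplying a grace period. The class below is a hypothetical, dependency-free stand-in (not the Kafka `SlidingWindows` class) that borrows the `withSizeAndGrace` name proposed in this message; the thread later renames the builder to `withTimeDifferenceAndGrace`.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for a final window-definition class whose only
// factory requires both a size and a grace period.
final class SlidingWindowsSketch {
    private final long sizeMs;
    private final long graceMs;

    // Private constructor: the static factory is the only way to create an instance.
    private SlidingWindowsSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Grace is a required argument, so it cannot be silently omitted.
    static SlidingWindowsSketch withSizeAndGrace(Duration size, Duration grace) {
        Objects.requireNonNull(size, "size");
        Objects.requireNonNull(grace, "grace");
        if (size.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("size and grace must not be negative");
        }
        return new SlidingWindowsSketch(size.toMillis(), grace.toMillis());
    }

    long sizeMs() { return sizeMs; }

    long graceMs() { return graceMs; }

    public static void main(String[] args) {
        SlidingWindowsSketch w =
            withSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));
        System.out.println(w.sizeMs() + " " + w.graceMs()); // prints 300000 30000
    }
}
```

Because the class is final and its constructor private, no subclass can offer a way around the factory either.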

Best,
Leah

On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas  wrote:

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-24 Thread Leah Thomas
To accommodate the change to a final class, I've added another
*windowedBy()* method in *CogroupedKStream.java* to handle SlidingWindows.

As far as the discussion goes, I think this is the last change we've talked
about. If anyone has other comments or concerns, please let me know!

Cheers,
Leah

On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas  wrote:

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-23 Thread Leah Thomas
Thanks for the discussion about extending TimeWindows. I agree that making
it future-proof is important, and the implementation of SlidingWindows is
unique enough that it seems logical to make it its own final class.

On that note, I've updated the KIP to make SlidingWindows a standalone
final class, and added a *windowedBy()* API in *KGroupedStream* to handle
SlidingWindows. It seems that SlidingWindows would still be able to
leverage *TimeWindowedKStream* by creating a SlidingWindows version of
*TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If anyone
sees issues with this implementation, please let me know.
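A dependency-free sketch of the API shape described above, with every type name hypothetical (none of them are the Kafka interfaces). Because the sliding-window definition is a standalone final class rather than a subtype of the general windows type, the compiler selects a separate `windowedBy` overload, and both overloads can return the same windowed-stream interface backed by different implementations. This avoids the runtime `instanceof` check discussed earlier in the thread.

```java
// Hypothetical stand-ins for the types discussed in the thread; not the Kafka API.
interface WindowsDef {}                              // general, extensible window definition
final class TimeWindowsDef implements WindowsDef {}  // hopping/tumbling definition
final class SlidingWindowsDef {}                     // standalone final class, NOT a WindowsDef

// Shared result type, playing the role of TimeWindowedKStream.
interface WindowedStream {
    String describe();
}

// Existing implementation, used for every WindowsDef.
final class HoppingWindowedStream implements WindowedStream {
    public String describe() { return "hopping/tumbling processor"; }
}

// Separate implementation of the same interface for sliding windows.
final class SlidingWindowedStream implements WindowedStream {
    public String describe() { return "sliding-window processor"; }
}

final class GroupedStreamSketch {
    // Overload for the general windows type.
    WindowedStream windowedBy(WindowsDef windows) {
        return new HoppingWindowedStream();
    }

    // Dedicated overload, chosen at compile time, so no instanceof check is needed.
    WindowedStream windowedBy(SlidingWindowsDef windows) {
        return new SlidingWindowedStream();
    }

    public static void main(String[] args) {
        GroupedStreamSketch grouped = new GroupedStreamSketch();
        System.out.println(grouped.windowedBy(new TimeWindowsDef()).describe());    // prints hopping/tumbling processor
        System.out.println(grouped.windowedBy(new SlidingWindowsDef()).describe()); // prints sliding-window processor
    }
}
```

If the sliding-window class instead extended the general windows type, a call typed against that general type would always reach the general overload, and the implementation would need a runtime type check to route it.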

Best,
Leah

On Wed, Jul 22, 2020 at 10:47 PM John Roesler  wrote:

> Thanks for the reply, Sophie.
>
> That all sounds about right to me.
>
> The Windows “interface”/algorithm is quite flexible, so it makes sense for
> it to be extensible. Different implementations can (and do) enumerate
> different windows to suit different use cases.
>
> On the other hand, I can’t think of any way to extend SessionWindows to do
> something different using the same algorithm, so it makes sense for it to
> stay final.
>
> If we think SlidingWindows is similarly not usefully extensible, then we
> can make it final. It’s easy to remove final later, but adding it is not
> possible. Or we could go the other route and just make it an interface, on
> general principle. Both of these choices are safe API design.
>
> Thanks again,
> John
>
> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> > >
> > > Users could pass in a custom `SessionWindows` as
> > > long as the session algorithm works correctly for it.
> >
> >
> > Well not really, SessionWindows is a final class. TimeWindows is also a
> > final class, so neither of these can be extended/customized. For a given
> > Windows then there would only be three (non-overlapping) possibilities:
> > either it's TimeWindows, SlidingWindows, or a custom  Windows. I don't
> > think there's any problem with determining what the user wants in this
> case
> > --
> > we would just check if it's a SlidingWindows and connect the new
> processor,
> > or else connect the existing hopping/tumbling window processor.
> >
> > I'll admit that last sentence does leave a bad taste in my mouth. Part of
> > that is probably the "leaking" API Matthias pointed out; we just assume the
> > hopping/tumbling window implementation fits all custom windows. But I guess
> > if you really needed to customize the algorithm any further you may as well
> > stick in a transformer and do it all yourself.
> >
> > Anyways, given what we have, it does seem weird to apply one algorithm
> > for most Windows types and then swap in a different one for one specific
> > extension of Windows. So adding a new #windowedBy(SlidingWindows)
> > sounds reasonable to me.
> >
> > I'm still not convinced that we need a whole new TimeWindowedKStream
> > equivalent class for sliding windows though. It seems like the
> > hopping/tumbling window implementation could benefit just as much from a
> > subtractor as the sliding windows so the only future-proofing we get is
> > the ability to be lazy and add the subtractor to one but not the other. Of
> > course it would only be an optimization so we could just not apply it to
> > one type and nothing would break. It does make me nervous to go against
> > the "future-proof" direction, though. Are there any other examples of
> > things we might want to add to one window type but not to another?
> >
> > On another note, this actually brings up a new question: should
> > SlidingWindows also be final? My take is "yes" since there's no reasonable
> > customization of sliding windows, at least not that I can think of.
> > Thoughts?
> >
> >
> > On Wed, Jul 22, 2020 at 7:15 PM John Roesler wrote:
> >
> > > Thanks, all,
> > >
> > > I can see how my conclusion was kind of a leap.
> > >
> > > What Matthias said is indeed what I was thinking. When you provide a
> > > window definition to the windowedBy() method, you are selecting an
> > > algorithm that will be used to compute the windows from the input data.
> > >
> > > I didn’t mean the code implementation “algorithm”, but the high-level
> > > algorithm that describes how the input stream will be transformed into
> > > a sequence of windows.
> > >
> > > For example, the algorithm for Windows is something like “given the
> > > record timestamp, include the record in each of the enumerated
> > > windows”. Note that there can be a lot of variation in how the windows
> > > are enumerated, which is why there are at least a couple of
> > > implementations of Windows, and we are open to adding more (like for
> > > natural calendar boundaries).
> > >
> > > For SessionWindows, it’s more like “if any window is within the gap,
> > > extend its boundaries to include this record and if two windows touch,
> > > then merge them”.
> > >
> > > Clearly, the algorithm for SlidingWindows doesn’t fall into 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread John Roesler
Thanks for the reply, Sophie. 

That all sounds about right to me.

The Windows “interface”/algorithm is quite flexible, so it makes sense for it 
to be extensible. Different implementations can (and do) enumerate different 
windows to suit different use cases. 

On the other hand, I can’t think of any way to extend SessionWindows to do 
something different using the same algorithm, so it makes sense for it to stay 
final.

If we think SlidingWindows is similarly not usefully extensible, then we can 
make it final. It’s easy to remove final later, but adding it is not possible. 
Or we could go the other route and just make it an interface, on general 
principle. Both of these choices are safe API design.

Thanks again,
John

On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
> >
> > Users could pass in a custom `SessionWindows` as
> > long as the session algorithm works correctly for it.
> 
> 
> Well not really, SessionWindows is a final class. TimeWindows is also a
> final class, so neither of these can be extended/customized. For a given
> Windows then there would only be three (non-overlapping) possibilities:
> either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
> think there's any problem with determining what the user wants in this
> case -- we would just check if it's a SlidingWindows and connect the new
> processor, or else connect the existing hopping/tumbling window processor.
> 
> I'll admit that last sentence does leave a bad taste in my mouth. Part of
> that is probably the "leaking" API Matthias pointed out; we just assume the
> hopping/tumbling window implementation fits all custom windows. But I guess
> if you really needed to customize the algorithm any further you may as well
> stick in a transformer and do it all yourself.
> 
> Anyways, given what we have, it does seem weird to apply one algorithm
> for most Windows types and then swap in a different one for one specific
> extension of Windows. So adding a new #windowedBy(SlidingWindows)
> sounds reasonable to me.
> 
> I'm still not convinced that we need a whole new TimeWindowedKStream
> equivalent class for sliding windows though. It seems like the
> hopping/tumbling window implementation could benefit just as much from a
> subtractor as the sliding windows so the only future-proofing we get is
> the ability to be lazy and add the subtractor to one but not the other. Of
> course it would only be an optimization so we could just not apply it to
> one type and nothing would break. It does make me nervous to go against
> the "future-proof" direction, though. Are there any other examples of
> things we might want to add to one window type but not to another?
> 
> On another note, this actually brings up a new question: should
> SlidingWindows also be final? My take is "yes" since there's no reasonable
> customization of sliding windows, at least not that I can think of.
> Thoughts?
> 
> 
> On Wed, Jul 22, 2020 at 7:15 PM John Roesler  wrote:
> 
> > Thanks, all,
> >
> > I can see how my conclusion was kind of a leap.
> >
> > What Matthias said is indeed what I was thinking. When you provide a
> > window definition to the windowedBy() method, you are selecting an algorithm
> > that will be used to compute the windows from the input data.
> >
> > I didn’t mean the code implementation  “algorithm”, but the high-level
> > algorithm that describes how the input stream will be transformed into a
> > sequence of windows.
> >
> > For example, the algorithm for Windows is something like “given the record
> > timestamp, include the record in each of the enumerated windows”. Note that
> > there can be a lot of variation in how the windows are enumerated, which is
> > why there are at least a couple of implementations of Windows, and we are
> > open to adding more (like for natural calendar boundaries).
> >
> > For SessionWindows, it’s more like “if any window is within the gap,
> > extend its boundaries to include this record and if two windows touch, then
> > merge them”.
> >
> > Clearly, the algorithm for SlidingWindows doesn’t fall into either
> > category, so it seems inappropriate to claim that it does in the API (by
> > implementing Windows with stubbed methods) and then cast internally to
> > execute a completely different algorithm.
> >
> > To your other observation, that the DSL object resulting from windowedBy
> > would look the same for Windows and SlidingWindows, maybe it makes sense
> > for windowedBy(SlidingWindows) also to return a TimeWindowedKStream.
> >
> > i.e.:
> > ===
> > <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> > TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> > ===
> >
> >  I can’t think of a reason this wouldn’t work. But then again, it would be
> > more future-proof to go ahead and specify a different return type now, if
> > we think we'll want to add subtractors and stuff later. I don't have a
> > strong feeling about 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Sophie Blee-Goldman
>
> Users could pass in a custom `SessionWindows` as
> long as the session algorithm works correctly for it.


Well not really, SessionWindows is a final class. TimeWindows is also a
final class, so neither of these can be extended/customized. For a given
Windows then there would only be three (non-overlapping) possibilities:
either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
think there's any problem with determining what the user wants in this
case -- we would just check if it's a SlidingWindows and connect the new
processor, or else connect the existing hopping/tumbling window processor.

I'll admit that last sentence does leave a bad taste in my mouth. Part of
that is probably the "leaking" API Matthias pointed out; we just assume the
hopping/tumbling window implementation fits all custom windows. But I guess
if you really needed to customize the algorithm any further you may as well
stick in a transformer and do it all yourself.
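
The runtime check described above could look roughly like the following hypothetical sketch. All names here (`WindowDefinition`, `TimeWindowDefinition`, `SlidingWindowDefinition`, `ProcessorChoice.select`) are invented stand-ins, not Kafka Streams classes; the sketch only shows that a custom subclass falls through to the hopping/tumbling branch whether or not that algorithm suits it.

```java
// Hypothetical stand-ins for window definitions; these are NOT the Kafka Streams classes.
abstract class WindowDefinition {
    abstract long sizeMs();
}

final class TimeWindowDefinition extends WindowDefinition {
    private final long sizeMs;

    TimeWindowDefinition(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    @Override
    long sizeMs() {
        return sizeMs;
    }
}

final class SlidingWindowDefinition extends WindowDefinition {
    private final long timeDifferenceMs;

    SlidingWindowDefinition(long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    @Override
    long sizeMs() {
        return timeDifferenceMs;
    }
}

final class ProcessorChoice {
    private ProcessorChoice() {}

    // Chooses which (hypothetical) processor to wire in, based on the runtime type.
    static String select(WindowDefinition windows) {
        if (windows instanceof SlidingWindowDefinition) {
            return "sliding-window-processor";
        }
        // TimeWindowDefinition and every custom subclass land here, even if the
        // hopping/tumbling algorithm does not actually suit the custom definition.
        return "hopping-tumbling-window-processor";
    }
}
```

This is the "leaking" behavior mentioned above: which algorithm runs is implied by the class hierarchy rather than stated in the API.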

Anyways, given what we have, it does seem weird to apply one algorithm
for most Windows types and then swap in a different one for one specific
extension of Windows. So adding a new #windowedBy(SlidingWindows)
sounds reasonable to me.

I'm still not convinced that we need a whole new TimeWindowedKStream
equivalent class for sliding windows though. It seems like the
hopping/tumbling window implementation could benefit just as much from a
subtractor as the sliding windows so the only future-proofing we get is
the ability to be lazy and add the subtractor to one but not the other. Of
course it would only be an optimization so we could just not apply it to
one type and nothing would break. It does make me nervous to go against
the "future-proof" direction, though. Are there any other examples of
things we might want to add to one window type but not to another?
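
To make the subtractor idea concrete, here is a hypothetical sketch in plain Java (no Kafka Streams types; `SubtractorSketch` and its methods are invented for illustration). It sums values over consecutive hopping windows [start, start + size) twice: once by recomputing every window with an adder only, and once by deriving each window from the previous one, subtracting the records that left and adding the records that entered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: sums over consecutive hopping windows [start, start + size),
// computed by full recompute (adder only) and incrementally (adder + subtractor).
final class SubtractorSketch {
    private SubtractorSketch() {}

    // Sum of values whose timestamps fall in [from, to). For clarity this scans the whole
    // input; a real implementation would only touch records at the window edges.
    static long sumIn(long[] ts, long[] values, long from, long to) {
        long sum = 0;
        for (int i = 0; i < ts.length; i++) {
            if (ts[i] >= from && ts[i] < to) {
                sum += values[i];
            }
        }
        return sum;
    }

    // Recomputes every window from scratch.
    static List<Long> recompute(long[] ts, long[] values, long size, long advance,
                                long firstStart, int windows) {
        List<Long> out = new ArrayList<>();
        for (int k = 0; k < windows; k++) {
            long start = firstStart + k * advance;
            out.add(sumIn(ts, values, start, start + size));
        }
        return out;
    }

    // Derives each window from the previous one: subtract what left, add what entered.
    static List<Long> incremental(long[] ts, long[] values, long size, long advance,
                                  long firstStart, int windows) {
        List<Long> out = new ArrayList<>();
        long current = sumIn(ts, values, firstStart, firstStart + size);
        out.add(current);
        for (int k = 1; k < windows; k++) {
            long prevStart = firstStart + (k - 1) * advance;
            long start = prevStart + advance;
            current -= sumIn(ts, values, prevStart, start);               // subtractor: records that left
            current += sumIn(ts, values, prevStart + size, start + size); // adder: records that entered
            out.add(current);
        }
        return out;
    }
}
```

Both variants return the same sums; the incremental one only saves work if the edge records can be found without a full scan, which this sketch does not attempt.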

On another note, this actually brings up a new question: should
SlidingWindows also be final? My take is "yes" since there's no reasonable
customization of sliding windows, at least not that I can think of.
Thoughts?


On Wed, Jul 22, 2020 at 7:15 PM John Roesler  wrote:

> Thanks, all,
>
> I can see how my conclusion was kind of a leap.
>
> What Matthias said is indeed what I was thinking. When you provide a
> window definition to the windowedBy() method, you are selecting an algorithm
> that will be used to compute the windows from the input data.
>
> I didn’t mean the code implementation  “algorithm”, but the high-level
> algorithm that describes how the input stream will be transformed into a
> sequence of windows.
>
> For example, the algorithm for Windows is something like “given the record
> timestamp, include the record in each of the enumerated windows”. Note that
> there can be a lot of variation in how the windows are enumerated, which is
> why there are at least a couple of implementations of Windows, and we are
> open to adding more (like for natural calendar boundaries).
>
> For SessionWindows, it’s more like “if any window is within the gap,
> extend its boundaries to include this record and if two windows touch, then
> merge them”.
>
> Clearly, the algorithm for SlidingWindows doesn’t fall into either
> category, so it seems inappropriate to claim that it does in the API (by
> implementing Windows with stubbed methods) and then cast internally to
> execute a completely different algorithm.
>
> To your other observation, that the DSL object resulting from windowedBy
> would look the same for Windows and SlidingWindows, maybe it makes sense
> for windowedBy(SlidingWindows) also to return a TimeWindowedKStream.
>
> i.e.:
> ===
> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
> ===
>
>  I can’t think of a reason this wouldn’t work. But then again, it would be
> more future-proof to go ahead and specify a different return type now, if
> we think we'll want to add subtractors and stuff later. I don't have a
> strong feeling about that part of the API. It seems to be independent of
> whether SlidingWindows extends Windows or not.
>
> Thanks,
> -John
>
> On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> > I think what John tries to say is the following:
> >
> > We have `windowedBy(Windows)`, which accepts hopping/tumbling windows but
> > also custom windows, and we use a specific algorithm. Note that custom
> > windows must "work" based on the used algorithm.
> >
> > For session windows we have `windowedBy(SessionWindows)` and apply a
> > different algorithm. Users could pass in a custom `SessionWindows` as
> > long as the session algorithm works correctly for it.
> >
> > For the new sliding windows, we want to use a different algorithm
> > compared to hopping/tumbling windows. If we let sliding windows extend
> > `Windows`, we can decide at runtime if we need to use the
> > hopping/tumbling window algorithm for hopping/tumbling windows or the
> > new sliding window algorithm for 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread John Roesler
Thanks, all,

I can see how my conclusion was kind of a leap.

What Matthias said is indeed what I was thinking. When you provide a window 
definition to the windowedBy() method, you are selecting an algorithm that will 
be used to compute the windows from the input data.

I didn’t mean the code implementation  “algorithm”, but the high-level 
algorithm that describes how the input stream will be transformed into a 
sequence of windows.

For example, the algorithm for Windows is something like “given the record 
timestamp, include the record in each of the enumerated windows”. Note that 
there can be a lot of variation in how the windows are enumerated, which is why 
there are at least a couple of implementations of Windows, and we are open to 
adding more (like for natural calendar boundaries).

For SessionWindows, it’s more like “if any window is within the gap, extend its 
boundaries to include this record and if two windows touch, then merge them”.
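
For contrast with the enumerated case, the session rule above can be sketched as follows. This is a hypothetical, in-memory simplification (`SessionMergeSketch` is an invented name; Kafka's session processor works against a session store, not a list). It has to look for existing sessions near the record and merge them, a step enumerable windows never need.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch of the session rule: a new record joins every session
// within gapMs of its timestamp, and all of those sessions are merged into one.
final class SessionMergeSketch {
    static final class Session {
        final long start; // inclusive
        final long end;   // inclusive
        final long count; // number of records in the session

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    private final long gapMs;
    private final List<Session> sessions = new ArrayList<>();

    SessionMergeSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    void add(long timestamp) {
        long start = timestamp;
        long end = timestamp;
        long count = 1;
        List<Session> absorbed = new ArrayList<>();
        // Scan for sessions close enough to this record to be extended by it.
        for (Session s : sessions) {
            if (timestamp >= s.start - gapMs && timestamp <= s.end + gapMs) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
                absorbed.add(s);
            }
        }
        // Merge: every absorbed session is replaced by one combined session.
        sessions.removeAll(absorbed);
        sessions.add(new Session(start, end, count));
        sessions.sort((a, b) -> Long.compare(a.start, b.start));
    }

    List<Session> sessions() {
        return List.copyOf(sessions);
    }
}
```

For example, with a gap of 5, records at 0 and 10 form two sessions, and a later record at 5 bridges them into a single session [0,10] holding 3 records.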

Clearly, the algorithm for SlidingWindows doesn’t fall into either category, so 
it seems inappropriate to claim that it does in the API (by implementing 
Windows with stubbed methods) and then cast internally to execute a completely 
different algorithm.

To your other observation, that the DSL object resulting from windowedBy would 
look the same for Windows and SlidingWindows, maybe it makes sense for 
windowedBy(SlidingWindows) also to return a TimeWindowedKStream.

i.e.:
===
<W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows);
TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);
===

 I can’t think of a reason this wouldn’t work. But then again, it would be more 
future-proof to go ahead and specify a different return type now, if we think 
we'll want to add subtractors and stuff later. I don't have a strong feeling 
about that part of the API. It seems to be independent of whether 
SlidingWindows extends Windows or not.
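
A hypothetical sketch of the two-overload shape above, using invented stand-in types (`EnumerableWindows`, `HoppingWindowsSpec`, `SlidingWindowsSpec`, `GroupedStreamSketch`) rather than the real Kafka Streams classes. Because the sliding-window definition does not extend the enumerable base class, the compiler picks the algorithm from the argument's static type: no runtime cast is needed, and a custom subclass of the base class can only ever reach the enumerating path.

```java
// Hypothetical stand-ins; not the real KGroupedStream / Windows / SlidingWindows types.
abstract class EnumerableWindows {}

final class HoppingWindowsSpec extends EnumerableWindows {}

// Deliberately NOT a subclass of EnumerableWindows.
final class SlidingWindowsSpec {}

final class GroupedStreamSketch {
    // Same shape as windowedBy(Windows): hopping/tumbling and any custom subclass.
    String windowedBy(EnumerableWindows windows) {
        return "enumerating algorithm";
    }

    // Same shape as the proposed windowedBy(SlidingWindows) overload. In the real API both
    // overloads could return the same DSL type; a String stands in for it here.
    String windowedBy(SlidingWindowsSpec windows) {
        return "sliding algorithm";
    }
}
```

Whether the two overloads share a return type is, as noted above, a separate choice from whether the sliding definition extends the base class.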

Thanks,
-John

On Wed, Jul 22, 2020, at 19:41, Matthias J. Sax wrote:
> I think what John tries to say is the following:
> 
> We have `windowedBy(Windows)`, which accepts hopping/tumbling windows but
> also custom windows, and we use a specific algorithm. Note that custom
> windows must "work" based on the used algorithm.
> 
> For session windows we have `windowedBy(SessionWindows)` and apply a
> different algorithm. Users could pass in a custom `SessionWindows` as
> long as the session algorithm works correctly for it.
> 
> For the new sliding windows, we want to use a different algorithm
> compared to hopping/tumbling windows. If we let sliding windows extend
> `Windows`, we can decide at runtime if we need to use the
> hopping/tumbling window algorithm for hopping/tumbling windows or the
> new sliding window algorithm for sliding windows. However, if we get a
> custom window, which algorithm do we pick now? The existing
> tumbling/hopping window algorithm or the new sliding window algorithm?
> Both a custom "time-window" and custom "sliding window" implement the
> generic `Windows` class and thus we cannot make a decision as we don't
> know the user's intent.
> 
> As a matter of fact, even if the user might not be aware of it, the
> algorithm we use does already leak into the API (if a user extends
> `Windows` it must work with our hopping/tumbling window algorithm and if
> a user extends `SessionWindows` it must work with our session algorithm)
> and it seems we need to preserve this property for sliding window.
> 
> 
> -Matthias
> 
> On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> > Hey John,
> > 
> > Just a few follow-up questions/comments about the whole Windows thing:
> > 
> > That's a good way of looking at things; in particular the point about
> > SessionWindows for example requiring a Merger while other "statically
> > enumerable" windows require only an adder seems to touch on the heart of
> > the matter.
> > 
> >> It seems like what Time and Universal (and any other Windows
> >> implementation) have in common is that the windows are statically
> >> enumerable.
> >> As a consequence, they can all rely on an aggregation maintenance algorithm
> >> that involves enumerating each of the windows and updating it. That
> >> also means that their DSL object (TimeWindowedKStream) doesn't need
> >> "subtractors" or "mergers", but only "adders"; again, this is a consequence
> >> of the fact that the windows are enumerable.
> > 
> > 
> > Given that, I'm a bit confused why you conclude that sliding windows are
> > fundamentally different from the "statically enumerable" windows --
> > sliding windows require only an adder too. I'm not sure it's a
> > consequence of being enumerable, or that being enumerable is the
> > fundamental property that unites all Windows (ignoring JoinWindows here).
> > Yes, it currently does apply to all Windows implementations, but we
> > shouldn't assume that it *has* to be that way on the basis that it
> > currently happens to be.
> > 
> > Also, 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Matthias J. Sax
I think what John tries to say is the following:

We have `windowedBy(Windows)`, which accepts hopping/tumbling windows but
also custom windows, and we use a specific algorithm. Note that custom
windows must "work" based on the used algorithm.

For session windows we have `windowedBy(SessionWindows)` and apply a
different algorithm. Users could pass in a custom `SessionWindows` as
long as the session algorithm works correctly for it.

For the new sliding windows, we want to use a different algorithm
compared to hopping/tumbling windows. If we let sliding windows extend
`Windows`, we can decide at runtime if we need to use the
hopping/tumbling window algorithm for hopping/tumbling windows or the
new sliding window algorithm for sliding windows. However, if we get a
custom window, which algorithm do we pick now? The existing
tumbling/hopping window algorithm or the new sliding window algorithm?
Both a custom "time-window" and custom "sliding window" implement the
generic `Windows` class and thus we cannot make a decision as we don't
know the user's intent.

As a matter of fact, even if the user might not be aware of it, the
algorithm we use does already leak into the API (if a user extends
`Windows` it must work with our hopping/tumbling window algorithm and if
a user extends `SessionWindows` it must work with our session algorithm)
and it seems we need to preserve this property for sliding window.


-Matthias

On 7/22/20 4:35 PM, Sophie Blee-Goldman wrote:
> Hey John,
> 
> Just a few follow-up questions/comments about the whole Windows thing:
> 
> That's a good way of looking at things; in particular the point about
> SessionWindows for example requiring a Merger while other "statically
> enumerable" windows require only an adder seems to touch on the heart of
> the matter.
> 
>> It seems like what Time and Universal (and any other Windows
>> implementation) have in common is that the windows are statically
>> enumerable.
>> As a consequence, they can all rely on an aggregation maintenance algorithm
>> that involves enumerating each of the windows and updating it. That
>> also means that their DSL object (TimeWindowedKStream) doesn't need
>> "subtractors" or "mergers", but only "adders"; again, this is a consequence
>> of the fact that the windows are enumerable.
> 
> 
> Given that, I'm a bit confused why you conclude that sliding windows are
> fundamentally different from the "statically enumerable" windows --
> sliding windows require only an adder too. I'm not sure it's a
> consequence of being enumerable, or that being enumerable is the
> fundamental property that unites all Windows (ignoring JoinWindows here).
> Yes, it currently does apply to all Windows implementations, but we
> shouldn't assume that it *has* to be that way on the basis that it
> currently happens to be.
> 
> Also, the fact that they can all rely on the same aggregation algorithm
> seems like an implementation detail and it would be weird to force a
> separate/new DSL API just because under the covers we swap in a different
> processor.
> 
> To be fair, I don't think there's a strong reason *against* not extending
> Windows -- in the end it will just mean adding a new #windowedBy method and
> copy/pasting everything from TimeWindowedKStream pretty much word for word.
> But anytime you find yourself copying over code almost exactly, there
> should probably be a good reason why :)
> 
> 
> On Wed, Jul 22, 2020 at 3:48 PM John Roesler  wrote:
> 
>> Thanks Leah!
>>
>> 5) Regarding the empty windows, I'm wondering if we should simply propose
>> that the windows should not be emitted downstream of the operator or
>> visible in IQ. Then, it'll be up to the implementation to make it happen.
>> I'm personally not concerned about it, since it seems like there are
>> multiple ways to accomplish this.
>>
>> Note, the discrepancy Matthias pointed out is actually a design bug. The
>> windowed aggregation (like every operation in Streams) produces a "view",
>> which then forms the basis of downstream operations. When we pass the
>> Materialized option to the operation, all we're doing is saying to
>> "materialize" the view (aka, actually store the computed view) and also
>> make it queryable. It would be illegal for the "queryable, materialized
>> view" to differ in any way from the "view". So, it seems we must either
>> propose to emit the empty windows AND make them visible in IQ, or propose
>> NOT to emit the empty windows AND NOT make them visible in IQ.
>>
>> 7) Regarding whether we can extend TimeWindows (or Windows):
>> I've been mulling this over more. I think it's worth asking the question of
>> what these classes even mean. For example, why is SessionWindows a
>> different thing from TimeWindows and UniversalWindows (which are both
>> Windows)?
>>
>> This conversation is extra complicated because of the incomplete and
>> mis-matched class hierarchy, but we can try to look past it for now.
>>
>> It seems like what 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Sophie Blee-Goldman
Hey John,

Just a few follow-up questions/comments about the whole Windows thing:

That's a good way of looking at things; in particular the point about
SessionWindows for example requiring a Merger while other "statically
enumerable" windows require only an adder seems to touch on the heart of
the matter.

> It seems like what Time and Universal (and any other Windows
> implementation) have in common is that the windows are statically
> enumerable.
> As a consequence, they can all rely on an aggregation maintenance algorithm
> that involves enumerating each of the windows and updating it. That
> also means that their DSL object (TimeWindowedKStream) doesn't need
> "subtractors" or "mergers", but only "adders"; again, this is a consequence
> of the fact that the windows are enumerable.


Given that, I'm a bit confused why you conclude that sliding windows are
fundamentally different from the "statically enumerable" windows --
sliding windows require only an adder too. I'm not sure it's a
consequence of being enumerable, or that being enumerable is the
fundamental property that unites all Windows (ignoring JoinWindows here).
Yes, it currently does apply to all Windows implementations, but we
shouldn't assume that it *has* to be that way on the basis that it
currently happens to be.

Also, the fact that they can all rely on the same aggregation algorithm
seems like an implementation detail and it would be weird to force a
separate/new DSL API just because under the covers we swap in a different
processor.

To be fair, I don't think there's a strong reason *against* not extending
Windows -- in the end it will just mean adding a new #windowedBy method and
copy/pasting everything from TimeWindowedKStream pretty much word for word.
But anytime you find yourself copying over code almost exactly, there
should probably be a good reason why :)


On Wed, Jul 22, 2020 at 3:48 PM John Roesler  wrote:

> Thanks Leah!
>
> 5) Regarding the empty windows, I'm wondering if we should simply propose
> that the windows should not be emitted downstream of the operator or
> visible in IQ. Then, it'll be up to the implementation to make it happen.
> I'm personally not concerned about it, since it seems like there are
> multiple ways to accomplish this.
>
> Note, the discrepancy Matthias pointed out is actually a design bug. The
> windowed aggregation (like every operation in Streams) produces a "view",
> which then forms the basis of downstream operations. When we pass the
> Materialized option to the operation, all we're doing is saying to
> "materialize" the view (aka, actually store the computed view) and also
> make it queryable. It would be illegal for the "queryable, materialized
> view" to differ in any way from the "view". So, it seems we must either
> propose to emit the empty windows AND make them visible in IQ, or propose
> NOT to emit the empty windows AND NOT make them visible in IQ.
>
> 7) Regarding whether we can extend TimeWindows (or Windows):
> I've been mulling this over more. I think it's worth asking the question of
> what these classes even mean. For example, why is SessionWindows a
> different thing from TimeWindows and UniversalWindows (which are both
> Windows)?
>
> This conversation is extra complicated because of the incomplete and
> mis-matched class hierarchy, but we can try to look past it for now.
>
> It seems like what Time and Universal (and any other Windows
> implementation) have in common is that the windows are statically
> enumerable.
> As a consequence, they can all rely on an aggregation maintenance algorithm
> that involves enumerating each of the windows and updating it. That
> also means that their DSL object (TimeWindowedKStream) doesn't need
> "subtractors" or "mergers", but only "adders"; again, this is a consequence
> of the fact that the windows are enumerable.
>
> In contrast, session windows are data driven, so they are not statically
> enumerable. Their algorithm has to rely on scans, and to do the scans,
> it needs to know the "inactivity gap", which needs to be part of the window
> definition. Likewise, session windows have the property that they need
> to be merged, so their DSL object also requires mergers.
>
> It really seems like your new window definition doesn't fit into either
> category. It uses a different algorithm, which relies on scans, but it is
> also fixed in size, so it doesn't need mergers. In this situation, it seems
> like the safe bet is to just create SlidingWindows with no interface and
> add a separate set of DSL operations and objects. It's a little extra code,
> but it seems to keep everything tidier and more comprehensible, both
> for us and for users.
>
> What do you think?
> -John
>
>
>
> On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> > Hi Matthias,
> >
> > Thanks for the suggestions, I've updated the KIP and child page
> > accordingly and addressed some below.
> >
> > 1) For the mandatory grace period, we should use a static 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread John Roesler
Thanks Leah!

5) Regarding the empty windows, I'm wondering if we should simply propose
that the windows should not be emitted downstream of the operator or
visible in IQ. Then, it'll be up to the implementation to make it happen. I'm
personally not concerned about it, since it seems like there are multiple
ways to accomplish this.

Note, the discrepancy Matthias pointed out is actually a design bug. The
windowed aggregation (like every operation in Streams) produces a "view",
which then forms the basis of downstream operations. When we pass the
Materialized option to the operation, all we're doing is saying to "materialize"
the view (aka, actually store the computed view) and also make it queryable.
It would be illegal for the "queryable, materialized view" to differ in any way
from the "view". So, it seems we must either propose to emit the empty
windows AND make them visible in IQ, or propose NOT to emit the empty
windows AND NOT make them visible in IQ.
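
One way to satisfy the "same view everywhere" rule is to make a single decision per window and apply it to both outputs. A hypothetical sketch (`ConsistentViewSketch` is an invented name; a `TreeMap` stands in for the queryable store and a `List` for downstream output): an empty window is neither forwarded nor stored, so IQ and downstream consumers cannot disagree about it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one emission rule feeds both the downstream "view" and the
// materialized, queryable copy of it, so the two cannot diverge on empty windows.
final class ConsistentViewSketch {
    // Stands in for the queryable window store: window start -> record count.
    final Map<Long, Long> store = new TreeMap<>();
    // Stands in for records forwarded downstream.
    final List<String> forwarded = new ArrayList<>();

    void onWindowUpdated(long windowStart, long count) {
        if (count == 0) {
            // Empty window: skipped for BOTH outputs (not emitted, not visible in IQ).
            return;
        }
        store.put(windowStart, count);
        forwarded.add(windowStart + "=" + count);
    }
}
```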

7) Regarding whether we can extend TimeWindows (or Windows):
I've been mulling this over more. I think it's worth asking the question of
what these classes even mean. For example, why is SessionWindows a
different thing from TimeWindows and UniversalWindows (which are both
Windows)?

This conversation is extra complicated because of the incomplete and
mis-matched class hierarchy, but we can try to look past it for now.

It seems like what Time and Universal (and any other Windows 
implementation) have in common is that the windows are statically enumerable.
As a consequence, they can all rely on an aggregation maintenance algorithm
that involves enumerating each of the windows and updating it. That
also means that their DSL object (TimeWindowedKStream) doesn't need
"subtractors" or "mergers", but only "adders"; again, this is a consequence
of the fact that the windows are enumerable.
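
Static enumeration is just arithmetic on the record timestamp. A hypothetical sketch (`HoppingWindowSketch.windowStartsFor` is an invented name, not the Kafka Streams source), assuming non-negative timestamps, windows [start, start + sizeMs) aligned to multiples of advanceMs, and 0 < advanceMs <= sizeMs. Each returned window would then be updated with the adder; no scan and no merge is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of statically enumerable hopping windows. Assumes timestamp >= 0
// and 0 < advanceMs <= sizeMs; windows are [start, start + sizeMs), starts aligned to advanceMs.
final class HoppingWindowSketch {
    private HoppingWindowSketch() {}

    // Returns the start of every window that contains the timestamp, in ascending order.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still reaches the timestamp.
        long start = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, with a size of 10 and an advance of 3, a record at 12 belongs to the windows starting at 3, 6, 9 and 12.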

In contrast, session windows are data driven, so they are not statically
enumerable. Their algorithm has to rely on scans, and to do the scans,
it needs to know the "inactivity gap", which needs to be part of the window
definition. Likewise, session windows have the property that they need
to be merged, so their DSL object also requires mergers.

It really seems like your new window definition doesn't fit into either
category. It uses a different algorithm, which relies on scans, but it is
also fixed in size, so it doesn't need mergers. In this situation, it seems
like the safe bet is to just create SlidingWindows with no interface and
add a separate set of DSL operations and objects. It's a little extra code,
but it seems to keep everything tidier and more comprehensible, both
for us and for users.
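
A brute-force, hypothetical sketch of why the new windows sit between the two categories (`SlidingWindowScanSketch.windows` is an invented name, and this is not the processor proposed in the KIP). Every window has the same span, with inclusive bounds [start, start + timeDifferenceMs], but which windows exist depends on the data: here the candidates are one window ending at each record and one starting 1 ms after each record. Filling a window requires scanning the records, and nothing is ever merged. Empty candidates are dropped, in line with point 5.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical brute-force sketch of data-driven, fixed-size sliding windows with inclusive
// bounds [start, start + timeDifferenceMs]. Not the KIP-450 processor.
final class SlidingWindowScanSketch {
    private SlidingWindowScanSketch() {}

    // Returns window start -> record count for every distinct, non-empty window.
    static Map<Long, Integer> windows(List<Long> timestamps, long timeDifferenceMs) {
        List<Long> candidateStarts = new ArrayList<>();
        for (long t : timestamps) {
            candidateStarts.add(t - timeDifferenceMs); // window ending at the record: [t - d, t]
            candidateStarts.add(t + 1);                // window starting just after it: [t + 1, t + 1 + d]
        }
        Map<Long, Integer> result = new TreeMap<>();
        for (long start : candidateStarts) {
            int count = 0;
            for (long t : timestamps) { // the scan: look at the records around the window
                if (t >= start && t <= start + timeDifferenceMs) {
                    count++;
                }
            }
            if (count > 0) { // empty windows are never produced
                result.put(start, count);
            }
        }
        return result;
    }
}
```

For example, with records at 10 and 13 and a time difference of 5, this yields the windows [5,10], [8,13] and [11,16] with counts 1, 2 and 1.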

What do you think?
-John



On Wed, Jul 22, 2020, at 10:30, Leah Thomas wrote:
> Hi Matthias,
> 
> Thanks for the suggestions, I've updated the KIP and child page accordingly
> and addressed some below.
> 
> > 1) For the mandatory grace period, we should use a static builder method
> > that takes two parameters.
> >
> 
>  That makes sense, I've changed that in the public API.
> 
> > Btw: this implementation actually raises an issue for IQ: those empty
> > windows would be returned.
> 
> 
> This is a great point: with the current implementation plan, empty windows
> would be returned. I think creating a second window store would definitely
> work, but there would be more overhead in having two stores and switching
> windows between the stores, as well as doing scans in both stores to find
> existing windows. There might be a way to avoid emitting empty windows
> without creating a second window store; I'll look more into it.
> 
> Cheers,
> Leah
> 
> On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax  wrote:
> 
> > Thanks for updating the KIP.
> >
> > Couple of follow up comments:
> >
> > 1) For the mandatory grace period, we should use a static builder method
> > that takes two parameters. This provides a better API as users cannot
> > forget to set the grace period. Throwing a runtime exception seems not
> > to be the best way to handle this case.
> >
> >
> >
> > 2) In Fig. 2 you list 10 hopping windows. I believe it should actually be
> > more? The first hopping window would be [-6,-4[ and the last one would
> > be [19,29[ -- hence, the cost savings are actually much higher.
> >
> >
> >
> > 3a) IQ: you are saying that the user needs to compute the start time as
> >
> > > windowSize+the time they're looking at
> >
> > Should this be "targetTime - windowSize" instead?
> >
> >
> >
> > 3b) IQ: in your example you say "window size of 10 minutes" with an
> > incident at 9:15.
> >
> > > they're looking for a window with the start time of 8:15.
> >
> > The example does not seem to add up?
> >
> >
> >
> > 4) For "Processing Windows": you describe a three step approach: I just
> > want to point out that step (1) is not necessary for each input record,
> > because timestamps are not guaranteed to be unique and thus a previous
> > record with the same key and timestamp might have created

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Leah Thomas
Hi Matthias,

Thanks for the suggestions, I've updated the KIP and child page accordingly
and addressed some below.

1) For the mandatory grace period, we should use a static builder method
> that takes two parameters.
>

 That makes sense, I've changed that in the public API.

Btw: this implementation actually raises an issue for IQ: those empty
> windows would be returned.


This is a great point, with the current implementation plan empty windows
would be returned. I think creating a second window store would definitely
work, but there would be more overhead in having two stores and switching
windows between the stores, as well as doing scans in both stores to find
existing windows. There might be a way to avoid emitting empty windows
without creating a second window store, I'll look more into it.

Cheers,
Leah

On Tue, Jul 21, 2020 at 1:25 PM Matthias J. Sax  wrote:

> Thanks for updating the KIP.
>
> Couple of follow up comments:
>
> 1) For the mandatory grace period, we should use a static builder method
> that takes two parameters. This provides a better API, as users cannot
> forget to set the grace period. Throwing a runtime exception seems not
> to be the best way to handle this case.
>
>
>
> 2) In Fig.2 you list 10 hopping windows. I believe it should actually be
> more? The first hopping window would be [-6,-4[ and the last one would
> be from [19,29[ -- hence, the cost savings are actually much higher.
>
>
>
> 3a) IQ: you are saying that the user needs to compute the start time as
>
> > windowSize+the time they're looking at
>
> Should this be "targetTime - windowSize" instead?
>
>
>
> 3b) IQ: in your example you say "window size of 10 minutes" with an
> incident at 9:15.
>
> > they're looking for a window with the start time of 8:15.
>
> The example does not seem to add up?
>
>
>
> 4) For "Processing Windows": you describe a three step approach: I just
> want to point out that step (1) is not necessary for each input record,
> because timestamps are not guaranteed to be unique and thus a previous
> record with the same key and timestamp might have created the windows
> already.
>
> Nit: I am also not exactly sure what you mean by step (3) as you use the
> word "send". I guess you mean "put"?
>
> It seems there are actually more details in the sub-page:
>
> > A new record for SlidingWindows will always create two new windows. If
> either of those windows already exist in the windows store, their
> aggregation will simply be updated to include the new record, but no
> duplicate window will be added to the WindowStore.
>
> However, the first and second sentence contradict each other a little
> bit. I think the first sentence is not correct.
>
> Nit:
>
> > For in-order records, the left window will always be empty.
>
> This should be "right window" ?
>
>
>
> 5) "Emitting Results": it might be worth pointing out that a
> second/future window of a new record is created with no records, and
> thus, even though it's initialized, it won't be emitted. Only if a
> subsequent record falls into the window would the window be updated
> and the window result (for a window containing one record) be sent
> downstream.
>
> Again, the sub-page contains these details. Might still be worth adding
> them to the top-level page, too.
>
> Btw: this implementation actually raises an issue for IQ: those empty
> windows would be returned. Thus I am wondering if we need to use two
> stores internally? One store for actual windows and one store for empty
> windows? If an empty window is updated, it's moved to the other store?
> For IQ, we only allow querying the non-empty-window store?
>
>
>
> 6) On the sub-page:
>
> > The left window of in-order records and both windows for out-of-order
> records need to be updated with the values of records that have already
> been processed.
>
> Why "both windows for out-of-order records"? IMHO, we don't know how
> many existing windows need to be updated when processing an
> out-of-order record. Of course, an out-of-order record could also fall
> into no existing window and create two new windows instead.
>
> >  Because each record creates one new window that includes itself and one
> window that does not
>
> As stated above, this does not seem to hold. I understand what you mean,
> but it would be good to be exact.
>
> Figure 2: You use the term "late" but you mean "out-of-order" I guess --
> a record is _late_ if it's not processed any longer as the grace period
> passed already.
>
> Figure 2: "Late" should be out-of-order. The example text says a window
> [16,26] should be created but the figure shows the green window as [15,20].
>
> About the blue window: maybe add a note that the blue window contains the
> aggregate we need for the green window, _before_ the new record `a` is
> added to the blue window.
>
>
>
> 7) I am not really happy to extend TimeWindows and I think the argument
> about JoinWindows is not the best (IMHO, JoinWindows already do it wrong
> and we just repeat the same 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-21 Thread Matthias J. Sax
Thanks for updating the KIP.

Couple of follow up comments:

1) For the mandatory grace period, we should use a static builder method
that takes two parameters. This provides a better API, as users cannot
forget to set the grace period. Throwing a runtime exception seems not
to be the best way to handle this case.
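A minimal sketch of point 1, assuming plain Java with no Kafka dependency. The class `SlidingWindowSpec` and its factory `withSizeAndGrace` are hypothetical names chosen for illustration, not the KIP's API. The only way to build the object is through a static factory that requires both the window size and the grace period, so forgetting the grace period becomes a compile error rather than a runtime exception.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical sketch: names are illustrative, not the KIP's final API.
// The private constructor forces callers through the two-argument factory,
// so the grace period can never be left unset.
final class SlidingWindowSpec {
    private final long sizeMs;
    private final long graceMs;

    private SlidingWindowSpec(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Both parameters are mandatory at compile time; invalid values fail fast here.
    static SlidingWindowSpec withSizeAndGrace(final Duration size, final Duration grace) {
        Objects.requireNonNull(size, "size");
        Objects.requireNonNull(grace, "grace");
        if (size.isNegative() || size.isZero()) {
            throw new IllegalArgumentException("size must be positive");
        }
        if (grace.isNegative()) {
            throw new IllegalArgumentException("grace must not be negative");
        }
        return new SlidingWindowSpec(size.toMillis(), grace.toMillis());
    }

    long sizeMs() {
        return sizeMs;
    }

    long graceMs() {
        return graceMs;
    }
}
```

Usage would look like `SlidingWindowSpec.withSizeAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(50))`. There is no code path that yields a spec without a grace period.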



2) In Fig.2 you list 10 hopping windows. I believe it should actually be
more? The first hopping window would be [-6,-4[ and the last one would
be from [19,29[ -- hence, the cost savings are actually much higher.
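To make the window-count arithmetic in point 2 checkable, here is a small helper. It is a hypothetical sketch, not Kafka code. It counts epoch-aligned hopping windows [start, start + size) that overlap a closed range of record timestamps. Like the figure discussion, it allows negative window starts. It does not reproduce the figure itself.

```java
// Hypothetical helper (not part of Kafka Streams): counts hopping windows
// [start, start + size) whose start is a multiple of `advance` and that overlap
// the closed time range [minTs, maxTs]. Negative window starts are allowed.
final class HoppingWindowCount {
    static long windowsOverlapping(final long minTs, final long maxTs,
                                   final long size, final long advance) {
        if (size <= 0 || advance <= 0 || minTs > maxTs) {
            throw new IllegalArgumentException("invalid arguments");
        }
        // Earliest start whose window still reaches minTs (the end is exclusive).
        final long lowestStart = minTs - size + 1;
        // ceil(lowestStart / advance): index of the first aligned start >= lowestStart.
        final long firstIndex = -Math.floorDiv(-lowestStart, advance);
        // floor(maxTs / advance): index of the last aligned start <= maxTs.
        final long lastIndex = Math.floorDiv(maxTs, advance);
        return lastIndex - firstIndex + 1;
    }
}
```

With a 1 ms advance, the count reduces to maxTs - minTs + size. For example, records spanning timestamps 3 to 19 with size 10 give 26 windows. A sliding-window aggregation, by contrast, creates at most two new windows per record.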



3a) IQ: you are saying that the user needs to compute the start time as

> windowSize+the time they're looking at

Should this be "targetTime - windowSize" instead?



3b) IQ: in your example you say "window size of 10 minutes" with an
incident at 9:15.

> they're looking for a window with the start time of 8:15.

The example does not seem to add up?
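For points 3a and 3b, a tiny sketch of the start-time computation. It assumes a window is keyed by its start time and covers [start, start + windowSize]. Under that assumption, the window ending at the target time starts at targetTime - windowSize. The helper `WindowStartForTarget.startFor` is hypothetical and uses java.time only for readability.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper: assumes a window is keyed by its start time and covers
// [start, start + windowSize], so the window ending at targetTime starts at
// targetTime - windowSize.
final class WindowStartForTarget {
    static LocalTime startFor(final LocalTime targetTime, final Duration windowSize) {
        // LocalTime arithmetic wraps around midnight; real code would use epoch timestamps.
        return targetTime.minus(windowSize);
    }
}
```

With a 10-minute window and an incident at 9:15, this gives a start time of 9:05. A start time of 8:15 would only fit a one-hour window, which is presumably the mismatch 3b points at.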



4) For "Processing Windows": you describe a three step approach: I just
want to point out that step (1) is not necessary for each input record,
because timestamps are not guaranteed to be unique and thus a previous
record with the same key and timestamp might have created the windows
already.

Nit: I am also not exactly sure what you mean by step (3) as you use the
word "send". I guess you mean "put"?

It seems there are actually more details in the sub-page:

> A new record for SlidingWindows will always create two new windows. If either 
> of those windows already exist in the windows store, their aggregation will 
> simply be updated to include the new record, but no duplicate window will be 
> added to the WindowStore.

However, the first and second sentence contradict each other a little
bit. I think the first sentence is not correct.

Nit:

> For in-order records, the left window will always be empty.

This should be "right window" ?
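To make the window semantics in point 4 concrete, here is a brute-force reference model. It is not the incremental algorithm from the sub-page. It assumes inclusive windows [start, start + timeDifference] and the left/right rule: each record at t yields a left window ending at t and a right window starting at t + 1, and the right window is kept only if some record falls into it. `SlidingWindowReference` is a hypothetical name. Because the model works from the final set of records, it could also serve as a test oracle for an incremental implementation, including for out-of-order input.

```java
import java.util.TreeMap;

// Hypothetical brute-force reference model, not the KIP's incremental algorithm.
// Assumes inclusive windows [start, start + timeDifference]; each record at t
// yields a left window ending at t and a right window starting at t + 1, and the
// right window is only kept while non-empty.
final class SlidingWindowReference {

    // Returns window start -> sum of the values inside that window.
    static TreeMap<Long, Long> windowSums(final long[] timestamps, final long[] values,
                                          final long timeDifference) {
        final TreeMap<Long, Long> result = new TreeMap<>();
        for (final long t : timestamps) {
            // Left window [t - timeDifference, t] always contains the record itself.
            final long leftStart = t - timeDifference;
            result.put(leftStart, sumIn(timestamps, values, leftStart, t));
            // Right window [t + 1, t + 1 + timeDifference] is skipped while empty.
            final long rightStart = t + 1;
            final long rightEnd = rightStart + timeDifference;
            if (countIn(timestamps, rightStart, rightEnd) > 0) {
                result.put(rightStart, sumIn(timestamps, values, rightStart, rightEnd));
            }
        }
        return result;
    }

    private static long sumIn(final long[] ts, final long[] values, final long start, final long end) {
        long sum = 0;
        for (int i = 0; i < ts.length; i++) {
            if (ts[i] >= start && ts[i] <= end) {
                sum += values[i];
            }
        }
        return sum;
    }

    private static int countIn(final long[] ts, final long start, final long end) {
        int count = 0;
        for (final long t : ts) {
            if (t >= start && t <= end) {
                count++;
            }
        }
        return count;
    }
}
```

For records at 10 and 14 with a time difference of 5, the model yields windows starting at 5, 9, and 11. A second record with the same timestamp maps onto the same windows. That is why creating windows is not needed for every input record.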



5) "Emitting Results": it might be worth pointing out that a
second/future window of a new record is created with no records, and
thus, even though it's initialized, it won't be emitted. Only if a
subsequent record falls into the window would the window be updated
and the window result (for a window containing one record) be sent
downstream.

Again, the sub-page contains these details. Might still be worth adding
them to the top-level page, too.

Btw: this implementation actually raises an issue for IQ: those empty
windows would be returned. Thus I am wondering if we need to use two
stores internally? One store for actual windows and one store for empty
windows? If an empty window is updated, it's moved to the other store?
For IQ, we only allow querying the non-empty-window store?
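A minimal sketch of the two-store idea in point 5, using in-memory maps in place of real state stores. `TwoStoreWindows` and its methods are hypothetical. Windows that exist but hold no records are tracked separately, the first update moves a window into the non-empty store, and the query path only reads the non-empty store.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the two-store idea (not an actual Kafka store):
// empty windows are tracked separately, and an IQ-style lookup only reads
// the non-empty store.
final class TwoStoreWindows {
    private final Set<Long> emptyWindowStarts = new HashSet<>();
    private final Map<Long, Long> nonEmptyAggregates = new HashMap<>();

    // Called when a record creates a window (e.g. its right window) that holds no records yet.
    void createEmpty(final long windowStart) {
        if (!nonEmptyAggregates.containsKey(windowStart)) {
            emptyWindowStarts.add(windowStart);
        }
    }

    // The first update moves the window out of the empty store.
    void add(final long windowStart, final long value) {
        emptyWindowStarts.remove(windowStart);
        nonEmptyAggregates.merge(windowStart, value, Long::sum);
    }

    // Only non-empty windows are visible to queries.
    Optional<Long> query(final long windowStart) {
        return Optional.ofNullable(nonEmptyAggregates.get(windowStart));
    }

    boolean isTrackedAsEmpty(final long windowStart) {
        return emptyWindowStarts.contains(windowStart);
    }
}
```

The sketch only shows the visibility rule. It does not address the cost Leah raises in her reply: maintaining, moving between, and scanning two stores.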



6) On the sub-page:

> The left window of in-order records and both windows for out-of-order records 
> need to be updated with the values of records that have already been 
> processed.

Why "both windows for out-of-order records"? IMHO, we don't know how
many existing windows need to be updated when processing an
out-of-order record. Of course, an out-of-order record could also fall
into no existing window and create two new windows instead.

>  Because each record creates one new window that includes itself and one 
> window that does not

As stated above, this does not seem to hold. I understand what you mean,
but it would be good to be exact.

Figure 2: You use the term "late" but you mean "out-of-order" I guess --
a record is _late_ if it's not processed any longer as the grace period
passed already.

Figure 2: "Late" should be out-of-order. The example text says a window
[16,26] should be created but the figure shows the green window as [15,20].

About the blue window: maybe add a note that the blue window contains the
aggregate we need for the green window, _before_ the new record `a` is
added to the blue window.
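To illustrate why the number of existing windows an out-of-order record touches is not fixed (point 6), here is a sketch using a `TreeMap` as a stand-in for the window store. It assumes windows keyed by start time and covering [start, start + timeDifference]. A record at ts then belongs to every existing window whose start lies in [ts - timeDifference, ts], which may be zero, one, or many windows. `OutOfOrderUpdate` is a hypothetical name. Creating the record's own left and right windows is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch with a TreeMap standing in for the window store. Windows
// are keyed by start and assumed to cover [start, start + timeDifference].
// Creating the record's own left/right windows is not shown.
final class OutOfOrderUpdate {

    // Range scan over window starts, analogous to a time-range fetch on a window store.
    static List<Long> windowsContaining(final TreeMap<Long, Long> aggregatesByStart,
                                        final long ts, final long timeDifference) {
        return new ArrayList<>(aggregatesByStart.subMap(ts - timeDifference, true, ts, true).keySet());
    }

    // Adds the record's value to every existing window containing ts; returns how many were updated.
    static int addToExistingWindows(final TreeMap<Long, Long> aggregatesByStart,
                                    final long ts, final long value, final long timeDifference) {
        final List<Long> starts = windowsContaining(aggregatesByStart, ts, timeDifference);
        for (final Long start : starts) {
            aggregatesByStart.merge(start, value, Long::sum);
        }
        return starts.size();
    }
}
```

With existing windows starting at 0, 5, 8, and 20 and a time difference of 5, a record at 9 updates exactly the windows starting at 5 and 8. A record at 100 updates none.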



7) I am not really happy to extend TimeWindows and I think the argument
about JoinWindows is not the best (IMHO, JoinWindows already do it wrong
and we just repeat the same mistake). However, it seems our window
hierarchy is "broken" already and it might be out of scope for this KIP
to fix it. Hence, I am ok that we bite the bullet for now and clean it
up later.



-Matthias


On 7/20/20 5:18 PM, Guozhang Wang wrote:
> Hi Leah,
> 
> Thanks for the updated KIP. I agree that extending SlidingWindows from
> Windows is fine for the sake of not introducing more public APIs (and their
> internal xxxImpl classes), and its downside is small enough for me to tolerate.
> 
> 
> Guozhang
> 
> 
> On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas  wrote:
> 
>> Hi all,
>>
>> Thanks for the feedback on the KIP. I've updated the KIP page
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>
>> to address these points and have created a child page
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450
>>>
>> to go more in depth on certain implementation details.
>>
>> *Grace Period:*
>> I think Sophie raises a good point that the default grace 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-20 Thread Guozhang Wang
Hi Leah,

Thanks for the updated KIP. I agree that extending SlidingWindows from
Windows is fine for the sake of not introducing more public APIs (and their
internal xxxImpl classes), and its downside is small enough for me to tolerate.


Guozhang


On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas  wrote:

> Hi all,
>
> Thanks for the feedback on the KIP. I've updated the KIP page
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> to address these points and have created a child page
> <
> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450
> >
> to go more in depth on certain implementation details.
>
> *Grace Period:*
> I think Sophie raises a good point that the default grace period of 24
> hours is often too long and was chosen when retention time and grace period
> were the same. For SlidingWindows, I propose we make the grace period
> mandatory. To keep formatting consistent with other types of windows, grace
> period won't be an additional parameter in the #of method, but will still
> look like it does in other use cases:
> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)). If the grace
> period isn't properly initialized, an error will be thrown from the
> process method.
>
> *Storage Layer + Aggregation:*
> SlidingWindows will use a WindowStore because computation can be done with
> the information stored in a WindowStore (window timestamp and value). Using
> the WindowStore also simplifies computation as SlidingWindows can leverage
> existing processes. Because we are using a WindowStore, the aggregation
> process will be similar to that of a hopping window. As records come in,
> their value is added to the aggregation that already exists, following the
> same procedure as hopping windows. The aggregation difference between
> SlidingWindows and HoppingWindows comes in creating new windows for a
> SlidingWindow, where you need to find the existing records that belong to
> the new window. This computation is similar to the aggregation in
> SessionWindows and requires a scan of the WindowStore to find the window
> with the aggregation needed, which will always be pre-computed. The scan
> requires creating an iterator, but should have minimal performance effects
> as this strategy is already implemented in SessionWindows. More details on
> finding the aggregation that needs to be put in a new window can be found
> on the implementation page
> <
> https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450
> >
> .
>
> *Extending Windows, Windows or nothing*
> Because SlidingWindows are still defined by a windowSize (whereas
> SessionWindows are defined purely by data), I think it makes sense to
> leverage the existing Window processes instead of creating a new store type
> that would be very similar to the WindowStore. While it's true that the
> #windowsFor method isn't necessary for SlidingWindows, JoinWindows also
> extends Windows and throws an UnsupportedOperationException in the
> #windowsFor method, which is what SlidingWindows can do. The difference
> between extending Windows or Windows is minimal, as
> both are ways to pass window parameters. Extending Windows will
> give us more leverage in utilizing existing processes.
>
> *Emit Strategy*
> I would argue that emitting for every update is still the best way to go
> for SlidingWindows because it mimics the other types of windows, and
> suppression can be leveraged to limit what SlidingWindows emits. While some
> users may only want to see the last value, others may want to see more, and
> leaving the emit strategy to emit partial results allows both kinds of users to
> access what they want.
>
> *Additional Features*
> Supporting sliding windows inherently, and shifting inefficient hopping
> windows to sliding windows, is an interesting idea and could be built on
> top of SlidingWindows when they are finished, but right now seems out of
> scope for the needs of this KIP. Similarly, including a `subtraction`
> feature could have performance improvements, but doesn't seem necessary for
> the implementation of this KIP.
>
> Let me know what you think of the updates,
>
> Leah
>
> On Thu, Jul 16, 2020 at 11:57 AM John Roesler  wrote:
>
> > Hello all,
> >
> > Thanks for the KIP, Leah!
> >
> > Regarding (1): I'd go farther actually. Making Windows an abstract
> > class was a mistake from the beginning that led to us not being
> > able to fix a very confusing situation for users around retention times,
> > final results emitting, etc. Thus, I would not suggest extending
> > TimeWindows for sure, but would also not suggest extending Windows.
> >
> > The very simplest thing to do is follow the example of SessionWindows,
> > which is just a completely self-contained class. If we don't mess with
> > class inheritance, we won't ever have any of the problems related to
> > class inheritance. This is my preferred solution.
> >
> > Still, Sliding 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-20 Thread Leah Thomas
Hi all,

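Thanks for the feedback on the KIP. I've updated the KIP page
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
to address these points and have created a child page
<https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
to go more in depth on certain implementation details.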

*Grace Period:*
I think Sophie raises a good point that the default grace period of 24
hours is often too long and was chosen when retention time and grace period
were the same. For SlidingWindows, I propose we make the grace period
mandatory. To keep formatting consistent with other types of windows, grace
period won't be an additional parameter in the #of method, but will still
look like it does in other use cases:
.windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)). If the grace
period isn't properly initialized, an error will be thrown from the
process method.

*Storage Layer + Aggregation:*
SlidingWindows will use a WindowStore because computation can be done with
the information stored in a WindowStore (window timestamp and value). Using
the WindowStore also simplifies computation as SlidingWindows can leverage
existing processes. Because we are using a WindowStore, the aggregation
process will be similar to that of a hopping window. As records come in,
their value is added to the aggregation that already exists, following the
same procedure as hopping windows. The aggregation difference between
SlidingWindows and HoppingWindows comes in creating new windows for a
SlidingWindow, where you need to find the existing records that belong to
the new window. This computation is similar to the aggregation in
SessionWindows and requires a scan of the WindowStore to find the window
with the aggregation needed, which will always be pre-computed. The scan
requires creating an iterator, but should have minimal performance effects
as this strategy is already implemented in SessionWindows. More details on
finding the aggregation that needs to be put in a new window can be found
on the implementation page
<https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.

*Extending Windows, Windows or nothing*
Because SlidingWindows are still defined by a windowSize (whereas
SessionWindows are defined purely by data), I think it makes sense to
leverage the existing Window processes instead of creating a new store type
that would be very similar to the WindowStore. While it's true that the
#windowsFor method isn't necessary for SlidingWindows, JoinWindows also
extends Windows and throws an UnsupportedOperationException in the
#windowsFor method, which is what SlidingWindows can do. The difference
between extending Windows or Windows is minimal, as
both are ways to pass window parameters. Extending Windows will
give us more leverage in utilizing existing processes.

*Emit Strategy*
I would argue that emitting for every update is still the best way to go
for SlidingWindows because it mimics the other types of windows, and
suppression can be leveraged to limit what SlidingWindows emits. While some
users may only want to see the last value, others may want to see more, and
leaving the emit strategy to emit partial results allows both kinds of users to
access what they want.
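To compare the two emit strategies discussed in this thread, here is a toy model. It is hypothetical and is not Kafka's suppression implementation. It replays a list of window updates and reports what each policy would forward. The eager policy forwards every update. The final-only policy keeps the latest value per window and forwards it once stream time has passed windowEnd + grace, which is roughly the effect a user could get by adding suppression downstream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not Kafka's implementation) contrasting "emit every update"
// with "emit only the final value once stream time passes windowEnd + grace".
final class EmitPolicies {

    static final class Update {
        final long windowStart;
        final long windowEnd;
        final long aggregate;
        final long streamTime; // stream time after this update was processed

        Update(final long windowStart, final long windowEnd, final long aggregate, final long streamTime) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.aggregate = aggregate;
            this.streamTime = streamTime;
        }
    }

    // Eager policy: every update is forwarded downstream.
    static List<String> emitEveryUpdate(final List<Update> updates) {
        final List<String> out = new ArrayList<>();
        for (final Update u : updates) {
            out.add(u.windowStart + "=" + u.aggregate);
        }
        return out;
    }

    // Final-only policy: keep the latest update per window, forward it once the window closes.
    static List<String> emitFinalOnly(final List<Update> updates, final long grace) {
        final Map<Long, Update> open = new LinkedHashMap<>();
        final List<String> out = new ArrayList<>();
        for (final Update u : updates) {
            open.put(u.windowStart, u);
            final Iterator<Update> it = open.values().iterator();
            while (it.hasNext()) {
                final Update pending = it.next();
                if (u.streamTime > pending.windowEnd + grace) {
                    out.add(pending.windowStart + "=" + pending.aggregate);
                    it.remove();
                }
            }
        }
        return out;
    }
}
```

For two updates to window [5,10] followed by one update to window [9,14] at stream time 14 (grace 0), the eager policy forwards three records. The final-only policy forwards only "5=3", and the window starting at 9 is still open.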

*Additional Features*
Supporting sliding windows inherently, and shifting inefficient hopping
windows to sliding windows, is an interesting idea and could be built on
top of SlidingWindows when they are finished, but right now seems out of
scope for the needs of this KIP. Similarly, including a `subtraction`
feature could have performance improvements, but doesn't seem necessary for
the implementation of this KIP.

Let me know what you think of the updates,

Leah

On Thu, Jul 16, 2020 at 11:57 AM John Roesler  wrote:

> Hello all,
>
> Thanks for the KIP, Leah!
>
> Regarding (1): I'd go farther actually. Making Windows an abstract
> class was a mistake from the beginning that led to us not being
> able to fix a very confusing situation for users around retention times,
> final results emitting, etc. Thus, I would not suggest extending
> TimeWindows for sure, but would also not suggest extending Windows.
>
> The very simplest thing to do is follow the example of SessionWindows,
> which is just a completely self-contained class. If we don't mess with
> class inheritance, we won't ever have any of the problems related to
> class inheritance. This is my preferred solution.
>
> Still, sliding windows have a lot in common with TimeWindows and other
> fixed-size windows, namely that the windows are fixed in size. If we want
> to preserve the current two-part windowing API in which you can window
> by either "fixed" or "data driven" modes, I'd suggest we avoid increasing
> the blast radius of Windows by taking the opportunity to replace it with
> a proper interface and implement that interface instead.
>
> For example:
> https://github.com/apache/kafka/pull/9031
>
> Then, 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-16 Thread John Roesler
Hello all, 

Thanks for the KIP, Leah!

Regarding (1): I'd go farther actually. Making Windows an abstract
class was a mistake from the beginning that led to us not being
able to fix a very confusing situation for users around retention times,
final results emitting, etc. Thus, I would not suggest extending
TimeWindows for sure, but would also not suggest extending Windows.

The very simplest thing to do is follow the example of SessionWindows,
which is just a completely self-contained class. If we don't mess with
class inheritance, we won't ever have any of the problems related to
class inheritance. This is my preferred solution.

Still, sliding windows have a lot in common with TimeWindows and other
fixed-size windows, namely that the windows are fixed in size. If we want
to preserve the current two-part windowing API in which you can window
by either "fixed" or "data driven" modes, I'd suggest we avoid increasing
the blast radius of Windows by taking the opportunity to replace it with
a proper interface and implement that interface instead.

For example:
https://github.com/apache/kafka/pull/9031

Then, SlidingWindows would just implement FixedSizeWindowDefinition
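A rough sketch of what "implement an interface instead of extending Windows" could look like. The names `FixedSizeWindowDefinitionSketch`, `SlidingWindowsSketch`, and `HoppingWindowsSketch` and their methods are assumptions for illustration. They are not the contents of the linked PR, which may differ. The point is that only definitions able to precompute window boundaries expose something like windowsFor(). Sliding windows then have nothing to stub out with an UnsupportedOperationException.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface in the spirit of the suggestion; NOT the linked PR's API.
interface FixedSizeWindowDefinitionSketch {
    Duration size();
    Duration gracePeriod();
}

// Sliding windows only need size and grace; there is no windowsFor() to stub out.
final class SlidingWindowsSketch implements FixedSizeWindowDefinitionSketch {
    private final Duration size;
    private final Duration grace;

    SlidingWindowsSketch(final Duration size, final Duration grace) {
        this.size = size;
        this.grace = grace;
    }

    @Override public Duration size() { return size; }
    @Override public Duration gracePeriod() { return grace; }
}

// Hopping windows can additionally compute window boundaries up front.
final class HoppingWindowsSketch implements FixedSizeWindowDefinitionSketch {
    private final Duration size;
    private final Duration advance;
    private final Duration grace;

    HoppingWindowsSketch(final Duration size, final Duration advance, final Duration grace) {
        this.size = size;
        this.advance = advance;
        this.grace = grace;
    }

    @Override public Duration size() { return size; }
    @Override public Duration gracePeriod() { return grace; }

    // Starts (epoch-aligned, clamped at 0) of all windows [start, start + size) containing ts.
    List<Long> windowStartsFor(final long ts) {
        final long sizeMs = size.toMillis();
        final long advanceMs = advance.toMillis();
        final List<Long> starts = new ArrayList<>();
        long start = Math.max(0, -Math.floorDiv(-(ts - sizeMs + 1), advanceMs) * advanceMs);
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```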

==

Regarding (2), it seems more straightforward as a user of Streams
to just have one mental model. _All_ of our aggregation operations
follow an eager emission model, in which we just emit an update whenever
an update is available. We already provided Suppression to explicitly apply
different update semantics in the case it's required. Why should we define
a snowflake operation with completely different semantics from everything
else? I.e., systems are generally easier to use when they follow a few
simple, composable rules than when they have a lot of different, specific
rules.


==

New point: (4):
It would be nice to include some examples of user code that would use the
new API, which should include:
1. using the DSL with the sliding window definition
2. accessing the stored results of a sliding window aggregation via IQ
3. defining a custom processor to access sliding windows in a store

It generally helps reviewers wrap their heads around the proposal, as well
as shaking out any design issues that would otherwise only come up during
implementation/testing/review.

Thanks again for the awesome proposal!
-John


On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
> Hello Leah,
> 
> Thanks for the nicely written KIP. A few thoughts:
> 
> 1) I echo the other reviewers' comments regarding the typing: why extend
> TimeWindow instead of just Window?
> 
> 2) I also feel that the emitting policy for this type of windowed aggregation
> may need to differ from the existing ones. The existing emitting policy is very
> simple: emit every time a window gets updated, and emit every time on
> out-of-order data within the grace period. This is because for time windows
> the window close time strictly depends on the window start time, which is
> fixed, while for session windows, although the window open/close times are
> also data-dependent, they change relatively infrequently compared to
> sliding windows. For this KIP, since each new record would create a
> new sliding window, the number of windows maintained logically could be much
> larger, and hence emitting on each update may be too aggressive.
> 
> 3) Although the KIP itself should focus on user-facing interfaces, I'd
> suggest we create a child page of KIP-450 discussing its
> implementation as well, since some of that may drive the interface design.
> E.g., personally I think having a combiner interface in addition to the
> aggregator would be useful, but that's based on my 2 cents about the
> implementation design (I once created a child page describing it:
> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> ).
> 
> 
> Guozhang
> 
> 
> 
> 
> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna  wrote:
> 
> > Hi Leah,
> >
> > Thank you for the KIP!
> >
> > Here is my feedback:
> >
> > 1. The KIP would benefit from some code examples that show how to use
> > sliding windows in aggregations.
> >
> > 2. The different sliding windows in Figure 1 and 2 are really hard to
> > distinguish. Could you please make them easier to tell apart
> > graphically? You could try drawing the frames of consecutive
> > windows offset from each other.
> >
> > 3. I agree with Matthias that extending Windows does not
> > seem to be the best approach. What would be the result of
> > windowsFor()?
> >
> > 4. In the section "Public Interfaces" you should remove implementation
> > details like private constructors and private fields.
> >
> > 5. Do we need a new store interface or can we use WindowStore? Some
> > words about that would be informative.
> >
> > 6. @Matthias, if the subtractor is not strictly needed, I would skip it
> > for now and add it later.
> >
> > 7. I also agree that having a section that describes how to handle
> > out-of-order records would be good to understand what is 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-14 Thread Guozhang Wang
Hello Leah,

Thanks for the nicely written KIP. A few thoughts:

1) I echo the other reviewers' comments regarding the typing: why extend
TimeWindow instead of just Window?

2) I also feel that the emitting policy for this type of windowed aggregation
may need to differ from the existing ones. The existing emitting policy is very
simple: emit every time a window gets updated, and emit every time on
out-of-order data within the grace period. This is because for time windows
the window close time strictly depends on the window start time, which is
fixed, while for session windows, although the window open/close times are
also data-dependent, they change relatively infrequently compared to
sliding windows. For this KIP, since each new record would create a
new sliding window, the number of windows maintained logically could be much
larger, and hence emitting on each update may be too aggressive.

3) Although the KIP itself should focus on user-facing interfaces, I'd
suggest we create a child page of KIP-450 discussing its
implementation as well, since some of that may drive the interface design.
E.g., personally I think having a combiner interface in addition to the
aggregator would be useful, but that's based on my 2 cents about the
implementation design (I once created a child page describing it:
https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
).


Guozhang




On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna  wrote:

> Hi Leah,
>
> Thank you for the KIP!
>
> Here is my feedback:
>
> 1. The KIP would benefit from some code examples that show how to use
> sliding windows in aggregations.
>
> 2. The different sliding windows in Figure 1 and 2 are really hard to
> distinguish. Could you please make them easier to tell apart
> graphically? You could try drawing the frames of consecutive
> windows offset from each other.
>
> 3. I agree with Matthias that extending Windows does not
> seem to be the best approach. What would be the result of
> windowsFor()?
>
> 4. In the section "Public Interfaces" you should remove implementation
> details like private constructors and private fields.
>
> 5. Do we need a new store interface or can we use WindowStore? Some
> words about that would be informative.
>
> 6. @Matthias, if the subtractor is not strictly needed, I would skip it
> for now and add it later.
>
> 7. I also agree that having a section that describes how to handle
> out-of-order records would be good to understand what is still missing
> and what we can reuse.
>
> Best,
> Bruno
>
> On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax  wrote:
> >
> > Leah,
> >
> > thanks for your update. However, it does not completely answer my
> question.
> >
> > In our current window implementations, we emit a window result update
> > record (ie, early/partial result) for each input record. When an
> > out-of-order record arrives, we just update the corresponding old window
> > and emit another update.
> >
> > It's unclear from the KIP whether you propose the same emit strategy. For
> > sliding windows, it might be worth considering a different emit
> > strategy that only emits the final result (ie, after the
> > grace period has passed)?
> >
> >
> >
> > Boyang also raises a good point that relates to my point from above
> > about pre-aggregations and storage layout. Our current time windows are
> > all pre-aggregated and stored in parallel. We can also look up windows
> > efficiently, as we can compute the windowed-key given the input record
> > key and timestamp based on the window definition.
> >
> > However, for sliding windows, window boundaries are data dependent and
> > thus we cannot compute them upfront. Thus, how can we "find" existing
> > windows efficiently? Furthermore, out-of-order data would create new
> > windows in the past and we need to be able to handle this case.
> >
> > Thus, to handle out-of-order data correctly, we need to store all raw
> > input events. Additionally, we could also store pre-aggregated results
> > if we think it's beneficial. -- If we apply "emit only final results"
> > strategy, storing pre-aggregated result would not be necessary though.
> >
> >
> > Btw: for sliding windows it might also be useful to consider allowing
> > users to supply a `Subtractor` -- this subtractor could be applied on
> > the current window result (in case we store it) if a record drops out of
> > the window. Of course, not all aggregation functions are subtractable
> > and we can consider this as a follow up task, too, and not include in
> > this KIP for now. Thoughts?
> >
> >
> >
> > I was also thinking about the type hierarchy. I am not sure if extending
> > TimeWindow is the best approach? For TimeWindows, we can pre-compute
> > window boundaries (cf `windowsFor()`) while for a sliding window the
> > boundaries are data dependent. Session windows are also data dependent
> > and thus they don't inherit from TimeWindow (Maybe check out the KIP
> > that added session 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-14 Thread Bruno Cadonna
Hi Leah,

Thank you for the KIP!

Here is my feedback:

1. The KIP would benefit from some code examples that show how to use
sliding windows in aggregations.

2. The different sliding windows in Figure 1 and 2 are really hard to
distinguish. Could you please make them easier to tell apart
graphically? You could try drawing the frames of consecutive
windows offset from each other.

3. I agree with Matthias that extending Windows does not
seem to be the best approach. What would be the result of
windowsFor()?

4. In the section "Public Interfaces" you should remove implementation
details like private constructors and private fields.

5. Do we need a new store interface or can we use WindowStore? Some
words about that would be informative.

6. @Matthias, if the subtractor is not strictly needed, I would skip it
for now and add it later.

7. I also agree that having a section that describes how to handle
out-of-order records would be good to understand what is still missing
and what we can reuse.
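Point 6 refers to the optional `Subtractor` Matthias suggested (his message is quoted below). Here is a minimal sketch for an invertible aggregation, a sum. It assumes in-order input, and `SubtractableSlidingSum` with its nested `Subtractor` interface are hypothetical names. When a record drops out of the inclusive window [ts - timeDifference, ts], its value is subtracted instead of the aggregate being recomputed. The raw records still have to be buffered to know what to subtract.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the optional Subtractor idea: a sliding sum over the
// inclusive window [ts - timeDifference, ts] for in-order records. Expired records
// are removed with the subtractor instead of recomputing the aggregate; raw
// records are still buffered so we know what to subtract.
final class SubtractableSlidingSum {

    interface Subtractor {
        long apply(long aggregate, long expiredValue);
    }

    private final long timeDifference;
    private final Subtractor subtractor;
    private final Deque<long[]> buffer = new ArrayDeque<>(); // each entry: {timestamp, value}
    private long aggregate = 0L;

    SubtractableSlidingSum(final long timeDifference, final Subtractor subtractor) {
        this.timeDifference = timeDifference;
        this.subtractor = subtractor;
    }

    // Adds an in-order record and returns the aggregate of the window ending at ts.
    long add(final long ts, final long value) {
        buffer.addLast(new long[] {ts, value});
        aggregate += value;
        // The just-added record never expires, so the buffer cannot become empty here.
        while (buffer.peekFirst()[0] < ts - timeDifference) {
            aggregate = subtractor.apply(aggregate, buffer.pollFirst()[1]);
        }
        return aggregate;
    }
}
```

This only works for aggregations that can be inverted, such as sum or count but not max. That matches the caveat that not all aggregation functions are subtractable.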

Best,
Bruno

On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax  wrote:
>
> Leah,
>
> thanks for your update. However, it does not completely answer my question.
>
> In our current window implementations, we emit a window result update
> record (ie, early/partial result) for each input record. When an
> out-of-order record arrives, we just update the corresponding old window
> and emit another update.
>
> It's unclear from the KIP whether you propose the same emit strategy. -- For
> sliding windows it might be worth considering a different emit
> strategy that only emits the final result (ie, after the
> grace period has passed)?
>
>
>
> Boyang also raises a good point that relates to my point from above
> about pre-aggregations and storage layout. Our current time windows are
> all pre-aggregated and stored in parallel. We can also look up windows
> efficiently, as we can compute the windowed-key given the input record
> key and timestamp based on the window definition.
>
> However, for sliding windows, window boundaries are data dependent and
> thus we cannot compute them upfront. Thus, how can we "find" existing
> windows efficiently? Furthermore, out-of-order data would create new
> windows in the past and we need to be able to handle this case.
>
> Thus, to handle out-of-order data correctly, we need to store all raw
> input events. Additionally, we could also store pre-aggregated results
> if we think it's beneficial. -- If we apply "emit only final results"
> strategy, storing pre-aggregated result would not be necessary though.
>
>
> Btw: for sliding windows it might also be useful to consider allowing
> users to supply a `Subtractor` -- this subtractor could be applied on
> the current window result (in case we store it) if a record drops out of
> the window. Of course, not all aggregation functions are subtractable
> and we can consider this as a follow up task, too, and not include in
> this KIP for now. Thoughts?
>
>
>
> I was also thinking about the type hierarchy. I am not sure if extending
> TimeWindow is the best approach? For TimeWindows, we can pre-compute
> window boundaries (cf `windowsFor()`) while for a sliding window the
> boundaries are data dependent. Session windows are also data dependent
> and thus they don't inherit from TimeWindow (Maybe check out the KIP
> that added session windows? It could provide some good insights.) -- I
> believe the same rationale applies to sliding windows?
>
>
>
> -Matthias
>
>
>
>
> On 7/10/20 12:47 PM, Boyang Chen wrote:
> > Thanks Leah and Sophie for the KIP.
> >
> > 1. I'm a bit surprised that we don't have an advance time. Could we
> > elaborate on how the storage layer is structured?
> >
> > 2. IIUC, there will be extra cost in terms of fetching aggregation results,
> > since we can't pre-aggregate until the user asks for it. It would be good
> > to discuss this as well.
> >
> > 3. We haven't discussed the possibility of supporting sliding windows
> > transparently. For a user who actually uses a hopping window, Streams could
> > detect the inefficiency by computing the window_size/advance_time ratio and
> > deciding whether the write amplification is too high compared with
> > some configured threshold. The benefit is that existing Streams users
> > wouldn't need to change their code or learn a new API; they would only
> > need to upgrade the Streams library to make their inefficient hopping
> > window implementation cheaper. There might be some compatibility issues
> > for sure, but it's worth listing them out to weigh the trade-offs.
> >
> > Boyang
> >
> > On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas  wrote:
> >
> >> Hey Matthias,
> >>
> >> Thanks for pointing that out. I added the following to the Proposed Changes
> >> section of the KIP:
> >>
> >> "Records that come out of order will be processed the same way as in-order
> >> records, as long as they fall within the grace period. Any new windows
> >> created by the late record will still be created, and the 

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-11 Thread Matthias J. Sax
Leah,

thanks for your update. However, it does not completely answer my question.

In our current window implementations, we emit a window result update
record (ie, early/partial result) for each input record. When an
out-of-order record arrives, we just update the corresponding old window
and emit another update.

It's unclear from the KIP whether you propose the same emit strategy. -- For
sliding windows it might be worth considering a different emit
strategy that only emits the final result (ie, after the
grace period has passed)?
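To make the two emit strategies concrete, here is a minimal Java sketch under simplifying assumptions: a single window [start, end] with inclusive ends, stream time taken as the largest timestamp seen so far, and record caches and suppression ignored. The class and method names are hypothetical, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one window [start, end] (inclusive) with a grace period. */
public final class EmitStrategySketch {

    /** Eager strategy: emit the updated count for every record accepted into the window. */
    public static List<Long> eagerUpdates(long[] timestamps, long start, long end, long graceMs) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            boolean inWindow = ts >= start && ts <= end;
            boolean windowOpen = streamTime <= end + graceMs;
            if (inWindow && windowOpen) {
                count++;
                emitted.add(count); // one partial result per input record
            }
        }
        return emitted;
    }

    /** Final-only strategy: emit a single count once stream time passes end + grace. */
    public static List<Long> finalOnly(long[] timestamps, long start, long end, long graceMs) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            if (streamTime > end + graceMs) {
                emitted.add(count); // window closed: emit the final result exactly once
                break;              // later records cannot change it
            }
            if (ts >= start && ts <= end) {
                count++;
            }
        }
        return emitted;
    }
}
```

With records at 1, 3, 2, 20, 4, a window [0, 5] and a grace period of 2, the eager strategy emits 1, 2, 3 while the final-only strategy emits a single 3; the record at 4 arrives after the window has closed and is dropped in both cases.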



Boyang also raises a good point that relates to my point from above
about pre-aggregations and storage layout. Our current time windows are
all pre-aggregated and stored in parallel. We can also look up windows
efficiently, as we can compute the windowed-key given the input record
key and timestamp based on the window definition.

However, for sliding windows, window boundaries are data dependent and
thus we cannot compute them upfront. Thus, how can we "find" existing
windows efficiently? Furthermore, out-of-order data would create new
windows in the past and we need to be able to handle this case.

Thus, to handle out-of-order data correctly, we need to store all raw
input events. Additionally, we could also store pre-aggregated results
if we think it's beneficial. -- If we apply "emit only final results"
strategy, storing pre-aggregated result would not be necessary though.
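A minimal sketch of the "store all raw input events" idea for a single key, assuming a sorted map from timestamp to event count stands in for the store, and modeling only the window that ends at each record's timestamp, [ts - timeDifference, ts]. It shows that an aggregate can be computed on demand with a range scan, and which existing windows an out-of-order record would update. All names are hypothetical.

```java
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical per-key store of raw events, keyed by timestamp. */
public final class RawEventStoreSketch {
    private final long timeDifferenceMs;
    // timestamp -> number of raw events with that timestamp
    private final NavigableMap<Long, Integer> eventCounts = new TreeMap<>();

    public RawEventStoreSketch(long timeDifferenceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
    }

    /** Stores a raw event; several events may share a timestamp. */
    public void put(long timestamp) {
        eventCounts.merge(timestamp, 1, Integer::sum);
    }

    /** Counts on demand by range-scanning raw events in [end - timeDifference, end]. */
    public int countWindowEndingAt(long end) {
        int count = 0;
        for (int n : eventCounts.subMap(end - timeDifferenceMs, true, end, true).values()) {
            count += n;
        }
        return count;
    }

    /**
     * Ends of existing windows that an out-of-order record at ts falls into:
     * the window ending at e contains ts iff e - timeDifference <= ts <= e.
     */
    public NavigableSet<Long> existingWindowEndsContaining(long ts) {
        return new TreeSet<>(eventCounts.navigableKeySet().subSet(ts, true, ts + timeDifferenceMs, true));
    }
}
```

With a time difference of 5 and records at 1, 4 and 10, an out-of-order record at 7 updates the existing window ending at 10 (its count goes from 1 to 2) and creates a new window [2, 7] containing the records at 4 and 7.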


Btw: for sliding windows it might also be useful to consider allowing
users to supply a `Subtractor` -- this subtractor could be applied on
the current window result (in case we store it) if a record drops out of
the window. Of course, not all aggregation functions are subtractable
and we can consider this as a follow up task, too, and not include in
this KIP for now. Thoughts?
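As a rough illustration of the subtractor idea, here is a Java sketch that keeps a running aggregate and subtracts records as they drop out of the window. It assumes in-order input and models the adder and subtractor as plain `BinaryOperator<Long>` functions; it is not a proposed Kafka Streams interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

/** Hypothetical sliding aggregate with an adder and a subtractor; in-order input only. */
public final class SubtractorSketch {
    private final long timeDifferenceMs;
    private final BinaryOperator<Long> adder;
    private final BinaryOperator<Long> subtractor;
    private final Deque<long[]> window = new ArrayDeque<>(); // entries are {timestamp, value}
    private long aggregate;

    public SubtractorSketch(long timeDifferenceMs, long initialValue,
                            BinaryOperator<Long> adder, BinaryOperator<Long> subtractor) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.aggregate = initialValue;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    /** Adds a record and returns the aggregate for [timestamp - timeDifference, timestamp]. */
    public long add(long timestamp, long value) {
        // Records older than the window start drop out and are subtracted from the result.
        while (!window.isEmpty() && window.peekFirst()[0] < timestamp - timeDifferenceMs) {
            aggregate = subtractor.apply(aggregate, window.pollFirst()[1]);
        }
        window.addLast(new long[] {timestamp, value});
        aggregate = adder.apply(aggregate, value);
        return aggregate;
    }
}
```

With a time difference of 5, summing values 10, 5, 1, 2 at timestamps 1, 4, 7, 10 yields 10, 15, 6, 3. The raw records still have to be buffered so the subtractor knows what to remove, and an aggregation such as max has no subtractor at all.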



I was also thinking about the type hierarchy. I am not sure if extending
TimeWindow is the best approach? For TimeWindows, we can pre-compute
window boundaries (cf `windowsFor()`) while for a sliding window the
boundaries are data dependent. Session windows are also data dependent
and thus they don't inherit from TimeWindow (Maybe check out the KIP
that added session windows? It could provide some good insights.) -- I
believe the same rationale applies to sliding windows?
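To illustrate why TimeWindows can pre-compute window boundaries, here is a sketch modeled on the arithmetic behind `TimeWindows#windowsFor()`. The hopping windows [start, start + size) that contain a record depend only on its timestamp and on size and advance. The class and method names are made up for this example. Sliding windows have no equivalent function of (timestamp, window definition), because their boundaries depend on the timestamps of other records.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; the window starts follow only from timestamp, size and advance. */
public final class WindowBoundsSketch {

    /** Start times of the hopping windows [start, start + size) that contain timestamp. */
    public static List<Long> hoppingWindowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        // Earliest window start that is a multiple of advance and still contains timestamp.
        long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        while (windowStart <= timestamp) {
            starts.add(windowStart);
            windowStart += advanceMs;
        }
        return starts;
    }
}
```

For example, a record at timestamp 12 with size 10 and advance 5 belongs to the windows starting at 5 and 10.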



-Matthias




On 7/10/20 12:47 PM, Boyang Chen wrote:
> Thanks Leah and Sophie for the KIP.
> 
> 1. I'm a bit surprised that we don't have an advance time. Could we
> elaborate on how the storage layer is structured?
> 
> 2. IIUC, there will be extra cost in terms of fetching aggregation results,
> since we can't pre-aggregate until the user asks for it. It would be good
> to discuss this as well.
> 
> 3. We haven't discussed the possibility of supporting sliding windows
> transparently. For a user who actually uses a hopping window, Streams could
> detect the inefficiency by computing the window_size/advance_time ratio and
> deciding whether the write amplification is too high compared with
> some configured threshold. The benefit is that existing Streams users
> wouldn't need to change their code or learn a new API; they would only
> need to upgrade the Streams library to make their inefficient hopping
> window implementation cheaper. There might be some compatibility issues
> for sure, but it's worth listing them out to weigh the trade-offs.
> 
> Boyang
> 
> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas  wrote:
> 
>> Hey Matthias,
>>
>> Thanks for pointing that out. I added the following to the Proposed Changes
>> section of the KIP:
>>
>> "Records that come out of order will be processed the same way as in-order
>> records, as long as they fall within the grace period. Any new windows
>> created by the late record will still be created, and the existing windows
>> that are changed by the late record will be updated. Any record that falls
>> outside of the grace period (either user defined or default) will be
>> discarded. "
>>
>> All the best,
>> Leah
>>
>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax  wrote:
>>
>>> Leah,
>>>
>>> thanks a lot for the KIP. Very well written.
>>>
>>> The KIP does not talk about the handling of out-of-order data though.
>>> How do you propose to address this?
>>>
>>>
>>> -Matthias
>>>
>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
>>>> Hi all,
>>>> I'd like to kick off the discussion for KIP-450, adding sliding window
>>>> aggregation support to Kafka Streams.
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>
>>>> Let me know what you think,
>>>> Leah
>>>>
>>>
>>>
>>
> 





Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-10 Thread Boyang Chen
Thanks Leah and Sophie for the KIP.

1. I'm a bit surprised that we don't have an advance time. Could we
elaborate on how the storage layer is structured?

2. IIUC, there will be extra cost in terms of fetching aggregation results,
since we can't pre-aggregate until the user asks for it. It would be good
to discuss this as well.

3. We haven't discussed the possibility of supporting sliding windows
transparently. For a user who actually uses a hopping window, Streams could
detect the inefficiency by computing the window_size/advance_time ratio and
deciding whether the write amplification is too high compared with
some configured threshold. The benefit is that existing Streams users
wouldn't need to change their code or learn a new API; they would only
need to upgrade the Streams library to make their inefficient hopping
window implementation cheaper. There might be some compatibility issues
for sure, but it's worth listing them out to weigh the trade-offs.
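A small sketch of the ratio check in point 3, assuming a hopping window [start, start + size) with a fixed advance: each record is written to at most ceil(size / advance) windows. The threshold check is hypothetical and does not correspond to any existing Streams config.

```java
/** Hypothetical helpers for estimating hopping-window write amplification. */
public final class HoppingAmplificationSketch {

    /** Upper bound on how many hopping windows [start, start + size) one record is written to. */
    public static long windowsPerRecord(long sizeMs, long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0) {
            throw new IllegalArgumentException("size and advance must be positive");
        }
        return (sizeMs + advanceMs - 1) / advanceMs; // ceil(size / advance)
    }

    /** Hypothetical check: flag a window whose write amplification exceeds a threshold. */
    public static boolean isWriteAmplificationTooHigh(long sizeMs, long advanceMs, long threshold) {
        return windowsPerRecord(sizeMs, advanceMs) > threshold;
    }
}
```

Emulating a 10-second sliding window with a hopping window that advances by 1 ms writes every record into 10,000 windows, which is the kind of amplification such a check would flag.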

Boyang

On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas  wrote:

> Hey Matthias,
>
> Thanks for pointing that out. I added the following to the Proposed Changes
> section of the KIP:
>
> "Records that come out of order will be processed the same way as in-order
> records, as long as they fall within the grace period. Any new windows
> created by the late record will still be created, and the existing windows
> that are changed by the late record will be updated. Any record that falls
> outside of the grace period (either user defined or default) will be
> discarded. "
>
> All the best,
> Leah
>
> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax  wrote:
>
> > Leah,
> >
> > thanks a lot for the KIP. Very well written.
> >
> > The KIP does not talk about the handling of out-of-order data though.
> > How do you propose to address this?
> >
> >
> > -Matthias
> >
> > On 7/8/20 5:33 PM, Leah Thomas wrote:
> > > Hi all,
> > > I'd like to kick off the discussion for KIP-450, adding sliding window
> > > aggregation support to Kafka Streams.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > >
> > > Let me know what you think,
> > > Leah
> > >
> >
> >
>


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-10 Thread Leah Thomas
Hey Matthias,

Thanks for pointing that out. I added the following to the Proposed Changes
section of the KIP:

"Records that come out of order will be processed the same way as in-order
records, as long as they fall within the grace period. Any new windows
created by the late record will still be created, and the existing windows
that are changed by the late record will be updated. Any record that falls
outside of the grace period (either user defined or default) will be
discarded. "
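A simplified Java sketch of the rule quoted above, assuming stream time is the largest timestamp observed so far and that a record counts as "within the grace period" if it is at most graceMs behind stream time. The actual cutoff for sliding windows also depends on window boundaries, so treat this as an approximation; the class is hypothetical.

```java
/** Hypothetical, simplified grace-period check; stream time never moves backwards. */
public final class GraceFilterSketch {
    private final long graceMs;
    private long observedStreamTime = Long.MIN_VALUE;

    public GraceFilterSketch(long graceMs) {
        if (graceMs < 0) {
            throw new IllegalArgumentException("graceMs must not be negative");
        }
        this.graceMs = graceMs;
    }

    /** Returns true if the record is processed (in order or out of order), false if discarded. */
    public boolean accept(long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        // Out-of-order records are processed like in-order ones as long as they are
        // no more than graceMs behind stream time (simplifying assumption).
        return timestamp >= observedStreamTime - graceMs;
    }
}
```

With a 5 ms grace period and records at 10, 7, 4, 5, 20, 14, the records at 4 and 14 are discarded and the rest are processed.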

All the best,
Leah

On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax  wrote:

> Leah,
>
> thanks a lot for the KIP. Very well written.
>
> The KIP does not talk about the handling of out-of-order data though.
> How do you propose to address this?
>
>
> -Matthias
>
> On 7/8/20 5:33 PM, Leah Thomas wrote:
> > Hi all,
> > I'd like to kick off the discussion for KIP-450, adding sliding window
> > aggregation support to Kafka Streams.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > Let me know what you think,
> > Leah
> >
>
>


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-10 Thread Sophie Blee-Goldman
Thanks Leah!

This kind of assumes an implicit answer to Matthias's question, but I was
wondering if we should take this opportunity to choose a better default
value for the grace period. Note that the default of -1 in the TimeWindows
class, for example, ultimately gets translated into a default value of 24
hours, which is kind of a long time.

This has hit users of suppression especially hard, since it means you won't
see *any* output for 24 hours. It's pretty frustrating when you're trying to
unit test your topology and nothing happens just because you didn't
explicitly override the default grace period.

Unfortunately, we're stuck with the 24hr grace period for our existing
operators for compatibility reasons. But since this is a new kind of
aggregation, we have the opportunity to consider alternatives and try to
improve on this pain point for users.

Of course, the obvious question now is: what would be a good grace period?
We've discussed this a number of times before and, as far as I know, never
came up with a good answer. It might also be somewhat confusing for users if
different kinds of windowed aggregations had different default grace
periods, although that's not a great reason to keep doing something that's
obviously causing problems.

On the other hand, someone recently brought up what I thought was a good
suggestion: just make the grace period a required parameter. This seems to
solve the existing problem while dodging the question of what a "good"
universal default would be.
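For illustration only, a hypothetical window definition in Java where the grace period is a required argument with no default, so it cannot silently fall back to 24 hours. The factory name borrows withTimeDifferenceAndGrace from Leah's later update on this thread; the class itself is a stand-in, not the Kafka Streams API.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical window definition in which the grace period must always be supplied. */
public final class SlidingWindowSpec {
    private final long timeDifferenceMs;
    private final long graceMs;

    private SlidingWindowSpec(long timeDifferenceMs, long graceMs) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    /** The only factory: a window cannot be built without choosing a grace period. */
    public static SlidingWindowSpec withTimeDifferenceAndGrace(Duration timeDifference, Duration grace) {
        Objects.requireNonNull(timeDifference, "timeDifference");
        Objects.requireNonNull(grace, "grace");
        if (timeDifference.isNegative() || grace.isNegative()) {
            throw new IllegalArgumentException("timeDifference and grace must not be negative");
        }
        return new SlidingWindowSpec(timeDifference.toMillis(), grace.toMillis());
    }

    public long timeDifferenceMs() {
        return timeDifferenceMs;
    }

    public long gracePeriodMs() {
        return graceMs;
    }
}
```

A caller that forgets the grace period gets a compile error instead of a window that quietly uses a long default, which is the effect the required-parameter suggestion is after.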

WDYT?

Cheers,
Sophie

On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax  wrote:

> Leah,
>
> thanks a lot for the KIP. Very well written.
>
> The KIP does not talk about the handling of out-of-order data though.
> How do you propose to address this?
>
>
> -Matthias
>
> On 7/8/20 5:33 PM, Leah Thomas wrote:
> > Hi all,
> > I'd like to kick off the discussion for KIP-450, adding sliding window
> > aggregation support to Kafka Streams.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > Let me know what you think,
> > Leah
> >
>
>


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-09 Thread Matthias J. Sax
Leah,

thanks a lot for the KIP. Very well written.

The KIP does not talk about the handling of out-of-order data though.
How do you propose to address this?


-Matthias

On 7/8/20 5:33 PM, Leah Thomas wrote:
> Hi all,
> I'd like to kick off the discussion for KIP-450, adding sliding window
> aggregation support to Kafka Streams.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> 
> Let me know what you think,
> Leah
> 


