Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-09-09 Thread Guozhang Wang
Hello Matthias,

Thanks for your thoughts!

On Mon, Sep 9, 2019 at 6:02 PM Matthias J. Sax 
wrote:

> From my point of view, a Tumbling/Hopping window has different semantics
> than a Sliding-Window, and hence, I am not convinced atm that it's a
> good idea to use 1ms-hopping-windows.
>
>
>
> (1) I think that the window bounds are different, i.e., while a
> time-window has the lower start-time as an inclusive bound and the
> larger end-time as an exclusive bound, for a sliding-window both bounds
> are inclusive:
>
> 5ms Hopping-Window with 1ms advance: [0,5), [1,6), [2,7)
> 5ms Sliding-Window: [0,4] [1,5] [2,6]
>
> It might seem like a subtle difference, because the end-timestamp of the
> sliding-windows is one smaller compared to the corresponding
> hopping-window (and hence both windows span the same time effectively);
> however, I think it is an important difference, as the result record
> will have windowed-keys with corresponding (ie, different) start/end
> timestamps.
>
>
I agree there's a semantic difference on inclusive / exclusive window
start/end-times, but personally I think for sliding windows another common
semantic is to have (start, end] instead of [start, end].

Nevertheless, I'm not sure this semantic difference would have a huge
impact in practice: a result record representing an aggregation over
[start, end], [start, end), or (start, end] would not be very different
statistically, assuming our time unit is small enough (like milliseconds).
Also, since with any windowing mechanism there are no gaps --- any
timestamp falls into at least one window when accounting for its
contribution to aggregations --- records would most likely be counted in
one of the consecutive windows anyway. From an application's point of
view, such a difference would not be significant --- even in the case
where window boundaries are deliberately aligned with the boundaries of
minutes / hours / days, counting a record as the last millisecond of the
minute/hour/day or as the first millisecond of the next minute/hour/day
should not concern application developers.
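To make the three boundary conventions concrete, here is a minimal sketch
(plain Java; the helper names are made up for illustration and are not part
of the Kafka Streams API):

```java
// Sketch of the three boundary conventions discussed above.
// Method names are illustrative only, not Kafka Streams API.
class WindowBounds {
    // [start, end] -- both bounds inclusive (the sliding-window convention above)
    static boolean bothInclusive(long ts, long start, long end) {
        return ts >= start && ts <= end;
    }

    // [start, end) -- inclusive start, exclusive end (hopping/tumbling convention)
    static boolean startInclusive(long ts, long start, long end) {
        return ts >= start && ts < end;
    }

    // (start, end] -- exclusive start, inclusive end (the alternative mentioned above)
    static boolean endInclusive(long ts, long start, long end) {
        return ts > start && ts <= end;
    }
}
```

For a record with timestamp equal to a window edge, the three conventions
differ only in which edge admits it; for any timestamp strictly inside the
window, all three agree.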


>
> (2) A sliding-window should only emit data if the content changes, but
> not for every ms-instance.
>
> Assume records with timestamps (5) and (7). For hopping windows we would
> instantiate the following windows:
>
> [1,6) [2,7) [3,8) [4,9) [5,10) [6,11) [7,12)
>
> For sliding windows we only get 4 window instances:
>
> [1,5]   [3,7] [7,11] [8,12]
>
> A sliding window is only created if the content changes, while a hopping
> window is created for each time-span that contains data. Note that the
> sliding window should also emit an empty aggregation when it becomes empty.
>
To me this is more an implementation detail than a semantic difference:
from the user's perspective, what matters is when a result is emitted
downstream and which time range (i.e. window) it represents. E.g. we could
also emit only

[1,5]   [3,7] [7,11] [8,12]

under time-windowed operations with today's windowed-aggregation +
suppression implementations --- in fact, it is already the case that we
create windowed entries in the underlying store only when necessary. And
when to emit a result can still be determined solely at the suppression
operator, although internal implementations can be optimized to take both
aggregator and suppressor into consideration.


>
>
> (3) I am also not sure if the idea to use "panes" of size 1ms would
> really be efficient; note that we need panes of that size to mimic
> sliding-windows and we cannot have larger panes. Hence, the
> data-reduction from only updating a single pane instead of all overlapping
> hopping-windows might not be significant enough. (I am not saying that we
> cannot use panes for tumbling/hopping windows, but for those, panes can
> be much larger.)
>
We do not necessarily need to create all the small-sized panes if they do
not contain any data. In practice, with a relatively large window length
and a very small pane length (say, each window composed of thousands of
panes), only a few panes would actually contain any data per key, and we
only need to create those entries in the underlying store and aggregate
them upon request (of course, we need to record which panes are non-empty).
Assuming we can have a better way of representing a secondary index on top
of the local state storage engine, I feel such an implementation should be
sufficiently better than our current implementation.
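The pane layout sketched above could look roughly like this (illustrative
Java only, not the Streams store API: a sorted in-memory map stands in for
the state store, a count stands in for the aggregation, and window bounds
are assumed to align with pane boundaries, which holds trivially for 1 ms
panes):

```java
import java.util.TreeMap;

// Per-record updates touch exactly one pane; a window result is the
// aggregate of the non-empty panes it covers. Only non-empty panes are
// ever materialized in the map.
class PaneStore {
    private final long paneSizeMs;
    private final TreeMap<Long, Long> panes = new TreeMap<>(); // paneStart -> count

    PaneStore(long paneSizeMs) {
        this.paneSizeMs = paneSizeMs;
    }

    void add(long timestamp) {
        long paneStart = (timestamp / paneSizeMs) * paneSizeMs;
        panes.merge(paneStart, 1L, Long::sum); // only this one pane is updated
    }

    // Aggregate all panes overlapping [windowStart, windowEnd] (inclusive bounds).
    long countForWindow(long windowStart, long windowEnd) {
        long firstPane = (windowStart / paneSizeMs) * paneSizeMs;
        long lastPane = (windowEnd / paneSizeMs) * paneSizeMs;
        return panes.subMap(firstPane, true, lastPane, true)
                    .values().stream().mapToLong(Long::longValue).sum();
    }
}
```

With 1 ms panes and records at timestamps 5 and 7 (as in the example
earlier in the thread), only two pane entries exist, yet any window's count
can be answered by a short range scan.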


>
>
> -Matthias
>
>
>
>
> On 9/9/19 1:54 PM, Guozhang Wang wrote:
> > Hello John,
> >
> > I like your idea of adding a new Combinator interface better! In addition
> > to your arguments, we can also leverage each overloaded function that
> > users supply for a different aggregation implementation (i.e. if a
> > combinator is provided we can do window-slicing; otherwise we follow the
> > current approach). This is similar to existing optimizations in other
> > frameworks like Spark.

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-09-09 Thread Matthias J. Sax
From my point of view, a Tumbling/Hopping window has different semantics
than a Sliding-Window, and hence, I am not convinced atm that it's a
good idea to use 1ms-hopping-windows.



(1) I think that the window bounds are different, i.e., while a
time-window has the lower start-time as an inclusive bound and the
larger end-time as an exclusive bound, for a sliding-window both bounds
are inclusive:

5ms Hopping-Window with 1ms advance: [0,5), [1,6), [2,7)
5ms Sliding-Window: [0,4] [1,5] [2,6]

It might seem like a subtle difference, because the end-timestamp of the
sliding-windows is one smaller compared to the corresponding
hopping-window (and hence both windows span the same time effectively);
however, I think it is an important difference, as the result record
will have windowed-keys with corresponding (ie, different) start/end
timestamps.
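To make the contrast concrete, here is a small sketch (plain Java; the
helper names are made up for illustration, and this is not the Streams
`windowsFor` API) that enumerates the hopping windows containing a
timestamp and computes the inclusive end of a sliding window:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only -- not the Kafka Streams API.
class WindowEnumeration {

    // Start times of all hopping windows [start, start + size) that contain ts
    // (assumes ts >= 0 and window starts at non-negative multiples of advance).
    static List<Long> hoppingStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, ts - size + 1);
        first = ((first + advance - 1) / advance) * advance; // next multiple of advance
        for (long s = first; s <= ts; s += advance) {
            starts.add(s);
        }
        return starts;
    }

    // A sliding window with both bounds inclusive spans [start, start + size - 1].
    static long slidingEndFor(long start, long size) {
        return start + size - 1;
    }
}
```

For instance, a record at timestamp 2 falls into exactly the three hopping
windows listed above ([0,5), [1,6), [2,7)), while the corresponding 5ms
sliding windows [0,4], [1,5], [2,6] get their inclusive ends from
`start + size - 1`.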



(2) A sliding-window should only emit data if the content changes, but
not for every ms-instance.

Assume records with timestamps (5) and (7). For hopping windows we would
instantiate the following windows:

[1,6) [2,7) [3,8) [4,9) [5,10) [6,11) [7,12)

For sliding windows we only get 4 window instances:

[1,5]   [3,7] [7,11] [8,12]

A sliding window is only created if the content changes, while a hopping
window is created for each time-span that contains data. Note that the
sliding window should also emit an empty aggregation when it becomes empty.



(3) I am also not sure if the idea to use "panes" of size 1ms would
really be efficient; note that we need panes of that size to mimic
sliding-windows and we cannot have larger panes. Hence, the
data-reduction from only updating a single pane instead of all overlapping
hopping-windows might not be significant enough. (I am not saying that we
cannot use panes for tumbling/hopping windows, but for those, panes can
be much larger.)



-Matthias




On 9/9/19 1:54 PM, Guozhang Wang wrote:
> Hello John,
> 
> I like your idea of adding a new Combinator interface better! In addition
> to your arguments, we can also leverage each overloaded function that
> users supply for a different aggregation implementation (i.e. if a combinator
> is provided we can do window-slicing; otherwise we follow the current
> approach). This is similar to existing optimizations in other frameworks
> like Spark.
> 
> Will update the corresponding wiki page.
> 
> Guozhang
> 
> 
> On Fri, Sep 6, 2019 at 11:08 AM John Roesler  wrote:
> 
>> Thanks for this idea, Guozhang, it does seem to be a nice way to solve
>> the problem.
>>
>> I'm a _little_ concerned about the interface, though. It might be
>> better to just add a new argument to a new method overload like
>> `(initializer, aggregator, merger/combinator/whatever)`.
>>
>> Two reasons come to mind for this:
>> 1) CombineAggregator is no longer a functional interface, so users
>> have to switch to anonymous classes
>> 2) there's a discoverability problem, because the API doesn't
>> advertise CombineAggregator anywhere, it's just a magic parameter you
>> can pass to get more efficient executions
>>
>> On the other hand, adding an argument (initializer, aggregator,
>> merger/combinator/whatever) lets you supply lambdas for all the args,
>> and also makes it clear that you're getting different (more efficient)
>> execution behavior.
>>
>> WDYT?
>>
>> Thanks again,
>> -John
>>
>>
>> On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang  wrote:
>>>
>>> Hi folks,
>>>
>>> I've been thinking more about this KIP and my understanding is that we
>> want
>>> to introduce a new SlidingWindow notion for aggregation since our current
>>> TimeWindow aggregation is not very efficient with very small steps. So
>> I'm
>>> wondering that rather than introducing a new implementation mechanism,
>> what
>>> if we just optimize the TimeWindowed aggregations where we can allow a
>> very
>>> small advance step (which would in practice sufficient mimic the sliding
>>> window behavior) compared to the window length itself, e.g. a window
>> length
>>> of 10 minutes with 1 second advance.
>>>
>>> I've quickly write up an alternative proposal for KIP-450 here:
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
>>> Please
>>> let me know your thoughts.
>>>
>>>
>>> Guozhang
>>>
>>> On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax 
>>> wrote:
>>>
 Thanks Sophie!


 Regarding (4), I am in favor to support both. Not sure if we can reuse
 existing window store (with enabling to store duplicates) for this case
 or not though, or if we need to design a new store to keep all raw
>> records?

 Btw: for holistic aggregations, like media, we would need to support a
 different store layout for existing aggregations (time-window,
 session-window), too. Thus, if we add support for this, we might be
>> able
 to kill two birds with one stone. Of course, we would still need new
 APIs for existing aggregations to allow users to pick between both
>> cases.
>>

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-09-09 Thread Guozhang Wang
Hello John,

I like your idea of adding a new Combinator interface better! In addition
to your arguments, we can also leverage each overloaded function that
users supply for a different aggregation implementation (i.e. if a combinator
is provided we can do window-slicing; otherwise we follow the current
approach). This is similar to existing optimizations in other frameworks
like Spark.

Will update the corresponding wiki page.

Guozhang


On Fri, Sep 6, 2019 at 11:08 AM John Roesler  wrote:

> Thanks for this idea, Guozhang, it does seem to be a nice way to solve
> the problem.
>
> I'm a _little_ concerned about the interface, though. It might be
> better to just add a new argument to a new method overload like
> `(initializer, aggregator, merger/combinator/whatever)`.
>
> Two reasons come to mind for this:
> 1) CombineAggregator is no longer a functional interface, so users
> have to switch to anonymous classes
> 2) there's a discoverability problem, because the API doesn't
> advertise CombineAggregator anywhere, it's just a magic parameter you
> can pass to get more efficient executions
>
> On the other hand, adding an argument (initializer, aggregator,
> merger/combinator/whatever) lets you supply lambdas for all the args,
> and also makes it clear that you're getting different (more efficient)
> execution behavior.
>
> WDYT?
>
> Thanks again,
> -John
>
>
> On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang  wrote:
> >
> > Hi folks,
> >
> > I've been thinking more about this KIP and my understanding is that we
> want
> > to introduce a new SlidingWindow notion for aggregation since our current
> > TimeWindow aggregation is not very efficient with very small steps. So
> I'm
> > wondering that rather than introducing a new implementation mechanism,
> what
> > if we just optimize the TimeWindowed aggregations where we can allow a
> very
> > small advance step (which would in practice sufficient mimic the sliding
> > window behavior) compared to the window length itself, e.g. a window
> length
> > of 10 minutes with 1 second advance.
> >
> > I've quickly write up an alternative proposal for KIP-450 here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> > Please
> > let me know your thoughts.
> >
> >
> > Guozhang
> >
> > On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks Sophie!
> > >
> > >
> > > Regarding (4), I am in favor to support both. Not sure if we can reuse
> > > existing window store (with enabling to store duplicates) for this case
> > > or not though, or if we need to design a new store to keep all raw
> records?
> > >
> > > Btw: for holistic aggregations, like media, we would need to support a
> > > different store layout for existing aggregations (time-window,
> > > session-window), too. Thus, if we add support for this, we might be
> able
> > > to kill two birds with one stone. Of course, we would still need new
> > > APIs for existing aggregations to allow users to pick between both
> cases.
> > >
> > > I only bring this up, because it might make sense to design the store
> in
> > > a way such that we can use it for all cases.
> > >
> > >
> > > About (3): atm we support wall-clock time via the corresponding
> > > `WallclockTimestampeExtractor`. Maybe Bill can elaborate a little bit
> > > more what he has in mind exactly, and why using this extractor would
> not
> > > meet the requirements for processing-time sliding windows?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > > > Regarding 4): yes I agree with you that invertibility is not a common
> > > > property for agg-functions. Just to be clear about our current APIs:
> for
> > > > stream.aggregate we only require a single Adder function, whereas for
> > > > table.aggregate we require both Adder and Subtractor, but these are
> not
> > > > used to leverage any properties just that the incoming table
> changelog
> > > > stream may contain "tombstones" and hence we need to negate the
> effect of
> > > > the previous record that has been deleted by this tombstone.
> > > >
> > > > What I'm proposing is exactly having two APIs, one for Adder only
> (like
> > > > other Streams aggregations) and one for Subtractor + Adder (for agg
> > > > functions users think are invertible) for efficiency. Some other
> > > frameworks
> > > > (e.g. Spark) have similar options for users and will recommend using
> the
> > > > latter so that some optimization in implementation can be done.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> > > sop...@confluent.io>
> > > > wrote:
> > > >
> > > >> Thanks for the feedback Matthias and Bill. After discussing offline
> we
> > > >> realized the type of windows I originally had in mind were quite
> > > different,
> > > >> and I agree now that the semantics outlined by Matthias are the
> > > direction
> > > >> to go in here. I will update the KIP accordingly with the new semantics
> > > >> (and corresponding design) and restart the discussion from there.

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-09-06 Thread John Roesler
Thanks for this idea, Guozhang, it does seem to be a nice way to solve
the problem.

I'm a _little_ concerned about the interface, though. It might be
better to just add a new argument to a new method overload like
`(initializer, aggregator, merger/combinator/whatever)`.

Two reasons come to mind for this:
1) CombineAggregator is no longer a functional interface, so users
have to switch to anonymous classes
2) there's a discoverability problem, because the API doesn't
advertise CombineAggregator anywhere, it's just a magic parameter you
can pass to get more efficient executions

On the other hand, adding an argument (initializer, aggregator,
merger/combinator/whatever) lets you supply lambdas for all the args,
and also makes it clear that you're getting different (more efficient)
execution behavior.
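For illustration, the overload shape under discussion might look roughly
like this (hypothetical interface and method names, not the actual KStream
API): all three arguments stay functional interfaces, so callers can pass
lambdas, and the extra combinator argument signals that a more efficient
(window-slicing) execution may be used.

```java
// Sketch of the proposed (initializer, aggregator, combinator) overload.
// All names are made up for illustration; none of this is the KStream API.
class AggregateOverload {
    @FunctionalInterface interface Initializer<A> { A apply(); }
    @FunctionalInterface interface Aggregator<V, A> { A apply(V value, A agg); }
    @FunctionalInterface interface Combinator<A> { A apply(A left, A right); }

    // Stand-in for the optimized execution path: aggregate two slices
    // independently, then merge the partial aggregates with the combinator.
    static <V, A> A aggregate(Iterable<V> leftSlice, Iterable<V> rightSlice,
                              Initializer<A> init,
                              Aggregator<V, A> agg,
                              Combinator<A> combine) {
        A left = init.apply();
        for (V v : leftSlice) left = agg.apply(v, left);
        A right = init.apply();
        for (V v : rightSlice) right = agg.apply(v, right);
        return combine.apply(left, right); // merge the partial aggregates
    }
}
```

Because each argument is a separate functional interface, a call site can
stay entirely lambda-based, e.g. `aggregate(a, b, () -> 0, (v, acc) -> acc + v,
Integer::sum)` for a sum split across two slices.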

WDYT?

Thanks again,
-John


On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang  wrote:
>
> Hi folks,
>
> I've been thinking more about this KIP and my understanding is that we want
> to introduce a new SlidingWindow notion for aggregation since our current
> TimeWindow aggregation is not very efficient with very small steps. So I'm
> wondering that rather than introducing a new implementation mechanism, what
> if we just optimize the TimeWindowed aggregations where we can allow a very
> small advance step (which would in practice sufficiently mimic the sliding
> window behavior) compared to the window length itself, e.g. a window length
> of 10 minutes with 1 second advance.
>
> I've quickly written up an alternative proposal for KIP-450 here:
> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> Please let me know your thoughts.
>
>
> Guozhang
>
> On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax 
> wrote:
>
> > Thanks Sophie!
> >
> >
> > Regarding (4), I am in favor to support both. Not sure if we can reuse
> > existing window store (with enabling to store duplicates) for this case
> > or not though, or if we need to design a new store to keep all raw records?
> >
> > Btw: for holistic aggregations, like media, we would need to support a
> > different store layout for existing aggregations (time-window,
> > session-window), too. Thus, if we add support for this, we might be able
> > to kill two birds with one stone. Of course, we would still need new
> > APIs for existing aggregations to allow users to pick between both cases.
> >
> > I only bring this up, because it might make sense to design the store in
> > a way such that we can use it for all cases.
> >
> >
> > About (3): atm we support wall-clock time via the corresponding
> > `WallclockTimestampeExtractor`. Maybe Bill can elaborate a little bit
> > more what he has in mind exactly, and why using this extractor would not
> > meet the requirements for processing-time sliding windows?
> >
> >
> > -Matthias
> >
> >
> > On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > > Regarding 4): yes I agree with you that invertibility is not a common
> > > property for agg-functions. Just to be clear about our current APIs: for
> > > stream.aggregate we only require a single Adder function, whereas for
> > > table.aggregate we require both Adder and Subtractor, but these are not
> > > used to leverage any properties just that the incoming table changelog
> > > stream may contain "tombstones" and hence we need to negate the effect of
> > > the previous record that has been deleted by this tombstone.
> > >
> > > What I'm proposing is exactly having two APIs, one for Adder only (like
> > > other Streams aggregations) and one for Subtractor + Adder (for agg
> > > functions users think are invertible) for efficiency. Some other
> > frameworks
> > > (e.g. Spark) have similar options for users and will recommend using the
> > > latter so that some optimization in implementation can be done.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > wrote:
> > >
> > >> Thanks for the feedback Matthias and Bill. After discussing offline we
> > >> realized the type of windows I originally had in mind were quite
> > different,
> > >> and I agree now that the semantics outlined by Matthias are the
> > direction
> > >> to go in here. I will update the KIP accordingly with the new semantics
> > >> (and corresponding design) and restart the discussion from there.
> > >>
> > >> In the meantime, to respond to some other points:
> > >>
> > >> 1) API:
> > >>
> > >> I propose adding only the one class -- public class SlidingWindows
> > extends
> > >> Windows {} --  so I do not believe we need any new Serdes?
> > It
> > >> will still be a fixed size TimeWindow, but handled a bit differently.
> > I've
> > >> updated the KIP to state explicitly all of the classes/methods being
> > added
> > >>
> > >> 2) Zero grace period
> > >>
> > >> The "zero grace period" was essentially just consequence of my original
> > >> definition for sliding windows; with the new semantics we can (and
> > should)
> > >> allow for a

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-09-04 Thread Guozhang Wang
Hi folks,

I've been thinking more about this KIP, and my understanding is that we want
to introduce a new SlidingWindow notion for aggregation since our current
TimeWindow aggregation is not very efficient with very small steps. So I'm
wondering: rather than introducing a new implementation mechanism, what if
we just optimize the TimeWindowed aggregations to allow a very small advance
step relative to the window length itself (which would in practice
sufficiently mimic the sliding window behavior), e.g. a window length of 10
minutes with a 1 second advance.
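As a back-of-the-envelope check of why such a small advance is costly with
the current implementation: every record belongs to (window length /
advance) hopping windows, so each incoming record triggers that many
aggregate updates. A tiny sketch (illustrative helper, not Streams code):

```java
// Number of hopping windows each record falls into, which is the per-record
// update fan-out of the current TimeWindowed implementation.
class HoppingCost {
    // Assumes the advance evenly divides the window length.
    static long windowsPerRecord(long windowMs, long advanceMs) {
        return windowMs / advanceMs;
    }
}
```

For the 10-minute window with a 1-second advance mentioned above, that is
600 window updates per record, which is what the optimization aims to avoid.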

I've quickly written up an alternative proposal for KIP-450 here:
https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
Please let me know your thoughts.


Guozhang

On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax 
wrote:

> Thanks Sophie!
>
>
> Regarding (4), I am in favor of supporting both. Not sure if we can reuse
> existing window store (with enabling to store duplicates) for this case
> or not though, or if we need to design a new store to keep all raw records?
>
> Btw: for holistic aggregations, like median, we would need to support a
> different store layout for existing aggregations (time-window,
> session-window), too. Thus, if we add support for this, we might be able
> to kill two birds with one stone. Of course, we would still need new
> APIs for existing aggregations to allow users to pick between both cases.
>
> I only bring this up, because it might make sense to design the store in
> a way such that we can use it for all cases.
>
>
> About (3): atm we support wall-clock time via the corresponding
> `WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit
> more what he has in mind exactly, and why using this extractor would not
> meet the requirements for processing-time sliding windows?
>
>
> -Matthias
>
>
> On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > Regarding 4): yes I agree with you that invertibility is not a common
> > property for agg-functions. Just to be clear about our current APIs: for
> > stream.aggregate we only require a single Adder function, whereas for
> > table.aggregate we require both Adder and Subtractor, but these are not
> > used to leverage any properties just that the incoming table changelog
> > stream may contain "tombstones" and hence we need to negate the effect of
> > the previous record that has been deleted by this tombstone.
> >
> > What I'm proposing is exactly having two APIs, one for Adder only (like
> > other Streams aggregations) and one for Subtractor + Adder (for agg
> > functions users think are invertible) for efficiency. Some other
> frameworks
> > (e.g. Spark) have similar options for users and will recommend using the
> > latter so that some optimization in implementation can be done.
> >
> >
> > Guozhang
> >
> > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > wrote:
> >
> >> Thanks for the feedback Matthias and Bill. After discussing offline we
> >> realized the type of windows I originally had in mind were quite
> different,
> >> and I agree now that the semantics outlined by Matthias are the
> direction
> >> to go in here. I will update the KIP accordingly with the new semantics
> >> (and corresponding design) and restart the discussion from there.
> >>
> >> In the meantime, to respond to some other points:
> >>
> >> 1) API:
> >>
> >> I propose adding only the one class -- public class SlidingWindows
> extends
> >> Windows {} --  so I do not believe we need any new Serdes?
> It
> >> will still be a fixed size TimeWindow, but handled a bit differently.
> I've
> >> updated the KIP to state explicitly all of the classes/methods being
> added
> >>
> >> 2) Zero grace period
> >>
> >> The "zero grace period" was essentially just consequence of my original
> >> definition for sliding windows; with the new semantics we can (and
> should)
> >> allow for a nonzero grace period
> >>
> >> 3) Wall-clock time
> >>
> >> Hm, I had not considered this yet but it may be a good idea to keep in
> mind
> >> while rethinking the design. To clarify, we don't support wall-clock
> based
> >> aggregations with hopping or tumbling windows though (yet?)
> >>
> >> 4) Commutative vs associative vs invertible aggregations
> >>
> >> I agree that it's reasonable to assume commutativity and associativity,
> but
> >> that's not the same as being subtractable -- that requires
> invertibility,
> >> which is broken by a lot of very simple functions and is not, I think,
> ok
> >> to assume. However we could consider adding a separate API which also
> takes
> >> a subtractor and corresponds to a completely different implementation.
> We
> >> could also consider an API that takes a function that aggregates two
> >> aggregates together in addition to the existing aggregator (which
> >> aggregates a single value with an existing aggregate) WDYT?
> >>
> >>
> >>
> >>
> >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax 
> >> wrote:
> >>
> >>> Thanks for the KIP Sophie. Couple of comments:

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-16 Thread Matthias J. Sax
Thanks Sophie!


Regarding (4), I am in favor of supporting both. Not sure if we can reuse the
existing window store (enabling it to store duplicates) for this case,
or if we need to design a new store to keep all raw records?

Btw: for holistic aggregations, like median, we would need to support a
different store layout for existing aggregations (time-window,
session-window), too. Thus, if we add support for this, we might be able
to kill two birds with one stone. Of course, we would still need new
APIs for existing aggregations to allow users to pick between both cases.

I only bring this up, because it might make sense to design the store in
a way such that we can use it for all cases.


About (3): atm we support wall-clock time via the corresponding
`WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit
more what he has in mind exactly, and why using this extractor would not
meet the requirements for processing-time sliding windows?


-Matthias


On 4/16/19 10:16 AM, Guozhang Wang wrote:
> Regarding 4): yes, I agree with you that invertibility is not a common
> property for agg-functions. Just to be clear about our current APIs: for
> stream.aggregate we only require a single Adder function, whereas for
> table.aggregate we require both Adder and Subtractor --- but there the
> Subtractor is not used to leverage any algebraic property; rather, the
> incoming table changelog stream may contain "tombstones", and we need to
> negate the effect of the previous record deleted by such a tombstone.
> 
> What I'm proposing is exactly having two APIs: one with an Adder only (like
> other Streams aggregations) and one with a Subtractor + Adder (for agg
> functions users believe are invertible) for efficiency. Some other frameworks
> (e.g. Spark) offer similar options to users and recommend using the latter
> so that some optimizations can be done in the implementation.
> 
> 
> Guozhang
> 
> On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman 
> wrote:
> 
>> Thanks for the feedback Matthias and Bill. After discussing offline we
>> realized the type of windows I originally had in mind were quite different,
>> and I agree now that the semantics outlined by Matthias are the direction
>> to go in here. I will update the KIP accordingly with the new semantics
>> (and corresponding design) and restart the discussion from there.
>>
>> In the meantime, to respond to some other points:
>>
>> 1) API:
>>
>> I propose adding only the one class -- public class SlidingWindows extends
>> Windows {} --  so I do not believe we need any new Serdes? It
>> will still be a fixed size TimeWindow, but handled a bit differently. I've
>> updated the KIP to state explicitly all of the classes/methods being added
>>
>> 2) Zero grace period
>>
>> The "zero grace period" was essentially just consequence of my original
>> definition for sliding windows; with the new semantics we can (and should)
>> allow for a nonzero grace period
>>
>> 3) Wall-clock time
>>
>> Hm, I had not considered this yet but it may be a good idea to keep in mind
>> while rethinking the design. To clarify, we don't support wall-clock based
>> aggregations with hopping or tumbling windows though (yet?)
>>
>> 4) Commutative vs associative vs invertible aggregations
>>
>> I agree that it's reasonable to assume commutativity and associativity, but
>> that's not the same as being subtractable -- that requires invertibility,
>> which is broken by a lot of very simple functions and is not, I think, ok
>> to assume. However we could consider adding a separate API which also takes
>> a subtractor and corresponds to a completely different implementation. We
>> could also consider an API that takes a function that aggregates two
>> aggregates together in addition to the existing aggregator (which
>> aggregates a single value with an existing aggregate) WDYT?
>>
>>
>>
>>
>> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax 
>> wrote:
>>
>>> Thanks for the KIP Sophie. Couple of comments:
>>>
>>> It's a little unclear to me, what public API you propose. It seems you
>>> want to add
>>>
 public class SlidingWindow extends TimeWindow {}
>>>
>>> and
>>>
 public class SlidingWindows extends TimeWindows {} // or maybe `extends
>>> Windows`
>>>
>>> If yes, should we add corresponding public Serdes classes?
>>>
>>> Also, can you list all newly added classes/methods explicitly in the
>> wiki?
>>>
>>>
>>> About the semantics of the operator.
>>>
 "Only one single window is defined at a time,"
>>>
>>> Should this be "one window per key" instead?
>>>
>>> I agree that both window boundaries should be inclusive. However, I am
>>> not sure about:
>>>
 At most one record is forwarded when new data arrives
>>>
>>> (1) For what case, no output would be produced?
>>>
>>> (2) I think, if we advance in time, it can also happen that we emit
>>> multiple records. If a window "slides" (not "hops"), we cannot just
>>> advance it to the current record stream time but would need to emit mo

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-16 Thread Guozhang Wang
Regarding 4): yes, I agree with you that invertibility is not a common
property for agg-functions. Just to be clear about our current APIs: for
stream.aggregate we only require a single Adder function, whereas for
table.aggregate we require both Adder and Subtractor --- but there the
Subtractor is not used to leverage any algebraic property; rather, the
incoming table changelog stream may contain "tombstones", and we need to
negate the effect of the previous record deleted by such a tombstone.

What I'm proposing is exactly having two APIs: one with an Adder only (like
other Streams aggregations) and one with a Subtractor + Adder (for agg
functions users believe are invertible) for efficiency. Some other frameworks
(e.g. Spark) offer similar options to users and recommend using the latter
so that some optimizations can be done in the implementation.
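As an illustration of why a Subtractor helps for invertible aggregations
(a sketch only, not Streams code; class and method names are made up): with
an invertible function such as a sum, a sliding aggregate can be maintained
in O(1) per record by adding the new value and subtracting whatever slid
out, instead of re-aggregating the whole window.

```java
import java.util.ArrayDeque;

// Sliding sum over a window of `windowMs` with both bounds inclusive,
// i.e. the window ending at ts spans [ts - windowMs + 1, ts].
// Records must arrive in timestamp order for this sketch.
class SlidingSum {
    private final long windowMs;
    private final ArrayDeque<long[]> buffer = new ArrayDeque<>(); // {timestamp, value}
    private long sum = 0;

    SlidingSum(long windowMs) {
        this.windowMs = windowMs;
    }

    long add(long timestamp, long value) {
        buffer.addLast(new long[]{timestamp, value});
        sum += value;                                        // the Adder
        while (buffer.peekFirst()[0] < timestamp - windowMs + 1) {
            sum -= buffer.pollFirst()[1];                    // the Subtractor
        }
        return sum;
    }
}
```

Without invertibility (e.g. max, median), the subtraction step is not
available and the window content must be re-aggregated, which is why the
Adder-only API would remain the general-purpose path.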


Guozhang

On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman 
wrote:

> Thanks for the feedback Matthias and Bill. After discussing offline we
> realized the type of windows I originally had in mind were quite different,
> and I agree now that the semantics outlined by Matthias are the direction
> to go in here. I will update the KIP accordingly with the new semantics
> (and corresponding design) and restart the discussion from there.
>
> In the meantime, to respond to some other points:
>
> 1) API:
>
> I propose adding only the one class -- public class SlidingWindows extends
> Windows {} --  so I do not believe we need any new Serdes? It
> will still be a fixed size TimeWindow, but handled a bit differently. I've
> updated the KIP to state explicitly all of the classes/methods being added
>
> 2) Zero grace period
>
> The "zero grace period" was essentially just a consequence of my original
> definition for sliding windows; with the new semantics we can (and should)
> allow for a nonzero grace period
>
> 3) Wall-clock time
>
> Hm, I had not considered this yet but it may be a good idea to keep in mind
> while rethinking the design. To clarify, we don't support wall-clock based
> aggregations with hopping or tumbling windows though (yet?)
>
> 4) Commutative vs associative vs invertible aggregations
>
> I agree that it's reasonable to assume commutativity and associativity, but
> that's not the same as being subtractable -- that requires invertibility,
> which is broken by a lot of very simple functions and is not, I think, ok
> to assume. However we could consider adding a separate API which also takes
> a subtractor and corresponds to a completely different implementation. We
> could also consider an API that takes a function that aggregates two
> aggregates together in addition to the existing aggregator (which
> aggregates a single value with an existing aggregate). WDYT?
>
>
>
>
> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax 
> wrote:
>
> > Thanks for the KIP Sophie. Couple of comments:
> >
> > It's a little unclear to me what public API you propose. It seems you
> > want to add
> >
> > > public class SlidingWindow extends TimeWindow {}
> >
> > and
> >
> > > public class SlidingWindows extends TimeWindows {} // or maybe `extends
> > Windows`
> >
> > If yes, should we add corresponding public Serdes classes?
> >
> > Also, can you list all newly added classes/methods explicitly in the
> wiki?
> >
> >
> > About the semantics of the operator.
> >
> > > "Only one single window is defined at a time,"
> >
> > Should this be "one window per key" instead?
> >
> > I agree that both window boundaries should be inclusive. However, I am
> > not sure about:
> >
> > > At most one record is forwarded when new data arrives
> >
> > (1) For what case, no output would be produced?
> >
> > (2) I think, if we advance in time, it can also happen that we emit
> > multiple records. If a window "slides" (not "hops"), we cannot just
> > advance it to the current record stream time but would need to emit more
> > results if records expire before the current input record is added. For
> > example, consider a window with size 5ms, and the following ts (all
> > records have the same key):
> >
> > 1 2 3 10 11
> >
> > This should result in windows:
> >
> > [1]
> > [1,2]
> > [1,2,3]
> > [2,3]
> > [3]
> > [10]
> > [10,11]
> >
> > Ie, when the record with ts=10 is processed, it will trigger the
> > computation of [2,3], [3] and [10].
> >
> >
> > About out-of-order handling: I am wondering, if the current design that
> > does not allow any grace period is too restrictive. Can you elaborate
> > more on the motivation for this suggestion?
> >
> >
> > Can you give more details about the "simple design"? Atm, it's not clear
> > to me how it works. I thought we always need to store all raw values. If
> > we only store the current aggregate, would we end up with the same
> > inefficient solution as using a hopping window with advance 1ms?
> >
> >
> > For the O(sqrt(N)) proposal: can you maybe add an example with concrete
> > bucket sizes, window size etc. The current proposal is a little unclear
> > to me, atm.

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-15 Thread Sophie Blee-Goldman
Thanks for the feedback Matthias and Bill. After discussing offline we
realized the type of windows I originally had in mind were quite different,
and I agree now that the semantics outlined by Matthias are the direction
to go in here. I will update the KIP accordingly with the new semantics
(and corresponding design) and restart the discussion from there.

In the meantime, to respond to some other points:

1) API:

I propose adding only the one class -- public class SlidingWindows extends
Windows {} --  so I do not believe we need any new Serdes? It
will still be a fixed size TimeWindow, but handled a bit differently. I've
updated the KIP to state explicitly all of the classes/methods being added

2) Zero grace period

The "zero grace period" was essentially just a consequence of my original
definition for sliding windows; with the new semantics we can (and should)
allow for a nonzero grace period

3) Wall-clock time

Hm, I had not considered this yet but it may be a good idea to keep in mind
while rethinking the design. To clarify, we don't support wall-clock based
aggregations with hopping or tumbling windows though (yet?)

4) Commutative vs associative vs invertible aggregations

I agree that it's reasonable to assume commutativity and associativity, but
that's not the same as being subtractable -- that requires invertibility,
which is broken by a lot of very simple functions and is not, I think, ok
to assume. However we could consider adding a separate API which also takes
a subtractor and corresponds to a completely different implementation. We
could also consider an API that takes a function that aggregates two
aggregates together in addition to the existing aggregator (which
aggregates a single value with an existing aggregate). WDYT?
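A tiny example of why subtractability cannot be assumed even for commutative, associative functions: max is both, yet once the maximum leaves the window, no subtract(aggregate, expired) can recover the new max from the old aggregate alone. (Illustrative snippet, not Streams code; the class name is made up.)

```java
public class MaxNotInvertible {
    public static void main(String[] args) {
        // Window {7, 3, 5}: max is commutative and associative.
        long agg = Math.max(Math.max(7L, 3L), 5L);
        // Record 7 expires. There is no subtract(agg, 7) that yields 5;
        // the true new max must be recomputed from the surviving records.
        long recomputed = Math.max(3L, 5L);
        System.out.println(agg + " " + recomputed); // 7 5
    }
}
```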




On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax 
wrote:

> Thanks for the KIP Sophie. Couple of comments:
>
> It's a little unclear to me what public API you propose. It seems you
> want to add
>
> > public class SlidingWindow extends TimeWindow {}
>
> and
>
> > public class SlidingWindows extends TimeWindows {} // or maybe `extends
> Windows`
>
> If yes, should we add corresponding public Serdes classes?
>
> Also, can you list all newly added classes/methods explicitly in the wiki?
>
>
> About the semantics of the operator.
>
> > "Only one single window is defined at a time,"
>
> Should this be "one window per key" instead?
>
> I agree that both window boundaries should be inclusive. However, I am
> not sure about:
>
> > At most one record is forwarded when new data arrives
>
> (1) For what case, no output would be produced?
>
> (2) I think, if we advance in time, it can also happen that we emit
> multiple records. If a window "slides" (not "hops"), we cannot just
> advance it to the current record stream time but would need to emit more
> results if records expire before the current input record is added. For
> example, consider a window with size 5ms, and the following ts (all
> records have the same key):
>
> 1 2 3 10 11
>
> This should result in windows:
>
> [1]
> [1,2]
> [1,2,3]
> [2,3]
> [3]
> [10]
> [10,11]
>
> Ie, when the record with ts=10 is processed, it will trigger the
> computation of [2,3], [3] and [10].
>
>
> About out-of-order handling: I am wondering, if the current design that
> does not allow any grace period is too restrictive. Can you elaborate
> more on the motivation for this suggestion?
>
>
> Can you give more details about the "simple design"? Atm, it's not clear
> to me how it works. I thought we always need to store all raw values. If
> we only store the current aggregate, would we end up with the same
> inefficient solution as using a hopping window with advance 1ms?
>
>
> For the O(sqrt(N)) proposal: can you maybe add an example with concrete
> bucket sizes, window size etc. The current proposal is a little unclear
> to me, atm.
>
>
> How are windows advanced? Do you propose to advance all windows over all
> keys at the same time, or would each window (per key) advance
> independent from all other windows? What would be the pros/cons for both
> approaches?
>
>
> To add to Guozhang's comment: atm, DSL operators assume that aggregate
> functions are commutative and associative. Hence, it seems ok to make
> the same assumption for sliding window. Addressing holistic and
> non-subtractable aggregations should be supported out of the box at some
> point, too, but this would be a different KIP adding this to all
> existing aggregations.
>
>
> -Matthias
>
>
>
> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after
> the
> > end of the window." hence it is the additional time beyond the window
> size.
> > I guess you were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any more, but only for the window store itself.

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-12 Thread Bill Bejeck
Thanks for the KIP Sophie.

I have a couple of additional comments.

The current proposal only considers stream-time.

While I support this, each time we introduce a new operation based on
stream-time, invariably users request that operation support wall-clock
time as well.  Would we want to consider this option in the current KIP
proactively?

Also, I think the concept of a zero grace period is too restrictive as well.  I
may have missed your response, but can you elaborate on the reasoning?

Thanks,
Bill


On Fri, Apr 12, 2019 at 12:14 PM Guozhang Wang  wrote:

> On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman 
> wrote:
>
> > Thanks for the comments Guozhang! I've answered your questions below
> >
> > On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang  wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the proposed KIP. I've made a pass over it and here are some
> > > thoughts:
> > >
> > > 1. "The window size is effectively the grace and retention period". The
> > > grace time is defined as "the time to admit late-arriving events after
> > the
> > > end of the window." hence it is the additional time beyond the window
> > size.
> > > I guess you were trying to say it should be zero?
> > >
> > > Also for retention period, it is not a notion of the window spec any
> > more,
> > > but only for the window store itself. So I'd suggest talking about
> window
> > > size here, and note that store retention time cannot be controlled via
> > > window spec at all.
> > >
> >
> > Yes, I meant to say the grace period is effectively zero -- the retention
> > period will ultimately be the same as the window size, which is
> > configurable, but it can't be configured independently if that's what you
> > mean?
> >
> >
> You can configure retention via Materialized (in DSL), when specifying the
> store in KTable, or via WindowStoreBuilder#retentionPeriod (in PAPI). The
> point here is that they are specified independently from the Windows spec.
> So a user cannot control how long a materialized store can be retained from
> the window spec itself; she must do that via the methods mentioned above.
>
>
> >
> > > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire
> a
> > > bucket, so I'd assume you will expire one bucket as a whole when its
> end
> > > time is smaller than the current window's starting time, right?
> > >
> >
> > Since this design assumes we don't have a subtracter, each bucket would
> > expire when its start time is outside the current window; the remaining
> > values in that bucket are then aggregated with the "running aggregate" of
> > the next bucket to get the total aggregate for the entire window. I'll
> try
> > to come up with a diagram and/or better way to explain what I have in
> mind
> > here...
> > (The values themselves in the buckets will expire automatically by
> setting
> > the retention period of the underlying window store)
> >
> >
> > > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > > configurable parameter exposed to users or is it abstracted away and
> only
> > > being selected internally?
> > >
> >
> > Good question. If we ignore the difference in cost between aggregation
> > operations and writes to the underlying store, the optimal value of M is
> > sqrt(N). But the reality is the aggregation might be very simple vs
> > expensive RocksDB writes -- conversely the aggregation itself could be
> > complicated/costly while the underlying store is cheap to write  (ie
> > in-memory). I do feel it should be abstracted away from the user however
> > and not an additional parameter they need to consider and tune (eg
> > segmentInterval) ... some profiling would probably be needed to
> determine a
> > reasonable choice
> >
> >
> > > 4. "There is some tradeoff between purely optimizing " seems incomplete
> > > paragraph?
> > >
> >
> > Whoops
> >
> >
> > > 5. Meta comment: many aggregation functions are commutative and
> > > associative, so we can require users to pass in a "subtract" function
> > > as well. Given these two functions I think we can propose two sets of
> > > APIs: 1) with the adder and subtractor and 2) with the adder only (if
> > > the aggregate logic is not comm. and assoc.).
> > >
> > > We just maintain an aggregate value for each bucket (call it
> > > bucket_aggregate) plus for the whole window (call it total_aggregate),
> > i.e.
> > > at most M + 1 values per key. We use the total_aggregate for queries,
> and
> > > each update will cause 2 writes (to the bucket and to the total
> > aggregate).
> > >
> > > And with 1) when expiring the oldest bucket we simply call
> > > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > > oldest bucket we can re-compute the total_aggregate by
> > > sum(bucket_aggregate) over other buckets again.
> > >
> >
> > This is a good point, ie we can definitely be much smarter in our design
> if
> > we have a subtracter, in which case it's probably worth separate sets of
> > APIs/implementations based on what the user can provide. I'll work this into the KIP.

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-12 Thread Guozhang Wang
On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman 
wrote:

> Thanks for the comments Guozhang! I've answered your questions below
>
> On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang  wrote:
>
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after
> the
> > end of the window." hence it is the additional time beyond the window
> size.
> > I guess you were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any
> more,
> > but only for the window store itself. So I'd suggest talking about window
> > size here, and note that store retention time cannot be controlled via
> > window spec at all.
> >
>
> Yes, I meant to say the grace period is effectively zero -- the retention
> period will ultimately be the same as the window size, which is
> configurable, but it can't be configured independently if that's what you
> mean?
>
>
You can configure retention via Materialized (in DSL), when specifying the
store in KTable, or via WindowStoreBuilder#retentionPeriod (in PAPI). The
point here is that they are specified independently from the Windows spec.
So a user cannot control how long a materialized store can be retained from
the window spec itself; she must do that via the methods mentioned above.


>
> > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> > bucket, so I'd assume you will expire one bucket as a whole when its end
> > time is smaller than the current window's starting time, right?
> >
>
> Since this design assumes we don't have a subtracter, each bucket would
> expire when its start time is outside the current window; the remaining
> values in that bucket are then aggregated with the "running aggregate" of
> the next bucket to get the total aggregate for the entire window. I'll try
> to come up with a diagram and/or better way to explain what I have in mind
> here...
> (The values themselves in the buckets will expire automatically by setting
> the retention period of the underlying window store)
>
>
> > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > configurable parameter exposed to users or is it abstracted away and only
> > being selected internally?
> >
>
> Good question. If we ignore the difference in cost between aggregation
> operations and writes to the underlying store, the optimal value of M is
> sqrt(N). But the reality is the aggregation might be very simple vs
> expensive RocksDB writes -- conversely the aggregation itself could be
> complicated/costly while the underlying store is cheap to write  (ie
> in-memory). I do feel it should be abstracted away from the user however
> and not an additional parameter they need to consider and tune (eg
> segmentInterval) ... some profiling would probably be needed to determine a
> reasonable choice
>
>
> > 4. "There is some tradeoff between purely optimizing " seems incomplete
> > paragraph?
> >
>
> Whoops
>
>
> > 5. Meta comment: many aggregation functions are commutative and
> > associative, so we can require users to pass in a "subtract" function as
> > well. Given these two functions I think we can propose two sets of APIs:
> > 1) with the adder and subtractor and 2) with the adder only (if the
> > aggregate logic is not comm. and assoc.).
> >
> > We just maintain an aggregate value for each bucket (call it
> > bucket_aggregate) plus for the whole window (call it total_aggregate),
> i.e.
> > at most M + 1 values per key. We use the total_aggregate for queries, and
> > each update will cause 2 writes (to the bucket and to the total
> aggregate).
> >
> > And with 1) when expiring the oldest bucket we simply call
> > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > oldest bucket we can re-compute the total_aggregate by
> > sum(bucket_aggregate) over other buckets again.
> >
>
> This is a good point, ie we can definitely be much smarter in our design if
> we have a subtracter, in which case it's probably worth separate sets of
> APIs/implementations based on what the user can provide. I'll work this
> into the KIP
>
>
> > 6. Meta comment: it is reasonable to assume in practice out-of-order
> > data is not very common, hence most of the updates will be falling into
> the
> > latest bucket. So I'm wondering if it makes sense to always store the
> first
> > bucket in memory while making other buckets optionally on persistent
> > storage. In practice, as long as M is large enough (we probably need it
> to
> > be large enough to have sufficiently sensitive expiration anyways) then
> > each bucket's aggregate data is small enough to be in memory.
> >
>
> This sounds reasonable to me (looking into the future, if we want to
> eventually support a way to "tune" the total memory usage by Streams this
> could be turned on/off)

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-11 Thread Sophie Blee-Goldman
Thanks for the comments Guozhang! I've answered your questions below

On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang  wrote:

> Hi Sophie,
>
> Thanks for the proposed KIP. I've made a pass over it and here are some
> thoughts:
>
> 1. "The window size is effectively the grace and retention period". The
> grace time is defined as "the time to admit late-arriving events after the
> end of the window." hence it is the additional time beyond the window size.
> I guess you were trying to say it should be zero?
>
> Also for retention period, it is not a notion of the window spec any more,
> but only for the window store itself. So I'd suggest talking about window
> size here, and note that store retention time cannot be controlled via
> window spec at all.
>

Yes, I meant to say the grace period is effectively zero -- the retention
period will ultimately be the same as the window size, which is
configurable, but it can't be configured independently if that's what you
mean?


> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> bucket, so I'd assume you will expire one bucket as a whole when its end
> time is smaller than the current window's starting time, right?
>

Since this design assumes we don't have a subtracter, each bucket would
expire when its start time is outside the current window; the remaining
values in that bucket are then aggregated with the "running aggregate" of
the next bucket to get the total aggregate for the entire window. I'll try
to come up with a diagram and/or better way to explain what I have in mind
here...
(The values themselves in the buckets will expire automatically by setting
the retention period of the underlying window store)


> 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> configurable parameter exposed to users or is it abstracted away and only
> being selected internally?
>

Good question. If we ignore the difference in cost between aggregation
operations and writes to the underlying store, the optimal value of M is
sqrt(N). But the reality is the aggregation might be very simple vs
expensive RocksDB writes -- conversely the aggregation itself could be
complicated/costly while the underlying store is cheap to write  (ie
in-memory). I do feel it should be abstracted away from the user however
and not an additional parameter they need to consider and tune (eg
segmentInterval) ... some profiling would probably be needed to determine a
reasonable choice
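For intuition on why sqrt(N) falls out when the two costs are comparable, here is a toy cost model (an illustrative sketch under that equal-cost assumption, not a measured benchmark; the class and method names are made up): with M buckets over a window of N values, an update/expiry touches on the order of N/M raw values in the expiring bucket plus M bucket aggregates, and N/M + M is minimized near M = sqrt(N).

```java
public class BucketCostModel {
    // Toy model: per-expiry work ~ (values per bucket) + (number of buckets),
    // assuming an aggregation step and a store write cost roughly the same.
    static double cost(double n, double m) { return n / m + m; }

    // Brute-force the M that minimizes the modeled cost for window size n.
    static int bestM(int n) {
        int best = 1;
        for (int m = 1; m <= n; m++) {
            if (cost(n, m) < cost(n, best)) best = m;
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(bestM(10_000)); // 100, i.e. sqrt(10000)
    }
}
```

As the thread notes, once aggregation and store-write costs diverge (cheap in-memory store vs. expensive RocksDB writes, or vice versa), the optimum shifts away from sqrt(N), which is why profiling would be needed to pick a good internal default.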


> 4. "There is some tradeoff between purely optimizing " seems incomplete
> paragraph?
>

Whoops


> 5. Meta comment: many aggregation functions are commutative and associative,
> so we can require users to pass in a "subtract" function as well. Given these
> two functions I think we can propose two sets of APIs: 1) with the adder and
> subtractor and 2) with the adder only (if the aggregate logic is not comm.
> and assoc.).
>
> We just maintain an aggregate value for each bucket (call it
> bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> at most M + 1 values per key. We use the total_aggregate for queries, and
> each update will cause 2 writes (to the bucket and to the total aggregate).
>
> And with 1) when expiring the oldest bucket we simply call
> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> oldest bucket we can re-compute the total_aggregate by
> sum(bucket_aggregate) over other buckets again.
>

This is a good point, ie we can definitely be much smarter in our design if
we have a subtracter, in which case it's probably worth separate sets of
APIs/implementations based on what the user can provide. I'll work this
into the KIP


> 6. Meta comment: it is reasonable to assume in practice out-of-order
> data is not very common, hence most of the updates will be falling into the
> latest bucket. So I'm wondering if it makes sense to always store the first
> bucket in memory while making other buckets optionally on persistent
> storage. In practice, as long as M is large enough (we probably need it to
> be large enough to have sufficiently sensitive expiration anyways) then
> each bucket's aggregate data is small enough to be in memory.
>

This sounds reasonable to me (looking into the future, if we want to
eventually support a way to "tune" the total memory usage by Streams this
could be turned on/off)


> Guozhang
>
>
>
> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman 
> wrote:
>
> > Hello all,
> >
> > I would like to kick off discussion of this KIP aimed at providing
> sliding
> > window semantics to DSL aggregations.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > Please take a look and share any thoughts you have regarding the API,
> > semantics, design, etc!
> >
> > I also have a POC PR open with the "naive" implementation for your
> > reference: https://github.com/apache/kafka/pull/6549
> >
> > Cheers,
> > Sophie
> >
>
>
> --
> -- Guozhang

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-11 Thread Matthias J. Sax
Thanks for the KIP Sophie. Couple of comments:

It's a little unclear to me what public API you propose. It seems you
want to add

> public class SlidingWindow extends TimeWindow {}

and

> public class SlidingWindows extends TimeWindows {} // or maybe `extends 
> Windows`

If yes, should we add corresponding public Serdes classes?

Also, can you list all newly added classes/methods explicitly in the wiki?


About the semantics of the operator.

> "Only one single window is defined at a time,"

Should this be "one window per key" instead?

I agree that both window boundaries should be inclusive. However, I am
not sure about:

> At most one record is forwarded when new data arrives

(1) For what case, no output would be produced?

(2) I think, if we advance in time, it can also happen that we emit
multiple records. If a window "slides" (not "hops"), we cannot just
advance it to the current record stream time but would need to emit more
results if records expire before the current input record is added. For
example, consider a window with size 5ms, and the following ts (all
records have the same key):

1 2 3 10 11

This should result in windows:

[1]
[1,2]
[1,2,3]
[2,3]
[3]
[10]
[10,11]

Ie, when the record with ts=10 is processed, it will trigger the
computation of [2,3], [3] and [10].
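To replay this example mechanically, here is a minimal sketch in plain Java (not the Streams API; `SlidingWindowSketch` and its method names are made up for illustration): as the window slides forward, each record that expires produces its own intermediate window result before the incoming record is added.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    static final long SIZE_MS = 5; // window size, both bounds inclusive

    static List<List<Long>> process(long[] timestamps) {
        ArrayDeque<Long> window = new ArrayDeque<>(); // records currently in the window
        List<List<Long>> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            // Sliding past each expiring record yields a distinct window result.
            while (!window.isEmpty() && window.peekFirst() < ts - SIZE_MS + 1) {
                window.pollFirst();
                if (!window.isEmpty()) {
                    emitted.add(new ArrayList<>(window));
                }
            }
            window.addLast(ts);
            emitted.add(new ArrayList<>(window));
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(process(new long[] {1, 2, 3, 10, 11}));
        // [[1], [1, 2], [1, 2, 3], [2, 3], [3], [10], [10, 11]]
    }
}
```

Running it on the timestamps 1 2 3 10 11 reproduces exactly the seven windows listed above, including the three results ([2,3], [3], [10]) triggered by the single record at ts=10.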


About out-of-order handling: I am wondering, if the current design that
does not allow any grace period is too restrictive. Can you elaborate
more on the motivation for this suggestion?


Can you give more details about the "simple design"? Atm, it's not clear
to me how it works. I thought we always need to store all raw values. If
we only store the current aggregate, would we end up with the same
inefficient solution as using a hopping window with advance 1ms?


For the O(sqrt(N)) proposal: can you maybe add an example with concrete
bucket sizes, window size etc. The current proposal is a little unclear
to me, atm.


How are windows advanced? Do you propose to advance all windows over all
keys at the same time, or would each window (per key) advance
independent from all other windows? What would be the pros/cons for both
approaches?


To add to Guozhang's comment: atm, DSL operators assume that aggregate
functions are commutative and associative. Hence, it seems ok to make
the same assumption for sliding window. Addressing holistic and
non-subtractable aggregations should be supported out of the box at some
point, too, but this would be a different KIP adding this to all
existing aggregations.


-Matthias



On 4/9/19 4:38 PM, Guozhang Wang wrote:
> Hi Sophie,
> 
> Thanks for the proposed KIP. I've made a pass over it and here are some
> thoughts:
> 
> 1. "The window size is effectively the grace and retention period". The
> grace time is defined as "the time to admit late-arriving events after the
> end of the window." hence it is the additional time beyond the window size.
> I guess you were trying to say it should be zero?
> 
> Also for retention period, it is not a notion of the window spec any more,
> but only for the window store itself. So I'd suggest talking about window
> size here, and note that store retention time cannot be controlled via
> window spec at all.
> 
> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> bucket, so I'd assume you will expire one bucket as a whole when its end
> time is smaller than the current window's starting time, right?
> 
> 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> configurable parameter exposed to users or is it abstracted away and only
> being selected internally?
> 
> 4. "There is some tradeoff between purely optimizing " seems incomplete
> paragraph?
> 
> 5. Meta comment: many aggregation functions are commutative and associative,
> so we can require users to pass in a "subtract" function as well. Given these
> two functions I think we can propose two sets of APIs: 1) with the adder and
> subtractor and 2) with the adder only (if the aggregate logic is not comm.
> and assoc.).
> 
> We just maintain an aggregate value for each bucket (call it
> bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> at most M + 1 values per key. We use the total_aggregate for queries, and
> each update will cause 2 writes (to the bucket and to the total aggregate).
> 
> And with 1) when expiring the oldest bucket we simply call
> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> oldest bucket we can re-compute the total_aggregate by
> sum(bucket_aggregate) over other buckets again.
> 
> 6. Meta comment: it is reasonable to assume in practice out-of-order
> data is not very common, hence most of the updates will be falling into the
> latest bucket. So I'm wondering if it makes sense to always store the first
> bucket in memory while making other buckets optionally on persistent
> storage. In practice, as long as M is large enough (we probably need it to
> be large enough to have sufficiently sensitive expiration anyways) then
> each bucket's aggregate data is small enough to be in memory.

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-09 Thread Guozhang Wang
Hi Sophie,

Thanks for the proposed KIP. I've made a pass over it and here are some
thoughts:

1. "The window size is effectively the grace and retention period". The
grace time is defined as "the time to admit late-arriving events after the
end of the window." hence it is the additional time beyond the window size.
I guess you were trying to say it should be zero?

Also for retention period, it is not a notion of the window spec any more,
but only for the window store itself. So I'd suggest talking about window
size here, and note that store retention time cannot be controlled via
window spec at all.

2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
bucket, so I'd assume you will expire one bucket as a whole when its end
time is smaller than the current window's starting time, right?

3. Also in your algorithm how to choose "M" seems tricky, would it be a
configurable parameter exposed to users or is it abstracted away and only
being selected internally?

4. "There is some tradeoff between purely optimizing " seems incomplete
paragraph?

5. Meta comment: many aggregation functions are commutative and associative, so
we can require users to pass in a "subtract" function as well. Given these two
functions I think we can propose two sets of APIs: 1) with the adder and
subtractor and 2) with the adder only (if the aggregate logic is not comm.
and assoc.).

We just maintain an aggregate value for each bucket (call it
bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
at most M + 1 values per key. We use the total_aggregate for queries, and
each update will cause 2 writes (to the bucket and to the total aggregate).

And with 1) when expiring the oldest bucket we simply call
subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
oldest bucket we can re-compute the total_aggregate by
sum(bucket_aggregate) over other buckets again.
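A minimal sketch of that bucket bookkeeping (illustrative plain Java, not the actual store implementation; `BucketedAggSketch` and its method names are made up, and sum stands in for a generic aggregation). It keeps one aggregate per bucket plus the total served to queries, and shows the two expiry paths:

```java
import java.util.ArrayDeque;
import java.util.List;

public class BucketedAggSketch {
    final ArrayDeque<Long> bucketAggregates; // one running aggregate per bucket
    long totalAggregate;                     // the value served to queries

    BucketedAggSketch(List<Long> buckets) {
        bucketAggregates = new ArrayDeque<>(buckets);
        totalAggregate = buckets.stream().mapToLong(Long::longValue).sum();
    }

    // Invertible aggregation (API 1): drop the oldest bucket in O(1)
    // via subtract(total_aggregate, bucket_aggregate).
    void expireWithSubtractor() {
        totalAggregate -= bucketAggregates.pollFirst();
    }

    // Adder-only aggregation (API 2): drop the oldest bucket, then
    // re-compute total_aggregate by summing the remaining buckets, O(M).
    void expireWithoutSubtractor() {
        bucketAggregates.pollFirst();
        totalAggregate = bucketAggregates.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        BucketedAggSketch agg = new BucketedAggSketch(List.of(3L, 4L, 5L));
        agg.expireWithSubtractor();
        System.out.println(agg.totalAggregate); // 9
        agg.expireWithoutSubtractor();
        System.out.println(agg.totalAggregate); // 5
    }
}
```

In either variant there are at most M + 1 stored values per key, and each update costs two writes (the current bucket and the total); only the expiry cost differs.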

6. Meta comment: it is reasonable to assume in practice out-of-order
data is not very common, hence most of the updates will be falling into the
latest bucket. So I'm wondering if it makes sense to always store the first
bucket in memory while making other buckets optionally on persistent
storage. In practice, as long as M is large enough (we probably need it to
be large enough to have sufficiently sensitive expiration anyways) then
each bucket's aggregate data is small enough to be in memory.



Guozhang



On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman 
wrote:

> Hello all,
>
> I would like to kick off discussion of this KIP aimed at providing sliding
> window semantics to DSL aggregations.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>
> Please take a look and share any thoughts you have regarding the API,
> semantics, design, etc!
>
> I also have a POC PR open with the "naive" implementation for your
> reference: https://github.com/apache/kafka/pull/6549
>
> Cheers,
> Sophie
>


-- 
-- Guozhang


[DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-05 Thread Sophie Blee-Goldman
Hello all,

I would like to kick off discussion of this KIP aimed at providing sliding
window semantics to DSL aggregations.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

Please take a look and share any thoughts you have regarding the API,
semantics, design, etc!

I also have a POC PR open with the "naive" implementation for your
reference: https://github.com/apache/kafka/pull/6549

Cheers,
Sophie