Hi Guozhang,

Thanks for the review and the alternative idea.


A quick note on the metrics. I actually do think it should be true
that "Skipped records are records that are for one
reason or another invalid." I recently made a change to record a
skipped-record metric when we get a record for a window that is beyond the
retention period. I think this was a mistake, since such records aren't
invalid. I'd like to draft a separate KIP to correct that with a new
metric. For clarity: the currently released version doesn't record any
metric at all in this case and instead silently drops events for
no-longer-retained windows.



It seems that, even setting the metric issue aside, the idea of directly
configuring the emit behavior of windowed aggregations might be a more
straightforward way to achieve "final results" for windowed computations.

Regarding the consistency of the aggregation state vs. the result of
suppress: in the KIP, suppress is a separate operation that produces a new
KTable. It actually seems strange to me to expect the state *upstream* of
suppress to be consistent with the downstream *result* of suppress. This
would be like applying a filter after the aggregation and then expecting a
query of the aggregation to reflect the filter!
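
To make that analogy concrete, here is a rough sketch against the existing DSL
(the topic and store names are just illustrative, and the usual Kafka Streams
imports are assumed):

    StreamsBuilder builder = new StreamsBuilder();

    // The aggregation's state lives in the "counts-store" store...
    KTable<String, Long> counts = builder
        .<String, String>stream("events")
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
            "counts-store"));

    // ...but this downstream KTable only contains counts of at least 10.
    KTable<String, Long> bigCounts = counts.filter((key, count) -> count >= 10);

    // An interactive query against "counts-store" sees *all* the counts, not
    // just the filtered ones. Nobody would expect the filter to rewrite the
    // upstream store, and suppress is the same kind of downstream operation.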

I think that if a user wishes to apply suppress to a KTable *and* offer a
queryable view of the suppressed result, there is a solution within the
existing KTable framework: offer a variant of suppress that takes a
Materialized. In other words, since the suppress operator actually produces
a new KTable, we could allow users to make the resulting table queryable
(instead of the upstream one), in which case the queryable state can again
be consistent with the downstream view. (I anticipate a question about
implementation efficiency, but I think there's a good solution for that.)
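
Just to sketch what I mean (to be clear, this suppress() overload does not
exist today; the extra Materialized parameter and the store name here are
purely hypothetical):

    // Hypothetical: a suppress() variant that also materializes the suppressed
    // result, so IQ and the downstream view stay consistent with each other.
    KTable<Windowed<String>, Long> finalCounts = windowCounts
        .suppress(
            new Suppression()
                .suppressIntermediateEvents(
                    IntermediateSuppression.emitAfter(Duration.ofMinutes(10))),
            Materialized.as("suppressed-counts-store"));

    // Interactive queries would then go against "suppressed-counts-store",
    // which reflects exactly what has been emitted downstream.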


About your proposal: suppose I have that same window of size 10 min with
`until` 20 min, and I use Emitted.onlyOnceAfterWindowClosed(late-period-allowed
= 5 min). If I get an event at stream time t16 with record time t4, will the
Emitted config prevent both the state update and the emission, or only the
emission? It seems like you're proposing that it would actually not update
the state store or emit, so that IQ results would be consistent with the
downstream. I'm not sure if it's intuitive for a parameter that controls
the emission pattern to also control the computation itself.
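
To make the question concrete, here is roughly the setup I have in mind,
written against your proposed (not-yet-existing) API; `input` here is just
an arbitrary KStream<String, String>:

    // Window size 10 min, retention (`until`) 20 min, late period allowed 5 min.
    KTable<Windowed<String>, Long> counts = input
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(10))
                               .until(TimeUnit.MINUTES.toMillis(20)))
        .aggregate(
            () -> 0L,
            (key, value, agg) -> agg + 1,
            Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts"),
            Emitted.onlyOnceAfterWindowClosed(TimeUnit.MINUTES.toMillis(5)));

    // Now stream time is t16 and a record with timestamp t4 arrives. It falls
    // inside the window [t0, t10), which "closed" at t15 (window end + late
    // period) but is retained for queries until t20. Does Emitted drop only
    // the downstream emission here, or the state-store update as well?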

Alternatively, we could move the "close" parameter to the window
definition, allowing Emitted to just be onlyOnceAfterWindowClosed() or
wheneverWindowUpdated().
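
In code, that might look something like this (the method name closeAfter() is
purely hypothetical):

    // Hypothetical: the "close" period lives on the window definition itself,
    // so Emitted only has to say *when* to emit, not how late is too late.
    TimeWindows windows = TimeWindows
        .of(TimeUnit.MINUTES.toMillis(10))
        .until(TimeUnit.MINUTES.toMillis(20))
        .closeAfter(TimeUnit.MINUTES.toMillis(5));  // hypothetical method

    // ...and then the aggregate call would only need:
    //     Emitted.onlyOnceAfterWindowClosed()
    // or
    //     Emitted.wheneverWindowUpdated()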

It seems like Emitted might open the door to follow-on requests for other
kinds of fine-grained control over the emission pattern, and we would have
to make a case-by-case call on whether each one belongs with Emitted, with
Suppress, or with both. I think this is actually an advantage of preferring a
single operation: it gives both users and developers a simpler structure for
reasoning about how to shape the emission pattern.


All in all, it still seems simpler to me to have just one operation responsible
for "suppressing" updates. But obviously I can't speak for everyone, so I would
very much like to hear more feedback about this.

What do you think about these ideas?

Thanks again,
-John




On Wed, Jun 27, 2018 at 5:10 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello John, thanks for putting up the KIP. I have a meta comment:
>
> We need to clarify the difference between the late-event suppression semantics
> and the retention semantics of the windowed KTable that results from the
> aggregation. More specifically, say you have a window of size 10 min, and
> `until` 20 min, and the resulting windowed KTable is suppressed for events
> later than 5 min. My understanding is that:
>
> a) for a specific window starting at t0 and ending at t10, if an upstream
> record is received with a timestamp falling into [t0, t10) but the current
> stream time is already larger than t20, this record will be dropped on the
> floor and not used to update the windowed KTable. Also, we record it in the
> "skipped-record" metric.
> b) as above, if an upstream record is received with timestamp falling into
> [t0, t10), and the current stream time is larger than t15 but smaller than
> t20, this record will still be used to update the windowed table, but the
> updated result will not be sent downstream, and it is recorded in the
> "late-event-suppression" metric.
> c) as a result, if we query the windowed KTable's state, we will see
> updates up to stream time t20, but in its resulting changelog stream, we
> will only see results up to stream time t15.
>
> If that is correct, I feel it is really complicated for users to
> comprehend: why would I want my windowed aggregations to take late
> records for up to 20 minutes, while not seeing the resulting updates in the
> changelog stream? And the semantic difference between "skipped record due
> to window retention" and "late-event-suppression" is quite obscure (btw I
> am not sure it is true that "Skipped records are records that are for one
> reason or another invalid.", since a "skipped record due to window retention
> time" is not really due to an invalid record, but rather due to window store
> implementation details, right?)
>
> Thinking about this further, although I understand the intention to propose
> a unified API for all three motivating requests, I feel the "Final value
> of a window" request may be better handled in a more restricted interface.
>
>
> So, just throwing out a bold / controversial idea for this proposal: instead
> of using a unified suppress() for all three motivation scenarios, we have:
>
> 1) KTable#suppress() for "Request shaping" and "Easier config": it will be
> used only for intermediate-event suppression, and in this case, for both
> windowed and non-windowed KTables, the suppression semantics can depend on
> each key's record timestamp plus the byte buffer size limit /
> buffer strategy.
>
> 2) For TimeWindowedStream / SessionWindowedStream#aggregate(), which results
> in a windowed KTable (note that although KGroupedTable#aggregate can also
> result in a windowed KTable, its window semantics is not very well defined,
> so I'd suggest we defer that discussion), we add another config object,
> e.g.:
>
> TimeWindowedStream#aggregate(
>     final Initializer<VR> initializer,
>     final Aggregator<? super K, ? super V, VR> aggregator,
>     final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized,
>     final Emitted emitted);
>
> public class Emitted {
>
>     static Emitted onlyOnceAfterWindowClosed(final long latePeriodAllowed);
>
>     static Emitted wheneverWindowUpdated();  // this may still be subject to caching effects, so not exactly every update..
>
> }
>
> ------------
>
> The Emitted config option is intentionally going to be much less expressive
> than `Suppressed`, covering only the "Final value of a window"
> case. Note that the resulting windowed KTable can still be suppressed
> programmatically, but if it has already been emitted only once, then the
> suppress function will have no effect.
>
> In this case, the difference between "late-period-allowed" and
> "Windows.until()" is that the former determines whether or not a record will
> be applied to update the window, and it is enforced in the
> WindowedStreamAggregateProcessor; whenever an event gets dropped because of
> it, we record it in a new, say, "too-late-records" metric (same as
> "late-event-suppression" actually, just using a different name). The latter
> only controls how long, at minimum, each window will be retained for
> queries, and it should normally be larger than (window size +
> late-period-allowed). From the implementation's pov, if the retention time
> of a window is less than (window size + late-period-allowed), the Processor
> may not be able to find any matching window when first trying to get it from
> the store, and it then needs to tell whether that is because the key has
> never been updated for this window or because the window retention has
> elapsed; hence it needs to be aware of the window retention time. In the
> latter case, it will drop the record on the floor and also record it in the
> "too-late-records" metric. Also, this emit policy would not need any
> buffering, since the original store's cache already contains the record
> context needed for flushing downstream.
>
> My primary motivation is that, from the user's perspective, this may be
> easier to comprehend and to reason about from the metrics. But if people
> think it actually does not make things better, I'm happy to rethink the
> current proposal.
>
>
>
>
> Guozhang
>
>
> On Wed, Jun 27, 2018 at 12:04 PM, John Roesler <j...@confluent.io> wrote:
>
> > Thanks for the feedback, Matthias,
> >
> > It seems like in straightforward relational processing cases, it would not
> > make sense to bound the lateness of KTables. In general, it seems better to
> > have "guard rails" in place that make it easier to write sensible programs
> > than insensible ones.
> >
> > But I'm still going to argue in favor of keeping it for all KTables ;)
> >
> > 1. I believe it is simpler to understand the operator if it has one uniform
> > definition, regardless of context. It's well defined and intuitive what
> > will happen when you use late-event suppression on a KTable, so I think
> > nothing surprising or dangerous will happen in that case. From my
> > perspective, having two sets of allowed operations is actually an increase
> > in cognitive complexity.
> >
> > 2. To me, it's not crazy to use the operator this way. For example, in lieu
> > of full-featured timestamp semantics, I can implement MVCC behavior when
> > building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that
> > there are other, non-obvious applications of suppressing late events on
> > KTables.
> >
> > 3. Not to get too much into implementation details in a KIP discussion, but
> > if we did want to make late-event suppression available only on windowed
> > KTables, we have two enforcement options:
> >   a. check when we build the topology - this would be simple to implement,
> > but would be a runtime check. Hopefully, people write tests for their
> > topology before deploying them, so the feedback loop isn't instantaneous,
> > but it's not too long either.
> >   b. add a new WindowedKTable type - this would be a compile-time check,
> > but would also be a substantial increase of both interface and code
> > complexity.
> >
> > We should definitely strive to have guard rails protecting against
> > surprising or dangerous behavior. Protecting against programs that we don't
> > currently predict is a lesser benefit, and I think we can put up guard
> > rails on a case-by-case basis for that. The increase in cognitive (and
> > potentially code and interface) complexity makes me think we should skip
> > this case.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the KIP John.
> > >
> > > One initial comment about the last example, "Bounded lateness": For a
> > > non-windowed KTable, bounding the lateness does not really make sense,
> > > does it?
> > >
> > > Thus, I am wondering whether we should allow `suppressLateEvents()` for
> > > this case at all? It seems better to only allow it for windowed KTables.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > I noticed this (lack of primary parameter) as well.
> > > >
> > > > What you gave as the new example is semantically the same as what I
> > > > suggested. So it is good by me.
> > > >
> > > > Thanks
> > > >
> > > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:
> > > >
> > > >> Thanks for taking a look, Ted,
> > > >>
> > > >> I agree this is a departure from the conventions of Streams DSL.
> > > >>
> > > >> Most of our config objects have one or two "required" parameters, which
> > > >> fit naturally with the static factory method approach. TimeWindows, for
> > > >> example, requires a size parameter, so we can naturally say
> > > >> TimeWindows.of(size).
> > > >>
> > > >> I think in the case of a suppression, there's really no "core" parameter,
> > > >> and "Suppression.of()" seems sillier than "new Suppression()". I think
> > > >> that Suppression.of(duration) would be ambiguous, since there are many
> > > >> durations that we can configure.
> > > >>
> > > >> However, thinking about it again, I suppose that I can give each
> > > >> configuration method a static version, which would let you replace
> > > >> "new Suppression()." with "Suppression." in all the examples. Basically,
> > > >> instead of "of()", we'd support any of the methods I listed.
> > > >>
> > > >> For example:
> > > >>
> > > >> windowCounts
> > > >>     .suppress(
> > > >>         Suppression
> > > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > > >>             .suppressIntermediateEvents(
> > > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > >>             )
> > > >>     );
> > > >>
> > > >>
> > > >> Does that seem better?
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >>
> > > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:
> > > >>
> > > >>> I started to read this KIP, which contains a lot of material.
> > > >>>
> > > >>> One suggestion:
> > > >>>
> > > >>>     .suppress(
> > > >>>         new Suppression()
> > > >>>
> > > >>>
> > > >>> Do you think it would be more consistent with the rest of the Streams
> > > >>> data structures to support `of`?
> > > >>>
> > > >>> Suppression.of(Duration.ofMinutes(10))
> > > >>>
> > > >>>
> > > >>> Cheers
> > > >>>
> > > >>>
> > > >>>
> > > >>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io> wrote:
> > > >>>
> > > >>>> Hello devs and users,
> > > >>>>
> > > >>>> Please take some time to consider this proposal for Kafka Streams:
> > > >>>>
> > > >>>> KIP-328: Ability to suppress updates for KTables
> > > >>>>
> > > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > >>>>
> > > >>>> The basic idea is to provide:
> > > >>>> * more usable control over update rate (vs the current state store caches)
> > > >>>> * the final-result-for-windowed-computations feature which several people
> > > >>>> have requested
> > > >>>>
> > > >>>> I look forward to your feedback!
> > > >>>>
> > > >>>> Thanks,
> > > >>>> -John
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
