Hi,
As far as I understand, the aggregated result for a window is not
included in the next window.
A window stays in the state store until it is deleted based on the window
retention setting; however, the aggregated result for that window will
include only the records that occur within that window's duration.

If you have sliding (overlapping) windows, some records will fall into
both windows, so the two aggregated results will be based on some common
records.

For example, if the records in the first window are (R1, R2, R3, R4) and
in the second they are (R3, R4, R5, R6), then the final data stored would
be [W1, Aggr(R1, R2, R3, R4)] and [W2, Aggr(R3, R4, R5, R6)].

As John pointed out, an updated result may or may not be emitted
downstream after each record is aggregated into the window. It depends on
how frequently you want to commit your data (and on the record cache
size).
So the aggregated data will be built up in the following way:
Aggr( R1 )
Aggr( R1, R2 )
Aggr( R1, R2, R3 )
Aggr( R1, R2, R3, R4 )

But not all of these intermediate aggregated results may be emitted downstream.
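
How many of those intermediate results actually reach downstream is
mostly controlled by the record cache and the commit interval that John
mentioned; roughly something like the following sketch (the values are
only examples, not recommendations):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// A larger cache plus a longer commit interval means more of the
// intermediate Aggr(R1), Aggr(R1, R2), ... updates are coalesced in
// memory, so fewer updates per window are emitted downstream.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-aggregation-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB record cache
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);               // commit every 30 s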

Hope this helps.

Thanks
Sachin



On Tue, Jan 21, 2020 at 10:25 AM Sushrut Shivaswamy <
sushrut.shivasw...@gmail.com> wrote:

> Thanks John.
> That partially answers my question.
> I'm a little confused about when a window will expire.
> As you said, I will receive at most 20 events at T2 but as time goes on
> will the data from the first window always be included in the aggregated
> result?
>
> On Mon, Jan 20, 2020 at 7:55 AM John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Sushrut,
> >
> > I have to confess I don’t think I fully understand your last message, but
> > I will try to help.
> >
> > It sounds like maybe you’re thinking that streams would just repeatedly
> > emit everything every commit? That is certainly not the case. If there
> > are only 10 events in window 1 and 10 in window 2, you would see at most
> > 20 output events, regardless of any caching or suppression. That is, if
> > you disable all caches, you get one output record (an updated aggregation
> > result) for each input record. Enabling caches only serves to reduce the
> > number.
> >
> > I hope this helps,
> > John
> >
> >
> > On Sat, Jan 18, 2020, at 08:36, Sushrut Shivaswamy wrote:
> > > Hey John,
> > >
> > > I tried following the docs here about the configs:
> > >
> > > `streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > > 10 * 1024 * 1024L);
> > > // Set commit interval to 1 second.
> > > streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);`
> > >
> > > https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt
> > >
> > > I'm trying to group events by id by accumulating them in a list and
> > > then split the aggregated list into smaller chunks for processing.
> > > I have a doubt about when windows expire and how aggregated values are
> > > flushed out.
> > > Let's assume in window 1 (W1) 10 records arrived and in window 2 (W2)
> > > 10 more records arrived for the same key.
> > > Assuming the cache can hold only 10 records in memory.
> > > Based on my understanding:
> > > At T1: 10 records from W1 are flushed.
> > > At T2: 20 records from W1 + W2 are flushed.
> > > The records from W1 will be duplicated at the next commit time till
> > > that window expires.
> > > Is this accurate?
> > > If it is, can you share any way I can avoid/limit the number of times
> > > duplicate data is flushed?
> > >
> > > Thanks,
> > > Sushrut
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Jan 18, 2020 at 12:00 PM Sushrut Shivaswamy <
> > > sushrut.shivasw...@gmail.com> wrote:
> > >
> > > > Thanks John,
> > > > I'll try increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> > > > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> > > >
> > > > Thanks,
> > > > Sushrut
> > > >
> > > > On Sat, Jan 18, 2020 at 11:31 AM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > >> Ah, I should add, if you actually want to use suppression, or
> > > >> you need to resolve a similar error message in the future, you
> > > >> probably need to tweak the batch sizes and/or timeout configs
> > > >> of the various clients, and maybe the server as well.
> > > >>
> > > >> That error message kind of sounds like the server went silent
> > > >> long enough that the http session expired, or maybe it suffered
> > > >> a long pause of some kind (GC, de-scheduling, etc.) that caused
> > > >> the OS to hang up the socket.
> > > >>
> > > >> I'm not super familiar with diagnosing these issues; I'm just
> > > >> trying to point you in the right direction in case you wanted
> > > >> to directly solve the given error instead of trying something
> > > >> different.
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >> On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> > > >> > Hi Sushrut,
> > > >> >
> > > >> > That's frustrating... I haven't seen that before, but looking at
> > > >> > the error in combination with what you say happens without suppress
> > > >> > makes me think there's a large volume of data involved here. Probably,
> > > >> > the problem isn't specific to suppression, but it's just that the
> > > >> > interactions on the suppression buffers are pushing the system over
> > > >> > the edge.
> > > >> >
> > > >> > Counterintuitively, adding Suppression can actually increase your
> > > >> > broker traffic because the Suppression buffer has to provide resiliency
> > > >> > guarantees, so it needs its own changelog, even though the aggregation
> > > >> > immediately before it _also_ has a changelog.
> > > >> >
> > > >> > Judging from your description, you were just trying to batch more,
> > > >> > rather than specifically trying to get "final results" semantics for
> > > >> > the window results. In that case, you might want to try removing the
> > > >> > suppression and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> > > >> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> > > >> >
> > > >> > Hope this helps,
> > > >> > -John
> > > >> >
> > > >> > On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> > > >> > > Hey,
> > > >> > >
> > > >> > > I'm building a streams application where I'm trying to aggregate
> > > >> > > a stream of events and getting a list of events per key.
> > > >> > > `eventStream
> > > >> > > .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> > > >> > > .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> > > >> > > .aggregate(
> > > >> > >     ArrayList::new, (key, event, accum) -> {
> > > >> > >         accum.add(event);
> > > >> > >         return accum;
> > > >> > > })
> > > >> > > .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> > > >> > > .toStream()
> > > >> > > .map((windowedKey, value) -> new KeyValue<String, List<Event>>(windowedKey.key(), value))
> > > >> > > .map(eventProcessor::processEventsWindow)
> > > >> > > .to("event-window-chunks-queue", Produced.with(Serdes.String(), eventListSerde))`
> > > >> > >
> > > >> > > As you can see I'm grouping events by key and capturing windowed
> > > >> > > lists of events for further processing.
> > > >> > > To be able to process the list of events per key in chunks I
> > > >> > > added `suppress()`.
> > > >> > > This does not seem to work though.
> > > >> > > I get this error multiple times:
> > > >> > > `Got error produce response with correlation id 5 on topic-partition
> > > >> > > app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0,
> > > >> > > retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> > > >> > > WARN org.apache.kafka.clients.producer.internals.Sender - Received
> > > >> > > invalid metadata error in produce request on partition
> > > >> > > shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0
> > > >> > > due to org.apache.kafka.common.errors.NetworkException: The server
> > > >> > > disconnected before a response was received.. Going to request
> > > >> > > metadata update now`
> > > >> > >
> > > >> > > When I comment out the suppress() line it works fine but I get a
> > > >> > > large number of events in a list while processing chunks since it
> > > >> > > does not suppress already evaluated chunks.
> > > >> > > Can anyone help me out with what could be happening here?
> > > >> > >
> > > >> > > Regards,
> > > >> > > Sushrut
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>
