Hi Sushrut,

I have to confess I don’t think I fully understand your last message, but I 
will try to help.

It sounds like maybe you're thinking that Streams would just repeatedly emit
everything on every commit? That is certainly not the case. If there are only 10
events in window 1 and 10 in window 2, you would see at most 20 output events,
regardless of any caching or suppression. That is, if you disable all caches,
you get exactly one output record (an updated aggregation result) for each input
record. Enabling caches only serves to reduce that number.
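
For what it's worth, here is a minimal sketch of the two settings in play (the
config names are the same ones from the docs you linked; the values are just
examples, not recommendations):

`import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// 0 disables the record caches entirely: every input record then produces
// one updated aggregation result downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
// A larger cache plus a longer commit interval collapses more consecutive
// updates to the same key before they are forwarded:
// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
// props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);`

In your 10 + 10 example, these settings only determine how many of those (at
most 20) updates you actually observe; they never add to them.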

I hope this helps,
John


On Sat, Jan 18, 2020, at 08:36, Sushrut Shivaswamy wrote:
> Hey John,
> 
> I tried following the docs here about the configs:
> `streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> // Set commit interval to 1 second.
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);`
> https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt
> 
> I'm trying to group events by id by accumulating them in a list and then
> split the aggregated list into smaller chunks for processing.
> I have a question about when windows expire and how aggregated values are
> flushed out.
> Let's assume 10 records arrived in window 1 (W1) and 10 more records arrived
> in window 2 (W2) for the same key, and that the cache can hold only 10
> records in memory.
> Based on my understanding:
> At T1: the 10 records from W1 are flushed.
> At T2: all 20 records from W1 + W2 are flushed.
> The records from W1 will be duplicated at each commit time until that
> window expires.
> Is this accurate?
> If it is, can you share any way I can avoid/limit the number of times
> duplicate data is flushed?
> 
> Thanks,
> Sushrut
> 
> On Sat, Jan 18, 2020 at 12:00 PM Sushrut Shivaswamy <
> sushrut.shivasw...@gmail.com> wrote:
> 
> > Thanks John,
> > I'll try increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> >
> > Thanks,
> > Sushrut
> >
> > On Sat, Jan 18, 2020 at 11:31 AM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Ah, I should add, if you actually want to use suppression, or
> >> you need to resolve a similar error message in the future, you
> >> probably need to tweak the batch sizes and/or timeout configs
> >> of the various clients, and maybe the server as well.
> >>
> >> That error message kind of sounds like the server went silent
> >> long enough that the network session expired, or maybe it suffered
> >> a long pause of some kind (GC, de-scheduling, etc.) that caused
> >> the OS to hang up the socket.
> >>
> >> I'm not super familiar with diagnosing these issues; I'm just
> >> trying to point you in the right direction in case you wanted
> >> to directly solve the given error instead of trying something
> >> different.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> >> > Hi Sushrut,
> >> >
> >> > That's frustrating... I haven't seen that before, but looking at the
> >> > error in combination with what you say happens without suppress makes
> >> > me think there's a large volume of data involved here. Probably,
> >> > the problem isn't specific to suppression, but it's just that the
> >> > interactions on the suppression buffers are pushing the system over
> >> > the edge.
> >> >
> >> > Counterintuitively, adding Suppression can actually increase your
> >> > broker traffic because the Suppression buffer has to provide resiliency
> >> > guarantees, so it needs its own changelog, even though the aggregation
> >> > immediately before it _also_ has a changelog.
> >> >
> >> > Judging from your description, you were just trying to batch more,
> >> > rather than specifically trying to get "final results" semantics for the window
> >> > results. In that case, you might want to try removing the suppression
> >> > and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> >> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> >> >
> >> > Hope this helps,
> >> > -John
> >> >
> >> > On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> >> > > Hey,
> >> > >
> >> > > I'm building a streams application where I'm trying to aggregate a
> >> > > stream of events and get a list of events per key:
> >> > > `eventStream
> >> > >     .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> >> > >     .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> >> > >     .aggregate(
> >> > >         ArrayList::new,
> >> > >         (key, event, accum) -> {
> >> > >             accum.add(event);
> >> > >             return accum;
> >> > >         })
> >> > >     .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> >> > >     .toStream()
> >> > >     .map((windowedKey, value) ->
> >> > >         new KeyValue<String, List<Event>>(windowedKey.key(), value))
> >> > >     .map(eventProcessor::processEventsWindow)
> >> > >     .to("event-window-chunks-queue",
> >> > >         Produced.with(Serdes.String(), eventListSerde));`
> >> > >
> >> > > As you can see, I'm grouping events by key and capturing windowed
> >> > > lists of events for further processing.
> >> > > To be able to process the list of events per key in chunks, I added
> >> > > `suppress()`.
> >> > > This does not seem to work though.
> >> > > I get this error multiple times:
> >> > > `Got error produce response with correlation id 5 on topic-partition
> >> > > app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0,
> >> > > retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> >> > > WARN org.apache.kafka.clients.producer.internals.Sender - Received
> >> > > invalid metadata error in produce request on partition
> >> > > shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0
> >> > > due to org.apache.kafka.common.errors.NetworkException: The server
> >> > > disconnected before a response was received.. Going to request
> >> > > metadata update now`
> >> > >
> >> > > When I comment out the suppress() line it works fine, but I get a large
> >> > > number of events in each list while processing chunks, since it does not
> >> > > suppress already-evaluated chunks.
> >> > > Can anyone help me out with what could be happening here?
> >> > >
> >> > > Regards,
> >> > > Sushrut
> >> > >
> >> >
> >>
> >
>
