Hi Sushrut,

That's frustrating... I haven't seen that before, but the error, combined
with what you say happens without suppress, makes me think there's a
large volume of data involved here. Probably the problem isn't specific
to suppression; it's just that the extra writes to the suppression
buffer's changelog are pushing the system over the edge.

Counterintuitively, adding suppression can actually increase your
broker traffic, because the suppression buffer has to provide resiliency
guarantees, so it needs its own changelog, even though the aggregation
immediately before it _also_ has a changelog.

Judging from your description, you were just trying to batch more, rather
than specifically trying to get "final results" semantics for the window
results. In that case, you might want to try removing the suppression
and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
and "COMMIT_INTERVAL_MS_CONFIG" configurations.
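Something like this is what I have in mind. It's a rough sketch, assuming you build a plain `java.util.Properties` for the `KafkaStreams` constructor. The string keys are the values behind `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` and `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG`, spelled out so the snippet stands alone. The application id, bootstrap servers, and the specific sizes are hypothetical placeholders to experiment with, not recommendations. The idea is that the record cache collapses repeated updates to the same key and only forwards the latest value when it flushes (on commit or when it fills up). So a bigger cache and a longer commit interval mean fewer, larger updates downstream.

```java
import java.util.Properties;

class StreamsTuning {
    // Same strings as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and
    // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; written out here so this
    // sketch compiles without kafka-streams on the classpath.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

    // Builds illustrative Streams properties. The app id, bootstrap servers,
    // and sizes are hypothetical placeholders; tune them for your workload.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "event-window-app");  // hypothetical
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical
        // Larger record cache (default is 10 MB, shared across all threads):
        // more updates per key are collapsed before being forwarded.
        props.put(CACHE_MAX_BYTES_BUFFERING, String.valueOf(100L * 1024 * 1024));
        // Longer commit interval (default is 30 s under at-least-once):
        // the cache is flushed less often, so fewer updates are emitted.
        props.put(COMMIT_INTERVAL_MS, String.valueOf(60_000L));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println(props.getProperty(CACHE_MAX_BYTES_BUFFERING));
        System.out.println(props.getProperty(COMMIT_INTERVAL_MS));
    }
}
```

You'd pass these properties to `new KafkaStreams(topology, props)` and drop the `.suppress(...)` line. Keep in mind that caching only reduces intermediate results; you'll still see more than one update per window, just fewer of them.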

Hope this helps,
-John

On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> Hey,
> 
> I'm building a Streams application where I'm trying to aggregate a stream
> of events and get a list of events per key.
> `eventStream
> .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> .aggregate(
>     () -> new ArrayList<Event>(),
>     (key, event, accum) -> {
>         accum.add(event);
>         return accum;
>     })
> .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> .toStream()
> .map((windowedKey, value) -> new KeyValue<String, List<Event>>(windowedKey.key(), value))
> .map(eventProcessor::processEventsWindow)
> .to("event-window-chunks-queue", Produced.with(Serdes.String(), eventListSerde))`
> 
> As you can see, I'm grouping events by key and capturing windowed lists of
> events for further processing.
> To be able to process the list of events per key in chunks, I added
> `suppress()`.
> This does not seem to work, though.
> I get this error multiple times:
> `Got error produce response with correlation id 5 on topic-partition
> app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0, retrying
> (2147483646 attempts left). Error: NETWORK_EXCEPTION
> WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
> metadata error in produce request on partition
> shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0 due to
> org.apache.kafka.common.errors.NetworkException: The server disconnected
> before a response was received.. Going to request metadata update now`
> 
> When I comment out the `suppress()` line, it works fine, but I get a large
> number of events in a list while processing chunks, since it no longer
> suppresses already-evaluated chunks.
> Can anyone help me out with what could be happening here?
> 
> Regards,
> Sushrut
>
