I'm using Kafka Streams to deduplicate events over short time windows (<= 1 minute). I first tried the DSL API with the [`.suppress(Suppressed.untilWindowCloses(...))`](https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html) operator, but since it does not yet support wall-clock time (see [KIP-424](https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)), this operator is not viable for my use case.

Then I followed this [official Confluent example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html), which uses the low-level Processor API. It works fine but has one major limitation for my use case: the single event obtained by deduplication is emitted at the **beginning** of the time window, and subsequent duplicates are "suppressed". I need the reverse: a single event should be emitted at the **end** of the window.

I'm asking for suggestions on how to implement this with the Processor API.

My idea is to use a custom [Transformer](https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html) and a [Punctuator](https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html). The transformer stores the distinct keys it receives in a [WindowStore](https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html) without returning any `KeyValue`. At the same time, I schedule a punctuator with an interval equal to the window size of the WindowStore. The punctuator iterates over the elements in the store and forwards them downstream.

The following are the core parts of the logic.

DeduplicationTransformer (slightly modified from the [official Confluent example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)):

```java
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
    this.context = context;
    eventIdStore = (WindowStore<E, V>) context.getStateStore(this.storeName);

    // Schedule the punctuator for this transformer, once per window size (wall-clock time).
    context.schedule(Duration.ofMillis(this.windowSizeMs),
                     PunctuationType.WALL_CLOCK_TIME,
                     new DeduplicationPunctuator<E, V>(eventIdStore, context, this.windowSizeMs));
}

@Override
public KeyValue<K, V> transform(final K key, final V value) {
    final E eventId = idExtractor.apply(key, value);
    if (eventId == null) {
        // No id to deduplicate on: pass the record through unchanged.
        return KeyValue.pair(key, value);
    } else {
        if (!isDuplicate(eventId)) {
            rememberNewEvent(eventId, value, context.timestamp());
        }
        // Emit nothing here; the punctuator forwards the stored events.
        return null;
    }
}
```

DeduplicationPunctuator:

```java
public DeduplicationPunctuator(WindowStore<E, V> eventIdStore, ProcessorContext context, long retainPeriodMs) {
    this.eventIdStore = eventIdStore;
    this.context = context;
    this.retainPeriodMs = retainPeriodMs;
}

@Override
public void punctuate(long invocationTime) {
    LOGGER.info("Punctuator invoked at {}, searching from {}",
                new Date(invocationTime), new Date(invocationTime - retainPeriodMs));
    // try-with-resources closes the iterator even if forwarding throws.
    try (KeyValueIterator<Windowed<E>, V> it =
             eventIdStore.fetchAll(invocationTime - retainPeriodMs, invocationTime + retainPeriodMs)) {
        while (it.hasNext()) {
            KeyValue<Windowed<E>, V> next = it.next();
            LOGGER.info("Punctuator running on {}", next.key.key());
            context.forward(next.key.key(), next.value);
            // Delete from the store with a tombstone, written under the timestamp
            // the entry was stored with (its window start), not the invocation time.
            eventIdStore.put(next.key.key(), null, next.key.window().start());
            context.commit();
        }
    }
}
```

Is this a valid approach? With the code above, I'm running some integration tests and I'm seeing synchronization issues. How can I be sure that the start of the window coincides with the Punctuator's scheduled interval?

As an alternative, I was wondering (I've googled with no result) whether there is any event triggered by a window closing to which I can attach a callback, in order to iterate over the store and publish only the distinct events.

Thanks.

My question is also here: https://stackoverflow.com/q/69725131/4837677
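
To make the timing question concrete, here is a minimal plain-Java model of the "emit once, at the end of the window" behaviour described above. It is only a sketch and uses no Kafka Streams API: the class `WindowEndDeduplicator` and its methods `accept` and `flush` are hypothetical names invented for illustration. It assumes windows aligned to multiples of the window size, and `flush` stands in for the punctuator. Because `flush` emits whichever windows have already ended, it does not need to fire exactly on a window boundary.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (no Kafka APIs): one event per id, emitted after its window ends. */
final class WindowEndDeduplicator<K, V> {
    private final long windowSizeMs;
    // window start -> (event id -> first value seen in that window), sorted by window start
    private final TreeMap<Long, Map<K, V>> buffer = new TreeMap<>();

    WindowEndDeduplicator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the aligned window that contains the given timestamp (assumed alignment). */
    long windowStart(long timestampMs) {
        return Math.floorDiv(timestampMs, windowSizeMs) * windowSizeMs;
    }

    /** Buffers the event; later events with the same id in the same window are dropped. */
    void accept(K id, V value, long timestampMs) {
        buffer.computeIfAbsent(windowStart(timestampMs), w -> new LinkedHashMap<>())
              .putIfAbsent(id, value);
    }

    /** Emits and removes every window that ended at or before nowMs (the "punctuation"). */
    List<Map.Entry<K, V>> flush(long nowMs) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<K, V>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<K, V>> window = it.next();
            if (window.getKey() + windowSizeMs > nowMs) {
                break; // sorted map: this window and all later ones are still open
            }
            out.addAll(window.getValue().entrySet());
            it.remove(); // plays the role of the tombstone in the WindowStore
        }
        return out;
    }
}
```

In this model, a flush that fires late still emits each closed window exactly once, whereas a fixed `[now - size, now + size]` scan depends on when the flush happens to run.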