I'm using Kafka Streams in a deduplication events problem over short time
windows (<= 1 minute).
First I've tried to tackle the problem by using DSL API with
[`.suppress(Suppressed.untilWindowCloses(...))`](
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
operator but, given the fact that wall-clock time is not yet supported
(I've seen the [KIP 424](
https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
this operator is not viable for my use case.

Then, I've followed this [official Confluent example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
in which low level Processor API is used and it was working fine but has
one major limitation for my use-case. The single event (obtained by
deduplication) is emitted at the **beginning** of the time window,
subsequent duplicated events are "suppressed". In my use case I need the
reverse of that, meaning that a single event should be emitted at the end
of the window.
I'm asking for suggestions on how to implement this use case with Processor
API.

My idea was to use the Processor API with a custom [Transformer](
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
and a [Punctuator](
https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html
).
The transformer would store in a [WindowStore](
https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
the distinct keys received without returning any KeyValue. Simultaneously,
I'd schedule a punctuator running with an interval equal to the size of the
window in the WindowStore. This punctuator will iterate over the elements
in the store and forward them downstream.
The following are some core parts of the logic:

DeduplicationTransformer (slightly modified from [official Confluent
example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
):
```java
    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        eventIdStore = (WindowStore<E, V>)
context.getStateStore(this.storeName);

        // Schedule punctuator for this transformer.
        context.schedule(Duration.ofMillis(this.windowSizeMs),
PunctuationType.WALL_CLOCK_TIME,
            new DeduplicationPunctuator<E, V>(eventIdStore, context,
this.windowSizeMs));
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final E eventId = idExtractor.apply(key, value);
        if (eventId == null) {
            return KeyValue.pair(key, value);
        } else {
            if (!isDuplicate(eventId)) {
                rememberNewEvent(eventId, value, context.timestamp());
            }
            return null;
        }
    }
```

DeduplicationPunctuator:
```java
    public DeduplicationPunctuator(WindowStore<E, V> eventIdStore,
ProcessorContext context,
        long retainPeriodMs) {
        this.eventIdStore = eventIdStore;
        this.context = context;
        this.retainPeriodMs = retainPeriodMs;
    }

    @Override
    public void punctuate(long invocationTime) {
        LOGGER.info("Punctuator invoked at {}, searching from {}", new
Date(invocationTime), new Date(invocationTime-retainPeriodMs));

        KeyValueIterator<Windowed<E>, V> it =
            eventIdStore.fetchAll(invocationTime - retainPeriodMs,
invocationTime + retainPeriodMs);

        while (it.hasNext()) {
            KeyValue<Windowed<E>, V> next = it.next();

            LOGGER.info("Punctuator running on {}", next.key.key());

            context.forward(next.key.key(), next.value);
            // Delete from store with tombstone
            eventIdStore.put(next.key.key(), null, invocationTime);
            context.commit();
        }

        it.close();
    }
```

Is this a valid approach?
With the previous code, I'm running some integration tests and I've some
synchronization issues. How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?


Also as an alternative approach, I was wondering (I've googled with no
result), if there is any event triggered by window closing to which I can
attach a callback in order to iterate over store and publish only distinct
events.

Thanks.

My question is also here: https://stackoverflow.com/q/69725131/4837677

Reply via email to