Hello Matthias, thanks for your reply.

> Using a plain kv-store, whenever the punctuation runs you can find closed windows, forward the result and also delete the row explicitly, which gives you more control.

What is the best way to find closed windows? Have you got any examples?

Thanks! :)

On 2021/11/02 23:34:33 "Matthias J. Sax" wrote:
> I did not study your code snippet, but yes, it sounds like a valid
> approach from your description.
>
> > How can I be sure that the start of the window will
> > coincide with the Punctuator's scheduled interval?
>
> For punctuations, there is always some jitter, because it's not possible
> to run a punctuation at the very exact point in time when it is
> scheduled to run. Thus, a punctuation might fire slightly late
> anyway. You also cannot control the "anchor point" directly, because it
> depends on the point in time when you register the punctuation.
>
> Also note that a WindowStore is basically still a key-value store, i.e.,
> a single key-value pair models one window. The main difference is the
> timestamp that is used to expire rows eventually, which just implies that
> expired rows are dropped (without any notification).
>
> Thus, the only thing you can do is run the punctuation frequently
> enough to keep latency low enough to detect windows that are logically
> closed and forward the corresponding result.
>
> But you cannot really "bind" the punctuation with the state store
> expiration, and window-store also does not support deletes... Thus, I am
> wondering if using a plain key-value store might be more useful for your
> case? Using a plain kv-store, whenever the punctuation runs you can find
> closed windows, forward the result and also delete the row explicitly,
> which gives you more control.
>
> Hope this helps.
>
> -Matthias
>
> On 11/2/21 10:29 AM, Luigi Cerone wrote:
> > I'm using Kafka Streams for an event deduplication problem over short time
> > windows (<= 1 minute).
> > First I've tried to tackle the problem by using the DSL API with the
> > [`.suppress(Suppressed.untilWindowCloses(...))`](https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
> > operator but, given the fact that wall-clock time is not yet supported
> > (I've seen the [KIP 424](https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
> > this operator is not viable for my use case.
> >
> > Then, I've followed this [official Confluent example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
> > in which the low-level Processor API is used. It was working fine but has
> > one major limitation for my use case. The single event (obtained by
> > deduplication) is emitted at the **beginning** of the time window, and
> > subsequent duplicated events are "suppressed". In my use case I need the
> > reverse of that, meaning that a single event should be emitted at the end
> > of the window.
> > I'm asking for suggestions on how to implement this use case with the
> > Processor API.
> >
> > My idea was to use the Processor API with a custom [Transformer](https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
> > and a [Punctuator](https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html).
> > The transformer would store in a [WindowStore](https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
> > the distinct keys received without returning any KeyValue. Simultaneously,
> > I'd schedule a punctuator running with an interval equal to the size of the
> > window in the WindowStore. This punctuator will iterate over the elements
> > in the store and forward them downstream.
> > The following are some core parts of the logic:
> >
> > DeduplicationTransformer (slightly modified from [official Confluent
> > example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)):
> > ```java
> > @Override
> > @SuppressWarnings("unchecked")
> > public void init(final ProcessorContext context) {
> >     this.context = context;
> >     eventIdStore = (WindowStore<E, V>) context.getStateStore(this.storeName);
> >
> >     // Schedule punctuator for this transformer.
> >     context.schedule(Duration.ofMillis(this.windowSizeMs),
> >             PunctuationType.WALL_CLOCK_TIME,
> >             new DeduplicationPunctuator<E, V>(eventIdStore, context,
> >                     this.windowSizeMs));
> > }
> >
> > @Override
> > public KeyValue<K, V> transform(final K key, final V value) {
> >     final E eventId = idExtractor.apply(key, value);
> >     if (eventId == null) {
> >         return KeyValue.pair(key, value);
> >     } else {
> >         if (!isDuplicate(eventId)) {
> >             rememberNewEvent(eventId, value, context.timestamp());
> >         }
> >         return null;
> >     }
> > }
> > ```
> >
> > DeduplicationPunctuator:
> > ```java
> > public DeduplicationPunctuator(WindowStore<E, V> eventIdStore,
> >                                ProcessorContext context,
> >                                long retainPeriodMs) {
> >     this.eventIdStore = eventIdStore;
> >     this.context = context;
> >     this.retainPeriodMs = retainPeriodMs;
> > }
> >
> > @Override
> > public void punctuate(long invocationTime) {
> >     LOGGER.info("Punctuator invoked at {}, searching from {}",
> >             new Date(invocationTime), new Date(invocationTime - retainPeriodMs));
> >
> >     KeyValueIterator<Windowed<E>, V> it =
> >             eventIdStore.fetchAll(invocationTime - retainPeriodMs,
> >                     invocationTime + retainPeriodMs);
> >
> >     while (it.hasNext()) {
> >         KeyValue<Windowed<E>, V> next = it.next();
> >
> >         LOGGER.info("Punctuator running on {}", next.key.key());
> >
> >         context.forward(next.key.key(), next.value);
> >         // Delete from store with tombstone
> >         eventIdStore.put(next.key.key(), null, invocationTime);
> >         context.commit();
> >     }
> >
> >     it.close();
> > }
> > ```
> >
> > Is this a valid approach?
> > With the previous code, I'm running some integration tests and I've some
> > synchronization issues. How can I be sure that the start of the window will
> > coincide with the Punctuator's scheduled interval?
> >
> > Also as an alternative approach, I was wondering (I've googled with no
> > result), if there is any event triggered by window closing to which I can
> > attach a callback in order to iterate over store and publish only distinct
> > events.
> >
> > Thanks.
> >
> > My question is also here: https://stackoverflow.com/q/69725131/4837677
> >
>
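
For reference, here is a minimal sketch of the plain key-value-store idea quoted above: on each punctuation, scan the rows, treat a window as closed once its end is at or before the punctuation time, forward the row and delete it explicitly. It is not taken from the thread and does not use the Kafka Streams classes. A `TreeMap` stands in for a `KeyValueStore`, a `List` stands in for `context.forward(...)`, and the names `ClosedWindowSketch`, `Row`, `onEvent` and `forwarded` are hypothetical. It also assumes tumbling windows aligned to the epoch and a composite `eventId@windowStart` key, neither of which is specified in the discussion.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch only: plain collections stand in for the Kafka Streams store and context. */
public class ClosedWindowSketch {

    /** One stored row: the first value seen for an event id in a given window. */
    public static final class Row {
        final String eventId;
        final String value;
        final long windowStartMs;

        Row(String eventId, String value, long windowStartMs) {
            this.eventId = eventId;
            this.value = value;
            this.windowStartMs = windowStartMs;
        }
    }

    private final long windowSizeMs;
    // Stand-in for a KeyValueStore; key is the hypothetical composite "eventId@windowStart".
    private final Map<String, Row> store = new TreeMap<>();
    // Stand-in for records forwarded downstream.
    public final List<String> forwarded = new ArrayList<>();

    public ClosedWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Counterpart of transform(): remember the first event per id and window, emit nothing. */
    public void onEvent(String eventId, String value, long timestampMs) {
        // Assumption: tumbling windows aligned to the epoch.
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        store.putIfAbsent(eventId + "@" + windowStart, new Row(eventId, value, windowStart));
    }

    /** Counterpart of punctuate(): forward and delete every row whose window has ended. */
    public void punctuate(long nowMs) {
        Iterator<Map.Entry<String, Row>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Row row = it.next().getValue();
            long windowEndMs = row.windowStartMs + windowSizeMs;
            if (windowEndMs <= nowMs) {
                forwarded.add(row.eventId + "=" + row.value); // forward the result
                it.remove();                                  // explicit delete of the row
            }
        }
    }

    /** Number of rows still waiting for their window to close. */
    public int pending() {
        return store.size();
    }

    public static void main(String[] args) {
        ClosedWindowSketch sketch = new ClosedWindowSketch(60_000L);
        sketch.onEvent("a", "first", 1_000L);
        sketch.onEvent("a", "duplicate", 2_000L); // same id, same window: ignored
        sketch.onEvent("b", "x", 61_000L);        // falls into the next window
        sketch.punctuate(60_500L);                // only the first window has ended
        System.out.println(sketch.forwarded + ", pending=" + sketch.pending());
    }
}
```

Because the punctuation can fire late, the emit latency in this sketch is bounded by the punctuation interval rather than by the window size, so scheduling the punctuator more often than once per window keeps the delay after a window closes small.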