First, I want to mention that you do no see "duplicate" -- you see late updates. Kafka Streams embraces "change" and there is no such thing as a final aggregate, but each agg output record is an update/refinement of the result.
Strict filtering of "late updates" is hard in Kafka Streams If you want to have such filtering, you would need to use aggregate(...).toStream().transform() with an attached state for transform() to implement this filter manually. The state holds all emitted record per key. If a records arrives, you check if its in the state of not. If not, you add it to the state and emit it. If yes, you just drop the record. However, this will still not be perfect, because each time a commit is triggered, the current window is flushed even if "stream time" did not pass "window end" timestamp -- thus, the window is not completed yet. Thus, you would also need to consider current "stream time" that you can indirectly access via .punctuate(). Thus, for incomplete windows you might want to filter those "intermediate results" and not add to the store. This is hard to get right (I am even not sure if it is possible at all to get right). Even if this works however, this will only give you no duplicates (in the strong sense of duplicate) as long as no error occurs. Kafka Streams does not (yet) support exactly once processing and thus, in case of a failure, you might get duplicate outputs. I am not sure what kind of alerting you are doing, but you should remember if you did raise an alert in some other way, and if an late update (or real duplicate) occurs don't alert a second time. Hope this helps. -Matthias On 2/24/17 2:16 PM, Jozef.koval wrote: > Hi Kohki, > > Kafka streams windows use so called "segments" internally and their retention > time cannot be lower than some minimum. Your configuration is set to less > than this minimum, therefore is not accepted. Even Windows#until javadoc > specifies it: > > * Set the window maintain duration (retention time) in milliseconds. > > * This retention time is a guaranteed <i>lower bound</i> for how long a > window will be maintained. > > For more info consider reading > [this](https://github.com/confluentinc/examples/issues/76) issue. > > Regards, Jozef > > > Sent from [ProtonMail](https://protonmail.ch), encrypted email based in > Switzerland. > > > > -------- Original Message -------- > Subject: Re: Immutable Record with Kafka Stream > Local Time: February 24, 2017 7:11 PM > UTC Time: February 24, 2017 7:11 PM > From: tarop...@gmail.com > To: users@kafka.apache.org > > Guozhang, thanks for the reply, but I'm having trouble understanding, > here's the statement from the document > > Windowing operations are available in the Kafka Streams DSL >> <http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-dsl>, >> where users can specify a *retention period* for the window. This allows >> Kafka Streams to retain old window buckets for a period of time in order to >> wait for the late arrival of records whose timestamps fall within the >> window interval. If a record arrives after the retention period has passed, >> the record cannot be processed and is dropped. > > > And I believe I can set retention period by using 'until' > > TimeWindows.of(60000).until(60000) > > > After receiving a data from (00:06:00), I don't know why it still continue > receiving data from time of 00:00:00, what is 'until' supposed to do ? > > Thanks > -Kohki >
signature.asc
Description: OpenPGP digital signature