First, I want to mention that you do not see "duplicates" -- you see late
updates. Kafka Streams embraces "change", and there is no such thing as a
final aggregate: each aggregation output record is an update/refinement
of the result.

Strict filtering of "late updates" is hard in Kafka Streams.

If you want to have such filtering, you would need to use

aggregate(...).toStream().transform()

with a state store attached to transform() to implement this filter
manually. The state holds all emitted records per key. When a record
arrives, you check whether it is already in the state. If not, you add it
to the state and emit it; if yes, you just drop the record.
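Something along these lines could work (just a sketch, not tested; I
assume a newer Kafka Streams version with StreamsBuilder, and I made up
the store name "dedup-store" and the key/value types -- adjust to your
actual types):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// register a store that remembers what was already emitted
// ('builder' is your StreamsBuilder)
StoreBuilder<KeyValueStore<String, Long>> dedupStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("dedup-store"),
        Serdes.String(),
        Serdes.Long());
builder.addStateStore(dedupStoreBuilder);

KStream<Windowed<String>, Long> results = ...; // aggregate(...).toStream()

KStream<Windowed<String>, Long> deduped = results.transform(
    () -> new Transformer<Windowed<String>, Long, KeyValue<Windowed<String>, Long>>() {
        private KeyValueStore<String, Long> seen;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            seen = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
        }

        @Override
        public KeyValue<Windowed<String>, Long> transform(final Windowed<String> key, final Long value) {
            // one store entry per key and window
            final String storeKey = key.key() + "@" + key.window().start();
            if (seen.get(storeKey) != null) {
                return null; // already emitted for this window -> drop the late update
            }
            seen.put(storeKey, value);
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {}
    },
    "dedup-store");

Note that this store grows without bound as written; in practice you
would want to purge old windows from it eventually (e.g., via a
punctuation).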

However, this will still not be perfect, because each time a commit is
triggered, the current window is flushed even if "stream time" did not
pass the "window end" timestamp yet -- thus, the window is not complete yet.

Thus, you would also need to consider the current "stream time", which
you can access indirectly via .punctuate(). For incomplete windows, you
might want to filter out those "intermediate results" and not add them to
the store. This is hard to get right (I am not even sure it is possible
to get right at all).
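As a rough sketch of that idea (again not tested; in newer Streams
versions the old punctuate() callback is replaced by a punctuation you
register via context.schedule(), and observedStreamTime is just a made-up
field name):

import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;

// inside the Transformer from above:
private long observedStreamTime = 0L;

@Override
public void init(final ProcessorContext context) {
    seen = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
    // the punctuator is called with the current stream time as its argument
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
        timestamp -> observedStreamTime = timestamp);
}

@Override
public KeyValue<Windowed<String>, Long> transform(final Windowed<String> key, final Long value) {
    if (key.window().end() > observedStreamTime) {
        // window not complete yet: drop the intermediate result, but do NOT
        // add it to the store, so the final result can still be emitted later
        return null;
    }
    final String storeKey = key.key() + "@" + key.window().start();
    if (seen.get(storeKey) != null) {
        return null; // already emitted -> drop
    }
    seen.put(storeKey, value);
    return KeyValue.pair(key, value);
}

Keep in mind that this drops all updates for a still-open window; if no
further update arrives after the window end has passed, nothing would be
emitted for that window at all -- which is part of why this is hard to
get fully right.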

Even if this works, however, it will only give you no duplicates (in the
strict sense of "duplicate") as long as no error occurs. Kafka Streams
does not (yet) support exactly-once processing, and thus, in case of a
failure, you might get duplicate outputs.

I am not sure what kind of alerting you are doing, but you should
remember in some other way whether you already raised an alert, and if a
late update (or a real duplicate) occurs, don't alert a second time.
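For example (purely illustrative; the "alerts" store, threshold, and
raiseAlert() are made-up names for whatever your alerting side looks like):

// on the alerting side, remember per key+window whether an alert was raised
final String storeKey = key.key() + "@" + key.window().start();
if (value > threshold && alerts.get(storeKey) == null) {
    alerts.put(storeKey, value); // remember that we alerted for this window
    raiseAlert(key, value);      // hypothetical alerting hook
}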

Hope this helps.



-Matthias



On 2/24/17 2:16 PM, Jozef.koval wrote:
> Hi Kohki,
> 
> Kafka Streams windows use so-called "segments" internally, and their retention 
> time cannot be lower than some minimum. Your configuration is set to less 
> than this minimum and is therefore not accepted. Even the Windows#until javadoc 
> specifies it:
> 
> * Set the window maintain duration (retention time) in milliseconds.
> 
> * This retention time is a guaranteed <i>lower bound</i> for how long a 
> window will be maintained.
> 
> For more info consider reading 
> [this](https://github.com/confluentinc/examples/issues/76) issue.
> 
> Regards, Jozef
> 
> 
> 
> 
> 
> -------- Original Message --------
> Subject: Re: Immutable Record with Kafka Stream
> Local Time: February 24, 2017 7:11 PM
> UTC Time: February 24, 2017 7:11 PM
> From: tarop...@gmail.com
> To: users@kafka.apache.org
> 
> Guozhang, thanks for the reply, but I'm having trouble understanding;
> here's the statement from the document:
> 
> Windowing operations are available in the Kafka Streams DSL
>> <http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-dsl>,
>> where users can specify a *retention period* for the window. This allows
>> Kafka Streams to retain old window buckets for a period of time in order to
>> wait for the late arrival of records whose timestamps fall within the
>> window interval. If a record arrives after the retention period has passed,
>> the record cannot be processed and is dropped.
> 
> 
> And I believe I can set retention period by using 'until'
> 
> TimeWindows.of(60000).until(60000)
> 
> 
> After receiving data from (00:06:00), I don't know why it still continues
> receiving data from time 00:00:00. What is 'until' supposed to do?
> 
> Thanks
> -Kohki
> 

