[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835832#comment-16835832 ]

John Roesler commented on KAFKA-8315:
-------------------------------------

Hi [~the4thamigo_uk], sorry for leaving you hanging a bit. I'm glad your
investigation is progressing.

Last question first: Streams should choose whether to consume from the left or
the right side based on which one has the lower timestamp in its next record,
so I would not expect one side to "run ahead" of the other. There's one caveat:
when one side is being produced more slowly, Streams won't wait indefinitely
for the next data, but will instead just process the side that does have data.
This is controlled by the `max.task.idle.ms` config, but since you're
processing historical data, this shouldn't be your problem. It still might be
worth a look.
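
To illustrate that rule, here is a much-simplified model (the class and method
names are hypothetical; this is not the Streams implementation). It always
takes the side whose next buffered record has the lower timestamp, and when one
side has nothing buffered it processes the other side without waiting, which
roughly corresponds to `max.task.idle.ms` = 0:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a task choosing its next record.
// NOT the Streams implementation: lower head timestamp wins, and an empty
// side is simply skipped (no idling, like max.task.idle.ms = 0).
final class NextRecordChooser {

    // A buffered record: the side it came from and its timestamp in ms.
    static final class Rec {
        final String side;
        final long ts;

        Rec(String side, long ts) {
            this.side = side;
            this.ts = ts;
        }
    }

    // Builds one side's buffer from timestamps, in arrival order.
    static Deque<Rec> side(String name, long... timestamps) {
        Deque<Rec> buffer = new ArrayDeque<>();
        for (long ts : timestamps) {
            buffer.addLast(new Rec(name, ts));
        }
        return buffer;
    }

    // Drains both buffers and returns the processing order as "side@ts".
    static List<String> processingOrder(Deque<Rec> left, Deque<Rec> right) {
        List<String> order = new ArrayList<>();
        while (!left.isEmpty() || !right.isEmpty()) {
            Deque<Rec> next;
            if (left.isEmpty()) {
                next = right; // nothing buffered on the left: don't wait
            } else if (right.isEmpty()) {
                next = left;  // nothing buffered on the right: don't wait
            } else {
                // Both sides have data: the lower head timestamp wins
                // (ties go to the left in this model, an arbitrary choice).
                next = left.peekFirst().ts <= right.peekFirst().ts ? left : right;
            }
            Rec r = next.pollFirst();
            order.add(r.side + "@" + r.ts);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(processingOrder(side("L", 1, 4, 7), side("R", 2, 3, 9)));
        // prints [L@1, R@2, R@3, L@4, L@7, R@9]
    }
}
```

With `max.task.idle.ms` greater than 0, Streams would instead wait up to that
long for the empty side before going ahead, which this model does not capture.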

For debugging purposes, you could print out the key, value, and timestamp for
each of the two sides as well as in the joiner, so you can identify which side
is triggering the join and check whether the records are processed in time
order.
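
A small, hypothetical helper for reading such a trace: `line` formats one
entry, and `firstOutOfOrder` finds where the merged sequence of timestamps
(both sides, in the order they were printed) first goes backwards. In a real
topology the print calls might sit in `peek` on each input and inside the
`ValueJoiner`; since neither of those sees the record timestamp, one option is
a `transformValues` step that reads `ProcessorContext#timestamp()`. Note that a
backwards step can also just reflect out-of-order data within one partition.

```java
import java.util.List;

// Hypothetical debugging helper: formats trace lines and reports where the
// merged trace (left and right records in print order) goes back in time.
final class JoinTrace {

    // One trace entry per record; "side" is e.g. "left", "right", "joiner".
    static String line(String side, String key, String value, long ts) {
        return String.format("[%s] key=%s value=%s ts=%d", side, key, value, ts);
    }

    // Index of the first timestamp lower than the maximum seen before it,
    // or -1 if the merged trace is non-decreasing.
    static int firstOutOfOrder(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.size(); i++) {
            long ts = timestamps.get(i);
            if (ts < max) {
                return i;
            }
            max = ts; // ts >= max here, so it is the new maximum
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(line("left", "k1", "a", 1000L));
        // prints [left] key=k1 value=a ts=1000
        System.out.println(firstOutOfOrder(List.of(1000L, 1500L, 1200L)));
        // prints 2
    }
}
```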

If one side is in fact running ahead despite this, that would explain why you
see better results with a larger grace period. To confirm, the grace period
should only matter up to the maximum time skew in
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.
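
As a worked example of sizing the grace period from the skew, here is a
simplified model (hypothetical names; it ignores window size and segment
granularity): stream time is the highest timestamp seen so far, a record's
lateness is how far behind stream time it arrives, and the grace period has to
cover the largest lateness. The scenario assumes each producer writes one
record per hour for 24 hours, one producer after the other:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for sizing the grace period (ignores window size).
final class GraceSizing {

    // Largest (streamTime - ts) over the arrival order, in ms. Stream time is
    // the highest timestamp observed so far, so it never moves backwards.
    static long maxLateness(List<Long> arrivalTimestamps) {
        long streamTime = Long.MIN_VALUE;
        long maxLate = 0;
        for (long ts : arrivalTimestamps) {
            streamTime = Math.max(streamTime, ts);
            maxLate = Math.max(maxLate, streamTime - ts);
        }
        return maxLate;
    }

    // Producer A writes one record per hour for hours 0..23, then producer B
    // writes the same 24 hours again, so all of B's records arrive "late".
    static List<Long> twoSequentialProducers() {
        long hourMs = Duration.ofHours(1).toMillis();
        List<Long> arrivals = new ArrayList<>();
        for (int producer = 0; producer < 2; producer++) {
            for (int h = 0; h < 24; h++) {
                arrivals.add(h * hourMs);
            }
        }
        return arrivals;
    }

    public static void main(String[] args) {
        long late = maxLateness(twoSequentialProducers());
        System.out.println(Duration.ofMillis(late)); // prints PT23H
    }
}
```

With one record per hour the maximum lateness is 23 hours; with finer-grained
data it approaches the full 24 hours, hence "at least 24 hours".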

Finally, to answer your earlier questions: yes, each task handles just one
partition of both input topics (the same partition on the left and right).
Stream time is maintained independently for each task/partition, and it is
computed simply as the highest timestamp observed so far for that partition.
If you want to look at it in detail, it's tracked in
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.
You can also set that class's logger to DEBUG, and it will log a message every
time it skips a record that is outside of retention.
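
To make the per-partition part concrete, here is a minimal model with
hypothetical names (not the code in AbstractRocksDBSegmentedBytesStore): each
partition keeps its own stream time, and a write is skipped when the record is
older than stream time minus the retention. The real store expires whole
segments, so its cutoff is coarser than the exact boundary used here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: stream time is kept per partition as the highest
// timestamp observed there, and a write is skipped when the record is older
// than (streamTime - retention). Not the actual store implementation.
final class PartitionStreamTime {
    private final long retentionMs;
    private final Map<Integer, Long> streamTimeByPartition = new HashMap<>();

    PartitionStreamTime(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Advances the partition's stream time and returns true if the record is
    // kept, false if it would be skipped as outside of retention.
    boolean observe(int partition, long ts) {
        long streamTime = Math.max(
                streamTimeByPartition.getOrDefault(partition, Long.MIN_VALUE), ts);
        streamTimeByPartition.put(partition, streamTime);
        return ts >= streamTime - retentionMs;
    }

    // Current stream time of a partition, or -1 if nothing was observed yet.
    long streamTime(int partition) {
        return streamTimeByPartition.getOrDefault(partition, -1L);
    }

    public static void main(String[] args) {
        PartitionStreamTime store = new PartitionStreamTime(100);
        System.out.println(store.observe(0, 1000)); // true
        System.out.println(store.observe(1, 50));   // true: partition 1 has its own stream time
        System.out.println(store.observe(0, 850));  // false: 850 < 1000 - 100
        System.out.println(store.observe(0, 900));  // true: exactly at the boundary
    }
}
```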

A minor point: you should not need to change the retention of the changelog
topic. Streams sets it appropriately to preserve the same data as the store,
but that only comes into play when restoring the store. The actual results of
the join are served out of the state store, so only the state store's
retention matters, and that is what you're setting with the grace period.

I hope this helps!
-John

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The documentation says to use `Materialized`, not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
>  
> Slack conversation here:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
