[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835832#comment-16835832 ]
John Roesler commented on KAFKA-8315:
-------------------------------------

Hi [~the4thamigo_uk],

Sorry for leaving you hanging a bit. I'm glad your investigation is progressing.

Last question first: Streams should choose whether to consume from the left or the right side based on which one has the lower timestamp in its next record, so I would not expect one side to "run ahead" of the other. There is one caveat: when one side is being produced more slowly, Streams won't wait indefinitely for the next data. Instead, it processes the side that does have data. This is controlled by the "max idle ms" config (max.task.idle.ms), but since you're processing historical data, this shouldn't be your problem. It might still be worth a look.

For debugging, you could print the key, value, and timestamp for each side as well as in the joiner, so you can identify which side is triggering the join and check whether it is correctly time-ordered. If one side is in fact running ahead, despite what it should be doing, that would explain why you see better results with a larger grace period.

To confirm, the grace period only matters up to the maximum time skew in your stream. So, as you said, if you have two producers that each produce a full 24 hours of data, one after the other, then you should see stream time advance while the first producer writes its data, and then "freeze" while the second producer writes its (out-of-order) data. Thus, you'll want to set the grace period to keep old windows around for at least 24 hours, since you know you have to wait for the second producer's data.

Finally, to answer your earlier questions: yes, each task handles just one partition of both input topics (the same partition on the left and right). Stream time is maintained independently for each task/partition, and it is computed simply as the highest timestamp observed so far for that partition.
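To make the stream-time reasoning above concrete, here is a minimal, self-contained Java sketch of the two-sequential-producers scenario. It is a simplified model, not the actual Streams code: it assumes a record is kept only if its timestamp is newer than stream time minus (window size + grace period), whereas the real segmented store expires data segment by segment, so exact boundaries will differ. The class `StreamTimeModel`, the 10-minute window size, and the hourly timestamps are all hypothetical. In the real DSL, the grace period itself is set with `JoinWindows.of(...).grace(...)`.

```java
/**
 * Hypothetical, simplified model of one Kafka Streams task (one partition of
 * each input topic). This is NOT the real AbstractRocksDBSegmentedBytesStore
 * logic, which expires whole segments rather than individual records.
 */
public class StreamTimeModel {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Assumption: the join store keeps data for (window size + grace period).
    private final long retentionMs;
    // Stream time: the highest record timestamp observed so far by this task.
    private long streamTime = Long.MIN_VALUE;

    public StreamTimeModel(long windowSizeMs, long graceMs) {
        this.retentionMs = windowSizeMs + graceMs;
    }

    /**
     * Advances stream time (it never moves backwards) and reports whether a
     * record with this timestamp would still be inside retention.
     */
    public boolean offer(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        return timestampMs > streamTime - retentionMs;
    }

    public long streamTime() {
        return streamTime;
    }

    /**
     * Two producers each write one record per hour for a full day, one after
     * the other. Returns how many of the second producer's records are kept.
     */
    static int retainedFromSecondProducer(long windowSizeMs, long graceMs) {
        StreamTimeModel task = new StreamTimeModel(windowSizeMs, graceMs);
        for (int h = 0; h < 24; h++) {
            task.offer(h * HOUR_MS); // first producer: stream time advances to 23:00
        }
        int retained = 0;
        for (int h = 0; h < 24; h++) {
            // second producer: out-of-order data, stream time stays "frozen" at 23:00
            if (task.offer(h * HOUR_MS)) {
                retained++;
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        long windowSizeMs = 10L * 60L * 1000L; // hypothetical 10-minute join window
        System.out.println("grace 1h:  " + retainedFromSecondProducer(windowSizeMs, HOUR_MS) + "/24 retained");
        System.out.println("grace 24h: " + retainedFromSecondProducer(windowSizeMs, 24 * HOUR_MS) + "/24 retained");
    }
}
```

Under these assumptions, a 1-hour grace period keeps only the second producer's last 2 records (22:00 and 23:00), while a 24-hour grace period keeps all 24. This mirrors the advice to size the grace period to the maximum time skew rather than to the join window.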
If you want to look at it in detail, it's tracked in org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore. In fact, you can set that class's logger to DEBUG and it will print a message every time it skips a record that is outside of retention.

A minor point: you should not need to change the retention of the changelog topic. Streams sets it appropriately to preserve the same data as the store, but this only becomes apparent when restoring the store. The actual results of the join are served out of the state store, so only the state store's retention matters, and that is what you're setting with the grace period.

I hope this helps!
-John

> Cannot pass Materialized into a join operation - hence can't set retention period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
>
> Slack conversation here:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
>
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)