[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:43 AM:
--------------------------------------------------------

[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by {{RecordQueue.partitionTime}}, and 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B, having the lower {{partitionTime}}, 
will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next 
record processed will be the one with timestamp 2, not 1. Is that right?

If, on the other hand, we ordered by the earliest timestamp in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.
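
To make the two orderings concrete, here is a minimal Java sketch of the A/B example above. It is not the Kafka Streams code: {{SimpleQueue}}, {{latestRead}} and {{headTimestamp()}} are hypothetical stand-ins, and the "latest read" ordering only models my understanding as described above, which is still unconfirmed.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

// Hypothetical model of the example; NOT the real RecordQueue/PartitionGroup.
class QueueOrderingSketch {

    static final class SimpleQueue {
        final String name;
        final Deque<Long> fifo = new ArrayDeque<>();
        // Assumption: "partitionTime" is the largest timestamp buffered so far.
        long latestRead = Long.MIN_VALUE;

        SimpleQueue(String name, long... timestamps) {
            this.name = name;
            for (long ts : timestamps) {
                fifo.addLast(ts);
                latestRead = Math.max(latestRead, ts);
            }
        }

        // Timestamp of the record that would be processed next from this queue.
        long headTimestamp() {
            return fifo.peekFirst();
        }
    }

    // Builds queues A (1, 4) and B (2, 3), polls the queue that sorts first
    // under the given ordering, and returns the timestamp of its head record.
    static long firstTimestamp(Comparator<SimpleQueue> order) {
        PriorityQueue<SimpleQueue> queues = new PriorityQueue<>(order);
        queues.add(new SimpleQueue("A", 1L, 4L));
        queues.add(new SimpleQueue("B", 2L, 3L));
        return queues.poll().headTimestamp();
    }

    // Ordering as I understand the current behaviour: by latest-read timestamp.
    static long firstByLatestRead() {
        return firstTimestamp(Comparator.comparingLong((SimpleQueue q) -> q.latestRead));
    }

    // Alternative ordering: by the earliest (head) timestamp in each queue.
    static long firstByHead() {
        return firstTimestamp(Comparator.comparingLong(SimpleQueue::headTimestamp));
    }

    public static void main(String[] args) {
        System.out.println("by latest-read timestamp: " + firstByLatestRead());
        System.out.println("by head timestamp:        " + firstByHead());
    }
}
```

Under these assumptions the first ordering picks B (record 2) and the second picks A (record 1), which is the difference I am asking about.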

Taking this thought further, in my test data set, if I assume that each 
{{fifoQueue}} is populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 1000}}. So the left stream would be selected first, and only 
once all records in the left stream had been consumed would the right stream 
records be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?
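
A rough Java simulation of that scenario is sketched below. Everything in it is an assumption for illustration: one left record per second and one right record every 100 seconds (consistent with the partition times of 10 and 1000 quoted above, but not a statement of my actual data), refills in chunks of 10 only when a buffer is empty, selection of the queue with the lower latest-read timestamp, and ties going to the right stream. It does not use any Kafka Streams classes.

```java
// Hypothetical simulation; NOT the real Kafka Streams task loop.
class ChunkedSelectionSketch {

    static final int CHUNK = 10; // assumed number of records buffered per refill

    // Counts how many left records are processed before the right stream is
    // first selected, when queues are ordered by latest-read timestamp.
    static int leftRecordsBeforeFirstRight(long leftStep, long rightStep, int leftTotal) {
        // After its first chunk, the right queue's latest-read timestamp.
        long rightPartitionTime = CHUNK * rightStep;

        int fetched = 0;            // left records buffered so far
        int buffered = 0;           // left records buffered but not yet processed
        long leftPartitionTime = 0; // latest-read timestamp of the left queue
        int processed = 0;

        while (true) {
            if (buffered == 0) {
                if (fetched >= leftTotal) {
                    break; // left stream exhausted, right stream goes next
                }
                int n = Math.min(CHUNK, leftTotal - fetched);
                fetched += n;
                buffered = n;
                leftPartitionTime = fetched * leftStep;
            }
            if (leftPartitionTime >= rightPartitionTime) {
                break; // right queue now sorts first (ties go to the right here)
            }
            buffered--;
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("left records processed before first right record: "
                + leftRecordsBeforeFirstRight(1L, 100L, 5000));
    }
}
```

With these assumed inputs the left stream advances through hundreds of records before the first right record is processed, which is the situation in which I would expect most left join windows to have expired already.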

+++ For convenience, I am quoting these times as offsets in seconds from the 
start time 1902580000000ms.



> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The documentation says to use `Materialized` rather than `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems that only the group operation accepts one.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
