[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841571#comment-16841571
 ] 

John Roesler edited comment on KAFKA-8315 at 5/16/19 5:50 PM:
--------------------------------------------------------------

Oh, man. I'm sorry that I didn't convey the significance of that configuration 
enough. I just assumed you tried it. I feel bad for all the time you spent.

It's called "max idle time" and defaults to 0 because Streams actually has to 
resist processing data that it already has (i.e., it has to idle) in order to 
wait for extra data. Streams never idled before we added the config, and 
idling could have a severe impact on throughput for high-volume applications, 
so we basically can't default to anything greater than 0.
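
A minimal sketch of setting it, assuming the config meant here is 
`max.task.idle.ms` (the comment only calls it "max idle time"). The 
application id, bootstrap address, and the 5-second value are placeholders, 
not recommendations. Plain `java.util.Properties` is used so the snippet runs 
without the Kafka jars; the same properties object would normally be passed to 
`KafkaStreams`.

```java
import java.util.Properties;

public class MaxIdleConfigSketch {
    // Builds Streams-style properties. All values except the config key are placeholders.
    static Properties streamsProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "join-example");       // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        // 0 (the default) means never idle; a larger value lets a task hold back
        // buffered records while it waits for data on its empty inputs.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(5_000L);
        System.out.println("max.task.idle.ms=" + props.getProperty("max.task.idle.ms"));
    }
}
```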

Still, it seems like in a replay case like yours, it should at least wait until 
it polls all inputs at least once before starting, so I agree there's room for 
improvement here. I'm wondering if we should just take more control over the 
consumer and explicitly poll each topic, instead of just assigning them all and 
letting the consumer/broker decide which ones to give data back from first.
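
To illustrate the "wait until every input has been polled" idea, here is a toy 
model, not the actual Streams code: each input is a queue of record 
timestamps, and `chooseNext` (a hypothetical helper) picks the queue with the 
lowest head timestamp. With `waitForAll` set, it refuses to pick anything 
while any queue is still empty.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class IdleUntilAllPolledSketch {
    // Returns the index of the queue to process next, or -1 to idle.
    static int chooseNext(List<Deque<Long>> queues, boolean waitForAll) {
        int best = -1;
        for (int i = 0; i < queues.size(); i++) {
            Deque<Long> q = queues.get(i);
            if (q.isEmpty()) {
                if (waitForAll) {
                    return -1; // some input has no data yet: idle instead of processing
                }
                continue;      // otherwise ignore the empty input
            }
            if (best < 0 || q.peekFirst() < queues.get(best).peekFirst()) {
                best = i;      // lowest head timestamp wins
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Deque<Long> left = new ArrayDeque<>(Arrays.asList(100L, 200L));
        Deque<Long> right = new ArrayDeque<>(); // not fetched yet
        List<Deque<Long>> queues = Arrays.asList(left, right);

        System.out.println(chooseNext(queues, false)); // processes left without waiting
        System.out.println(chooseNext(queues, true));  // idles until right has data
        right.add(50L);
        System.out.println(chooseNext(queues, true));  // right's older record goes first
    }
}
```

In the toy run, not waiting means the left record at timestamp 100 is 
processed before the right record at timestamp 50 is ever seen, which is the 
kind of ordering that makes a replayed join miss matches.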

Since you dug in deep enough to figure this out, do you have any ideas, 
[~the4thamigo_uk]?



> Historical join issues
> ----------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals than the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized}}, i.e.:
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition selection and ordering (PartitionGroup/RecordQueue) work OK
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) The current assumption is that the issue is rooted in the way records are 
> consumed from the topics:
> We have tried to set various options to suppress reads from the source topics 
> but it doesn't seem to make any difference: 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
