Thanks for the details!

It seems Guozhang opened a PR to improve the behavior:
https://github.com/apache/kafka/pull/7573

(I saw that you reviewed the PR already; I am just following up to close the
loop for others and make them aware of the change.)
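
For anyone following along, here is a rough sketch of the behavior discussed
in this thread. It is a simplified model written for illustration, not the
actual StreamTask code: the class name IdleModel, the method signature, and
the partition counts passed as arguments are all assumptions.

```java
// Simplified, hypothetical model of the idle logic discussed in this thread.
// This is NOT the real StreamTask implementation; names are illustrative.
class IdleModel {
    static final long UNKNOWN = -1L;

    private final long maxTaskIdleMs;      // stands in for max.task.idle.ms
    private long idleStartTime = UNKNOWN;  // when the task started idling

    IdleModel(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // buffered: number of input partitions that currently have buffered records
    // total:    number of input partitions of the task
    boolean isProcessable(long now, int buffered, int total) {
        if (buffered == total) {
            // All partitions have data: leave "forced processing mode".
            idleStartTime = UNKNOWN;
            return true;
        }
        if (buffered == 0) {
            // Nothing to process. Note that idleStartTime is NOT reset here.
            return false;
        }
        if (idleStartTime == UNKNOWN) {
            idleStartTime = now;
        }
        // Force processing once the task has idled long enough.
        return now - idleStartTime >= maxTaskIdleMs;
    }

    public static void main(String[] args) {
        IdleModel task = new IdleModel(100);
        System.out.println(task.isProcessable(0, 1, 2));    // false: timer starts
        System.out.println(task.isProcessable(100, 1, 2));  // true: idle time elapsed
        System.out.println(task.isProcessable(200, 0, 2));  // false: all partitions empty
        System.out.println(task.isProcessable(5000, 1, 2)); // true: no new wait
    }
}
```

In this model, the last call returns true immediately because the timer from
the first call was never cleared. Only a call with data in every partition
resets it, after which the next partially-filled state waits again.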


-Matthias


On 10/15/19 1:17 PM, Javier Holguera wrote:
> Hi Matthias,
> 
> In our case, we are doing a Left-Join between a KStream and a KTable. The
> data can arrive a few milliseconds/seconds apart. If the KTable
> arrives first, that's fine. However, if the KStream arrives first, we might
> "incorrectly" push downstream a result with null for the KTable only
> because the record arrived "late".
> 
> If idleStartTime was reset after the partitions were empty,
> `max.task.idle.ms` would apply "again" and we would wait x milliseconds,
> meaning we would be able to use the record from the KTable, assuming its
> timestamp was in fact earlier than the one in the KStream.
> 
> However, since, as you said, once the task goes into "forced processing mode"
> the only way to get "out" is to populate both partitions, we might end up
> consuming one partition, then the other, without ever getting to the point
> where both partitions have records at the same time.
> 
> Maybe we are using the wrong semantics here, but I'm curious why
> idleStartTime is not reset when partitions are empty, by design.
> 
> Thanks.
> 
> 
> On Tue, 15 Oct 2019 at 18:36, Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> Javier,
>>
>> If a task is put into "forced processing mode" it will stay there until
>> all partitions have data at the same time. That is by design.
>>
>> Why is this behavior problematic for your use case?
>>
>>
>>
>> -Matthias
>>
>>
>> On 10/14/19 7:44 AM, Javier Holguera wrote:
>>> Hi,
>>>
>>> We have a KStream and a KTable that we are left-joining. The KTable has a
>>> "backlog" of records that we want to consume before any of the entries in
>>> the KStream is processed. To guarantee that, we have played with the
>>> timestamp extraction, setting the time for those records in the "distant"
>>> past to guarantee they will be consumed before any of the records in the
>>> KStream are processed.
>>>
>>> This is working as expected, forcing the KTable to be ingested before the
>>> KStream. However, an unexpected side effect that we have noticed is this
>>> "delay" is only applied once when the application starts. Going through
>>> the code in StreamTask (
>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L333-L351
>>> )
>>> it seems that the problem is the fact that, once all records in all
>>> partitions are consumed, idleStartTime is not set back to UNKNOWN. That
>>> means that the first record that arrives through any of the two
>>> partitions will be immediately processed.
>>>
>>> Is this by design?
>>>
>>> Thanks.
>>>
>>
>>
> 
