[ 
https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-16319:
----------------------------------------

    Assignee: Andrew Schofield

> Wrong broker destinations for DeleteRecords requests when more than one topic 
> is involved and the topics/partitions are led by different brokers
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16319
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16319
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.1
>            Reporter: AlexeyASF
>            Assignee: Andrew Schofield
>            Priority: Major
>
> h2. Context
> Kafka Streams applications periodically send {{DeleteRecords}} requests via 
> the 
> {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
>  method. Such a request may involve more than one topic (or partition), and 
> each part of it is supposed to be sent to the broker that leads the 
> corresponding partition.
>  
> h2. Observed behaviour
> When a {{DeleteRecords}} request includes more than one topic (say two, 
> {{topic1}} and {{topic2}}), and these topics are led by different brokers 
> (say {{broker1}} and {{broker2}} respectively), the whole request is sent 
> to only one broker (say {{broker1}}), leading to partial 
> not_leader_or_follower errors. Because only part of the request succeeded 
> ({{topic1}} is fine, but {{topic2}} is not), it gets retried with the 
> _same_ arguments to the _same_ broker ({{broker1}}), so the response is 
> partially faulty again and again. It also may (and does) happen that a 
> “mirrored” half-faulty request is sent - in this case to {{broker2}}, where 
> the {{topic2}} operation succeeds but the {{topic1}} operation fails.
> Here is an anonymised log example from a production system (the “direct” 
> and “mirrored” requests, one after another):
> {code:java}
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
>     name='topic1',
>     partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
>     ),
>   DeleteRecordsTopic(
>     name='topic2',
>     partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=60000)
> to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
> correlationId=42003907, timeoutMs=30000
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
>     name='topic1',
>     partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
>   ),
>   DeleteRecordsTopic(
>     name='topic2',
>     partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=60000)
> to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
> correlationId=42003906, timeoutMs=30000 {code}
> Such a request results in the following response (shown here only for the 
> “direct” request):
> {code:java}
> [AdminClient clientId=worker-admin]
> Call(
>   callName=deleteRecords(api=DELETE_RECORDS),
>   deadlineMs=...,
>   tries=..., // Can be hundreds
>   nextAllowedTryMs=...)
> got response DeleteRecordsResponseData(
>   throttleTimeMs=0,
>   topics=[
>     DeleteRecordsTopicResult(
>       name='topic2',
>       partitions=[DeleteRecordsPartitionResult(
>         partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the 
> errorCode 6, which is not_leader_or_follower
>     DeleteRecordsTopicResult(
>       name='topic1',
>       partitions=[DeleteRecordsPartitionResult(
>         partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the 
> errorCode 0, which means the operation was successful
>   ]
> ) {code}
> h2. Expected behaviour
> {{DeleteRecords}} requests are sent to the leader broker of each partition 
> involved, even when more than one topic/partition is included and the 
> partitions are led by different brokers.
> h2. Notes
>  * _Presumably_ introduced in 3.6.1 via 
> [https://github.com/apache/kafka/pull/13760].
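
The faulty vs. expected routing described above can be sketched in a few lines of plain Java. This is an illustration only: the class and method names, the leader map, and the toy broker model are hypothetical and do not reflect the actual AdminClient internals; only the error code 6 (NOT_LEADER_OR_FOLLOWER) and the broker ids 2 and 4 come from the logs in the report.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DeleteRecordsRouting {

    // Hypothetical leader metadata mirroring the anonymised logs above:
    // broker1 (id 2) leads topic1 partition 5, broker2 (id 4) leads topic2 partition 5.
    static final Map<String, Integer> LEADERS = Map.of(
            "topic1-5", 2,
            "topic2-5", 4);

    // Toy model of a broker handling one partition of a DeleteRecords request:
    // errorCode 0 on success, 6 (NOT_LEADER_OR_FOLLOWER) if it is not the leader.
    static int handle(int brokerId, String partition) {
        return LEADERS.get(partition) == brokerId ? 0 : 6;
    }

    // Observed (buggy) routing: the whole batch goes to a single broker, so the
    // partition led elsewhere fails with errorCode 6 on every retry.
    static Map<String, Integer> sendWholeBatchTo(int brokerId, Collection<String> batch) {
        return batch.stream()
                .collect(Collectors.toMap(p -> p, p -> handle(brokerId, p)));
    }

    // Expected routing: group partitions by their leader's broker id and issue
    // one request per broker, each containing only the partitions it leads.
    static Map<Integer, List<String>> batchByLeader(Collection<String> partitions) {
        return partitions.stream()
                .collect(Collectors.groupingBy(LEADERS::get));
    }

    public static void main(String[] args) {
        List<String> batch = List.of("topic1-5", "topic2-5");

        // Observed: topic2-5 keeps returning errorCode 6 no matter how often
        // the identical batch is retried against broker id 2.
        for (int attempt = 0; attempt < 3; attempt++) {
            System.out.println(sendWholeBatchTo(2, batch));
        }

        // Expected: two requests, one per leader broker.
        System.out.println(batchByLeader(batch));
    }
}
```

Because the retry reuses the same arguments and the same destination, the partial failure can never resolve on its own; grouping by leader before building the requests makes each batch succeed in one round trip.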



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
