[ 
https://issues.apache.org/jira/browse/KAFKA-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-20628:
-------------------------------------
    Description: 
The {{PartitionLeaderCache}} introduced in 
[KAFKA-17663|https://github.com/apache/kafka/pull/17367] has no expiration 
mechanism. There are cases where the admin client is used infrequently (once 
every few minutes to hours), resulting in cached partition-leader mappings 
becoming stale.

If a cached broker goes down between calls, the next {{listOffsets()}} skips 
the metadata lookup and routes directly to the cached (and now dead) broker. 
The request then waits {{request.timeout.ms}} (default 30s) before retrying. 
In most cases this is not a problem, because the client retries once the stale 
information is detected, but it triggers a pathological corner case in Kafka 
Connect.

h3. Impact on Kafka Connect

In our case, after upgrading our Kafka Connect fleet from 3.9 to 4.2, we 
started seeing many {{TimeoutException}} errors each time our Kafka brokers 
restarted.

In Connect's distributed mode, the admin client used by {{KafkaBasedLog}} for 
internal topics is called infrequently, mostly for session key rotation once 
every hour. When a broker hosting the config topic partition leader is bounced 
between rotations:

  1. {{putSessionKey()}} writes the key (producer refreshes metadata), then 
calls {{configLog.readToEnd()}} to confirm.
  2. The background thread's {{admin.endOffsets()}} hits the (now stale) cache, 
sending the request to the dead broker.
  3. The admin's retry timeout ({{default.api.timeout.ms}}, default 60s) 
exceeds {{putSessionKey}}'s hardcoded 30-second budget 
({{READ_WRITE_TOTAL_TIMEOUT_MS}}), causing a {{TimeoutException}}.
  4. The herder enters a {{readConfigToEnd}} retry loop. Each subsequent 
attempt times out after {{worker.sync.timeout.ms}} (default 3s), and the worker 
leaves the group — triggering cascading rebalances across the cluster.

h2. Proposed Fix

Add TTL-based expiration to {{PartitionLeaderCache}} using the existing 
{{metadata.max.age.ms}} (default 5 min) as the expiry. As the cached leader 
info is derived from metadata, it makes sense for it not to outlive the 
metadata refresh interval. This preserves the caching optimization for rapid 
successive calls while preventing stale routing for infrequent calls. 
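
To make the proposal concrete, here is a minimal sketch of TTL-based expiry. It is not the actual {{PartitionLeaderCache}} code: the class name {{TtlLeaderCache}}, its methods, the string partition key, and the injectable clock are all hypothetical, chosen only to show an entry being dropped once it is older than the configured max age.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical illustration only; names and structure are assumptions,
// not the real PartitionLeaderCache from the Kafka admin client.
final class TtlLeaderCache {
    private static final class Entry {
        final int leaderId;
        final long storedAtMs;

        Entry(int leaderId, long storedAtMs) {
            this.leaderId = leaderId;
            this.storedAtMs = storedAtMs;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMs;           // e.g. the value of metadata.max.age.ms
    private final LongSupplier clockMs; // injectable clock so expiry can be tested

    TtlLeaderCache(long ttlMs, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
    }

    // Records the leader for a partition together with the time it was learned.
    synchronized void put(String topicPartition, int leaderId) {
        entries.put(topicPartition, new Entry(leaderId, clockMs.getAsLong()));
    }

    // Returns the cached leader id, or null when the entry is missing or has
    // reached the TTL. A null result means the caller should fall back to a
    // metadata lookup instead of routing to a possibly dead broker.
    synchronized Integer get(String topicPartition) {
        Entry entry = entries.get(topicPartition);
        if (entry == null) {
            return null;
        }
        if (clockMs.getAsLong() - entry.storedAtMs >= ttlMs) {
            entries.remove(topicPartition);
            return null;
        }
        return entry.leaderId;
    }
}
```

With a TTL of 5 minutes, back-to-back calls keep hitting the cache, while a call made an hour later (as in the session key rotation case) misses and forces a fresh lookup.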

h3. Our workaround

To avoid this problem, we have added the following config knobs to our workers:

  {code}
  request.timeout.ms=10000
  default.api.timeout.ms=20000
  worker.sync.timeout.ms=15000
  {code}

This ensures the admin retry time fits within {{putSessionKey}}'s 30-second 
budget.
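
The arithmetic behind these values can be sketched as follows. This is a simplified illustration, not Connect code: the class {{TimeoutBudget}} and its methods are hypothetical, retry backoff is ignored, and the 30-second budget is the {{READ_WRITE_TOTAL_TIMEOUT_MS}} value quoted above.

```java
// Hypothetical helper for reasoning about the timeouts; not part of Kafka.
final class TimeoutBudget {
    // True when the admin client's overall per-call limit
    // (default.api.timeout.ms) fits inside the caller's total budget.
    static boolean fitsBudget(long defaultApiTimeoutMs, long callerBudgetMs) {
        return defaultApiTimeoutMs <= callerBudgetMs;
    }

    // Time left in the caller's budget after the first request to a dead
    // broker has timed out (request.timeout.ms). Backoff is ignored here.
    static long remainingAfterFirstTimeout(long requestTimeoutMs, long callerBudgetMs) {
        return callerBudgetMs - requestTimeoutMs;
    }
}
```

Under these assumptions, the defaults (30s request timeout, 60s API timeout) leave nothing for a retry inside the 30-second budget, whereas the workaround values leave 20 seconds after the first timed-out request and cap the whole call at 20 seconds.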

h2. How to Reproduce

  1. Start a distributed Connect cluster (2+ workers).
  2. Stop the broker hosting the {{_connect-configs}} topic partition leader.
  3. Wait for the next session key rotation (or set 
{{inter.worker.key.ttl.ms=60000}} or lower).
  4. Observe {{TimeoutException}}, followed by cascading group leaves.

  was:
The {{PartitionLeaderCache}} introduced in 
[KAFKA-17663|https://github.com/apache/kafka/pull/17367] has no expiration 
mechanism. When the admin client is used infrequently (minutes to hours between 
calls), cached partition-leader mappings become stale.

If a cached broker goes down between calls, the next {{listOffsets()}} skips 
the metadata lookup and routes directly to the dead broker, waiting 
{{request.timeout.ms}} (default 30s) before retrying. 

h3. Impact on Kafka Connect

After updating our Kafka Connect fleet from 3.9 to 4.2, we started noticing 
lots of {{TimeoutException}} exceptions being thrown each time our Kafka 
brokers restarted. 

In Connect's distributed mode, the admin client used by {{KafkaBasedLog}} for 
internal topics is called infrequently — mostly for session key rotation every 
hour. When a broker hosting the config topic partition leader is bounced 
between rotations:

  1. {{putSessionKey()}} writes the key (producer refreshes metadata, 
succeeds), then calls {{configLog.readToEnd()}} to confirm.
  2. The background thread's {{admin.endOffsets()}} hits the (now stale) cache, 
sending the request to the dead broker.
  3. The admin's retry cycle ({{default.api.timeout.ms}}, default 60s) exceeds 
{{putSessionKey}}'s hardcoded 30-second budget 
({{READ_WRITE_TOTAL_TIMEOUT_MS}}), causing a {{TimeoutException}}.
  4. The herder enters a {{readConfigToEnd}} retry loop. Each attempt times out 
after {{worker.sync.timeout.ms}} (default 3s), and the worker leaves the group 
— triggering cascading rebalances across the cluster.

h2. Proposed Fix

Add TTL-based expiration to {{PartitionLeaderCache}} using the existing 
{{metadata.max.age.ms}} (default 5 min) as the expiry. As the cached leader 
info is derived from metadata, it makes sense for it not to outlive the 
metadata refresh interval. This preserves the caching optimization for rapid 
successive calls while preventing stale routing for infrequent calls. 

h3. Our workaround

To avoid this problem, we have added the following config knobs to our workers:

  {code}
  request.timeout.ms=10000
  default.api.timeout.ms=20000
  worker.sync.timeout.ms=15000
  {code}

This ensures the admin retry time fits within {{putSessionKey}}'s 30-second 
budget.

  h2. How to Reproduce

  1. Start a distributed Connect cluster (2+ workers).
  2. Stop the broker hosting the {{_connect-configs}} topic partition leader.
  3. Wait for the next session key rotation (or set 
{{inter.worker.key.ttl.ms=60000}}).
  4. Observe {{TimeoutException}}, followed by cascading group leaves.


> TimeoutException cascades in Connect due to PartitionLeaderCache stale entries
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-20628
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20628
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, connect
>    Affects Versions: 4.0.0, 4.1.0, 4.2.0, 4.3.0
>            Reporter: Hector Geraldino
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
