[https://issues.apache.org/jira/browse/KAFKA-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Hector Geraldino updated KAFKA-20628:
-------------------------------------
Description:
The {{PartitionLeaderCache}} introduced in
[KAFKA-17663|https://github.com/apache/kafka/pull/17367] has no expiration
mechanism. When the admin client is used infrequently (once every few minutes
to hours), cached partition-leader mappings can become stale.
If the cached leader broker goes down between calls, the next {{listOffsets()}}
skips the metadata lookup and routes the request directly to that (now dead)
broker, where it waits {{request.timeout.ms}} (default 30s) before being
retried. In most cases this is harmless, since the client retries once the
cached information turns out to be stale, but it triggers a pathological
corner case in Kafka Connect.
h3. Impact on Kafka Connect
In our case, after upgrading our Kafka Connect fleet from 3.9 to 4.2, we started
seeing many {{TimeoutException}}s thrown each time our Kafka brokers restarted.
In Connect's distributed mode, the admin client used by {{KafkaBasedLog}} for
internal topics is called infrequently, mostly for session key rotation once
every hour. When the broker hosting the config topic's partition leader is
bounced between rotations:
1. {{putSessionKey()}} writes the key (the producer refreshes its metadata, so
the write succeeds), then calls {{configLog.readToEnd()}} to confirm.
2. The background thread's {{admin.endOffsets()}} hits the (now stale) cache,
sending the request to the dead broker.
3. The admin client's retry timeout ({{default.api.timeout.ms}}, default 60s)
exceeds {{putSessionKey}}'s hardcoded 30-second budget
({{READ_WRITE_TOTAL_TIMEOUT_MS}}), causing a {{TimeoutException}}.
4. The herder enters a {{readConfigToEnd}} retry loop. Each subsequent
attempt times out after {{worker.sync.timeout.ms}} (default 3s), and the worker
leaves the group, triggering cascading rebalances across the cluster.
h2. Proposed Fix
Add TTL-based expiration to {{PartitionLeaderCache}} using the existing
{{metadata.max.age.ms}} (default 5 min) as the expiry. As the cached leader
info is derived from metadata, it makes sense for it not to outlive the
metadata refresh interval. This preserves the caching optimization for rapid
successive calls while preventing stale routing for infrequent calls.
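A minimal sketch of the proposed expiry, assuming a plain map keyed by partition and an injectable millisecond clock. {{TtlLeaderCache}} and its methods are hypothetical names used for illustration, not Kafka's actual {{PartitionLeaderCache}} API; the TTL would be set from {{metadata.max.age.ms}}. An empty lookup result means the caller falls back to a metadata request.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;

/**
 * Hypothetical TTL-bounded partition-leader cache (illustration only, not
 * Kafka's PartitionLeaderCache). Entries older than ttlMs are treated as
 * missing, so the caller falls back to a metadata lookup.
 */
final class TtlLeaderCache<K> {
    /** Leader broker id plus the time (ms) at which it was cached. */
    private static final class Entry {
        final int leaderId;
        final long cachedAtMs;

        Entry(int leaderId, long cachedAtMs) {
            this.leaderId = leaderId;
            this.cachedAtMs = cachedAtMs;
        }
    }

    private final Map<K, Entry> entries = new HashMap<>();
    private final long ttlMs;           // e.g. the value of metadata.max.age.ms
    private final LongSupplier clockMs; // e.g. System::currentTimeMillis; injectable for tests

    TtlLeaderCache(long ttlMs, LongSupplier clockMs) {
        if (ttlMs <= 0) {
            throw new IllegalArgumentException("ttlMs must be positive");
        }
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
    }

    /** Records a leader learned from a metadata response, stamped with the current time. */
    synchronized void put(K partition, int leaderId) {
        entries.put(partition, new Entry(leaderId, clockMs.getAsLong()));
    }

    /** Returns the cached leader, or empty if absent or older than ttlMs. */
    synchronized Optional<Integer> get(K partition) {
        Entry entry = entries.get(partition);
        if (entry == null) {
            return Optional.empty();
        }
        if (clockMs.getAsLong() - entry.cachedAtMs >= ttlMs) {
            entries.remove(partition); // expired: evict lazily so the next call re-resolves
            return Optional.empty();
        }
        return Optional.of(entry.leaderId);
    }
}
{code}

With {{ttlMs = 300_000}} (the {{metadata.max.age.ms}} default), a leader cached during one hourly session key rotation is ignored at the next one, so the lookup re-resolves the leader instead of routing to a broker that may have been bounced in the meantime, while rapid successive calls within five minutes still hit the cache. Expired entries are evicted lazily on read, which avoids a background cleanup thread.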
h3. Our workaround
To avoid this problem, we have added the following config knobs to our workers:
{code}
request.timeout.ms=10000
default.api.timeout.ms=20000
worker.sync.timeout.ms=15000
{code}
This ensures the admin retry time fits within {{putSessionKey}}'s 30-second
budget.
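The timeout arithmetic behind the failure and the workaround can be made explicit with a small model. It is a sketch under the report's own assumptions: the first {{endOffsets()}} attempt against the dead cached leader blocks for the full {{request.timeout.ms}}, and the admin client keeps retrying until {{default.api.timeout.ms}}. The class and method names are hypothetical; only the 30-second {{READ_WRITE_TOTAL_TIMEOUT_MS}} value and the timeout settings come from the report.

{code:java}
/**
 * Hypothetical back-of-the-envelope model of the timeout race (illustration only).
 * Assumes the first attempt against the dead cached leader blocks for the full
 * request.timeout.ms, and that admin retries stop at default.api.timeout.ms.
 */
final class TimeoutBudget {
    // putSessionKey()'s hardcoded READ_WRITE_TOTAL_TIMEOUT_MS budget (30s), per the report.
    static final long READ_WRITE_TOTAL_TIMEOUT_MS = 30_000L;

    /** Budget remaining once the first (doomed) attempt has timed out. */
    static long budgetLeftAfterFirstAttempt(long requestTimeoutMs) {
        return READ_WRITE_TOTAL_TIMEOUT_MS - requestTimeoutMs;
    }

    /** True if the admin client's own retry deadline ends inside the budget. */
    static boolean adminRetriesFitBudget(long defaultApiTimeoutMs) {
        return defaultApiTimeoutMs < READ_WRITE_TOTAL_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Defaults: request.timeout.ms=30000, default.api.timeout.ms=60000
        System.out.println("defaults:   left=" + budgetLeftAfterFirstAttempt(30_000L)
                + "ms, fits=" + adminRetriesFitBudget(60_000L));
        // Workaround: request.timeout.ms=10000, default.api.timeout.ms=20000
        System.out.println("workaround: left=" + budgetLeftAfterFirstAttempt(10_000L)
                + "ms, fits=" + adminRetriesFitBudget(20_000L));
    }
}
{code}

Running it prints {{defaults:   left=0ms, fits=false}} and {{workaround: left=20000ms, fits=true}}: with the defaults, the first attempt alone consumes the entire 30-second budget, whereas the workaround leaves 20 seconds for the admin client's retries and bounds them below the budget.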
h2. How to Reproduce
1. Start a distributed Connect cluster (2+ workers).
2. Stop the broker hosting the {{_connect-configs}} topic partition leader.
3. Wait for the next session key rotation (or set
{{inter.worker.key.ttl.ms=60000}} or lower to shorten the wait).
4. Observe {{TimeoutException}}, followed by cascading group leaves.
was:
The {{PartitionLeaderCache}} introduced in
[KAFKA-17663|https://github.com/apache/kafka/pull/17367] has no expiration
mechanism. When the admin client is used infrequently (minutes to hours between
calls), cached partition-leader mappings become stale.
If a cached broker goes down between calls, the next {{listOffsets()}} skips
the metadata lookup and routes directly to the dead broker, waiting
{{request.timeout.ms}} (default 30s) before retrying.
h3. Impact on Kafka Connect
After updating our Kafka Connect fleet from 3.9 to 4.2, we started noticing
lots of {{TimeoutException}} exceptions being thrown each time our Kafka
brokers restarted.
In Connect's distributed mode, the admin client used by {{KafkaBasedLog}} for
internal topics is called infrequently — mostly for session key rotation every
hour. When a broker hosting the config topic partition leader is bounced
between rotations:
1. {{putSessionKey()}} writes the key (producer refreshes metadata,
succeeds), then calls {{configLog.readToEnd()}} to confirm.
2. The background thread's {{admin.endOffsets()}} hits the (now stale) cache,
sending the request to the dead broker.
3. The admin's retry cycle ({{default.api.timeout.ms}}, default 60s) exceeds
{{putSessionKey}}'s hardcoded 30-second budget
({{READ_WRITE_TOTAL_TIMEOUT_MS}}), causing a {{TimeoutException}}.
4. The herder enters a {{readConfigToEnd}} retry loop. Each attempt times out
after {{worker.sync.timeout.ms}} (default 3s), and the worker leaves the group
— triggering cascading rebalances across the cluster.
h2. Proposed Fix
Add TTL-based expiration to {{PartitionLeaderCache}} using the existing
{{metadata.max.age.ms}} (default 5 min) as the expiry. As the cached leader
info is derived from metadata, it makes sense for it not to outlive the
metadata refresh interval. This preserves the caching optimization for rapid
successive calls while preventing stale routing for infrequent calls.
h3. Our workaround
To avoid this problem, we have added the following config knobs to our workers:
{code}
request.timeout.ms=10000
default.api.timeout.ms=20000
worker.sync.timeout.ms=15000
{code}
This ensures the admin retry time fits within {{putSessionKey}}'s 30-second
budget.
h2. How to Reproduce
1. Start a distributed Connect cluster (2+ workers).
2. Stop the broker hosting the {{_connect-configs}} topic partition leader.
3. Wait for the next session key rotation (or set
{{inter.worker.key.ttl.ms=60000}}).
4. Observe {{TimeoutException}}, followed by cascading group leaves.
> TimeoutException cascades in Connect due to PartitionLeaderCache stale entries
> ------------------------------------------------------------------------------
>
> Key: KAFKA-20628
> URL: https://issues.apache.org/jira/browse/KAFKA-20628
> Project: Kafka
> Issue Type: Bug
> Components: clients, connect
> Affects Versions: 4.0.0, 4.1.0, 4.2.0, 4.3.0
> Reporter: Hector Geraldino
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)