[ 
https://issues.apache.org/jira/browse/KAFKA-20532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sahil Devgon updated KAFKA-20532:
---------------------------------
    Fix Version/s: 4.4.0

> Defensive trimming of min.insync.replicas in controller and kafka-topics to 
> tolerate whitespace-contaminated configs
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20532
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20532
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, tools
>    Affects Versions: 3.7.0, 3.8.0, 3.8.1, 3.9.0, 3.9.1, 3.9.2, 4.0.0, 4.0.1, 
> 4.1.0, 4.2.0, 4.3.0, 4.0.2, 4.1.1, 4.1.2, 4.3.1
>            Reporter: Sahil Devgon
>            Assignee: Sahil Devgon
>            Priority: Critical
>              Labels: Availability, kraft, migration, regression
>             Fix For: 4.4.0
>
>
> *Summary:*
> Whitespace-contaminated min.insync.replicas values silently accepted at write 
> time cause a controller failover loop and full cluster unavailability with no 
> operator-visible warning, because the --describe observability path trims 
> whitespace before displaying the value.
>  
> *Affected Versions*
>  
>  * Two separate call sites with different version ranges - verified via git 
> history:
>  
> ||Call Site||File||Introduced||Affected Versions||
> |*ReplicationControlManager.getTopicEffectiveMinIsr()*|metadata/.../ReplicationControlManager.java|Kafka 3.7.0 (commit af747fbfed, KAFKA-15581 / KIP-966 ELR, Oct 2023)|3.7.0, 3.8.x, 3.9.x, 4.0.x, 4.1.x, 4.2.x, 4.3.x|
> |*TopicCommand.PartitionDescription.minIsrCount()*|tools/.../TopicCommand.java|Kafka 2.2.0 (commit 3fb1d70b6b, KIP-351 --under-min-isr, Feb 2019)|2.2.0 through all current versions|
>  
> Note on call site 1: getTopicEffectiveMinIsr() did not exist before 3.7.0. 
> Kafka versions 3.3–3.6 running in KRaft mode do not contain this code path 
> and are not affected by the controller failover crash.
> Note on call site 2: The minIsrCount() CLI crash is a long-standing pre-KRaft 
> issue present since 2.2.0. It is only reachable when a 
> whitespace-contaminated value is actually stored - which in practice requires 
> either a direct ZK write, a ZK to KRaft migration carrying corrupt ZK data, 
> or a client exploiting the validate-trim/store-raw mismatch in 
> ConfigurationControlManager.
> h3. *Impact*
> A topic config min.insync.replicas stored with leading or trailing whitespace 
> (e.g. " 2") causes the KRaft controller to enter a continuous failover loop, 
> making the cluster unavailable. The crash is triggered on every 
> alterPartition event (e.g. any replica restart), so the cluster does not 
> recover on its own. Critically, kafka-topics.sh --describe silently strips 
> the whitespace before displaying the value, so the corrupted config is 
> invisible to operators. There is no log warning, no alert, and no way to 
> detect the problem via the standard Admin API observability path. An 
> operator can stare at a failing cluster and see only clean config values.
> h3. *Analysis of the bug:*
> h4. *How Whitespace Gets Into the Metadata Store*
> Two ingestion paths can introduce whitespace-padded values:
>  # {*}ZK to KRaft migration{*}: ZooKeeper does not enforce config formatting. 
> Values stored by older Kafka versions or direct ZooKeeper tooling are copied 
> into the KRaft metadata log as raw strings with no normalisation.
>  # {*}Normal alterConfigs write path{*}: ControllerConfigurationValidator 
> delegates to LogConfig.validate() -> ConfigDef.parse() -> 
> ConfigDef.parseType(), which trims the value before validating it. However, 
> the raw (untrimmed) string from the ConfigRecord is what is written into the 
> KRaft metadata log. The trim is validation-only; the persisted value is never 
> normalised. This means a client that sends min.insync.replicas=" 2" passes 
> validation and is stored as " 2" permanently.
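The validate-trimmed/store-raw mismatch described in item 2 can be sketched in isolation. The class below is a hypothetical stand-in, not Kafka's ConfigurationControlManager or ConfigDef; it only assumes, as the report states, that validation parses a trimmed copy while the raw string is what gets persisted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a validate-then-store flow; not Kafka source. */
final class ValidateTrimStoreRawSketch {
    private final Map<String, String> store = new HashMap<>();

    /** Validation parses a trimmed copy, so " 2" is accepted as the integer 2. */
    static void validateInt(String rawValue) {
        Integer.parseInt(rawValue.trim()); // throws only for non-numeric input
    }

    /** The trimmed form is validated, but the raw string is what is persisted. */
    void alterConfig(String name, String rawValue) {
        validateInt(rawValue);
        store.put(name, rawValue);
    }

    String stored(String name) {
        return store.get(name);
    }

    public static void main(String[] args) {
        ValidateTrimStoreRawSketch sketch = new ValidateTrimStoreRawSketch();
        sketch.alterConfig("min.insync.replicas", " 2"); // passes validation
        System.out.println("stored=[" + sketch.stored("min.insync.replicas") + "]");
        try {
            Integer.parseInt(sketch.stored("min.insync.replicas")); // raw read
        } catch (NumberFormatException e) {
            System.out.println("raw read failed: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that nothing fails at write time; the failure is deferred to whichever reader later parses the stored string without trimming.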
> h4. *Root Cause: Two Divergent Code Paths*
> The --describe (Admin API) path and the controller runtime path handle the 
> same stored value differently:
>  
> ||Code path||Entry point||Whitespace handling||Outcome||
> |*kafka-topics --describe*|LogConfig.fromProps() -> ConfigDef.parseType()|.trim() before parseInt|Space silently stripped; shows clean "2"|
> |*Controller alterPartition*|ReplicationControlManager.getTopicEffectiveMinIsr()|Raw string returned as-is|Integer.parseInt(" 2") -> NumberFormatException -> controller failover|
> |*kafka-topics --under-min-isr-partitions*|TopicCommand.PartitionDescription.minIsrCount()|Raw string returned as-is|Same NumberFormatException thrown in CLI tool|
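As a minimal illustration of the divergence in the table above, the two read styles can be placed side by side. The method names are hypothetical and the class is not Kafka code; it only shows that the same stored string yields a clean value on one path and an exception on the other.

```java
/** Hypothetical illustration of the two read styles; not Kafka source. */
final class DivergentReadPathsSketch {
    /** Display-style read: trims before parsing, so padding never reaches the operator. */
    static int trimmedRead(String stored) {
        return Integer.parseInt(stored.trim());
    }

    /** Runtime-style read: parses the stored string as-is. */
    static int rawRead(String stored) {
        return Integer.parseInt(stored);
    }

    public static void main(String[] args) {
        String stored = " 2";
        System.out.println("trimmed read: " + trimmedRead(stored));
        try {
            rawRead(stored);
        } catch (NumberFormatException e) {
            System.out.println("raw read threw: " + e.getMessage());
        }
    }
}
```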
>  
>  
> h4. *Crash Sequence*
> 1. Topic has min.insync.replicas=" 2" (with leading space) in metadata store
> 2. Any replica restarts → broker sends AlterPartition RPC to controller
> 3. QuorumController.alterPartition() → 
> ReplicationControlManager.getTopicEffectiveMinIsr()
> 4. Integer.parseInt(" 2") throws NumberFormatException
> 5. Treated as UnknownServerException → controller renounces leadership
> 6. Controller reverts to last committed offset, becomes active again
> 7. Next AlterPartition arrives → crash repeats → infinite failover loop
> 8. Cluster is unavailable; kafka-topics --describe shows no anomaly
> {*}Controller log evidence{*}:
> {code:java}
> ERROR Encountered quorum controller fault: alterPartition: event failed with NumberFormatException (treated as UnknownServerException) at epoch N
> java.lang.NumberFormatException: For input string: " 2"
>     at java.lang.Integer.parseInt(Integer.java:614)
>     at org.apache.kafka.controller.ReplicationControlManager.getTopicEffectiveMinIsr(...){code}
> h4. Affected Call Sites
> *Call site 1 - Primary (availability-breaking):*
> {code:java}
> // metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ~line 2395
> // BEFORE (crashes controller)
> int getTopicEffectiveMinIsr(String topicName) {
>     String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG).value();
>     int currentMinIsr = Integer.parseInt(minIsrConfig); // throws NumberFormatException on " 2"
>     ...
> }{code}
> *Call site 2 - Secondary (CLI tool exception):*
> {code:java}
> // tools/src/main/java/org/apache/kafka/tools/TopicCommand.java ~line 315
> // BEFORE (throws in kafka-topics CLI)
> int minIsrCount() {
>     return Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
> }{code}
> Note: Call site 2 is unrelated to ZK migration - it is a pre-existing raw 
> parseInt in the CLI tool that would fail on any whitespace-contaminated 
> value, regardless of how it arrived. Both call sites share the same pattern 
> and are fixed in the same PR for completeness.
> h3. *Scope of Audit: Why Only min.insync.replicas?*
> Question: do other numeric topic configs (segment.bytes, retention.ms, 
> max.message.bytes, segment.ms, flush.messages, etc.) have the same raw 
> parseInt exposure?
> Answer: Most numeric config reads in the controller go through 
> ConfigDef.parseType(), which trims before parsing and is therefore safe. 
> min.insync.replicas is anomalous because getTopicEffectiveMinIsr() bypasses 
> ConfigDef entirely and reads directly from the raw metadata store for 
> hot-path performance.
> A grep for Integer.parseInt.getTopicConfig and Integer.parseInt.config.get 
> across the controller and tools packages identified the two call sites above 
> as the only instances of this pattern. However, the search was targeted 
> rather than exhaustive, so a comprehensive audit of all controller-side 
> numeric config reads is included in the follow-up work (see below).
> h3. *Steps to Reproduce*
>  # Deploy Kafka 3.x or 4.x in KRaft mode (or ZK to KRaft migration mode).
>  # Write min.insync.replicas with a leading space directly into the metadata 
> store:
> {code:bash}
> # Via ZooKeeper (migration scenario)
> zkCli.sh set /config/topics/<topic> \
>   '{"version":1,"config":{"min.insync.replicas":" 2"}}'{code}
> Or via the Admin API (normal alterConfigs - value passes validation and is 
> stored raw):
> {code:bash}
> kafka-configs.sh --bootstrap-server <broker:port> \
>   --entity-type topics --entity-name <topic> \
>   --alter --add-config 'min.insync.replicas= 2'{code}
>  # Confirm that kafka-topics.sh --describe shows a clean value with no anomaly.
>  # Trigger an `alterPartition` event: restart one replica broker.
>  # Observe the controller log entering the failover loop (see crash sequence 
> above).
> h3. *Expected Result*
> Controller parses `" 2"` correctly, returns `2`, and processes 
> `alterPartition` without error. Cluster remains available.
> h3. *Actual Result*
> `NumberFormatException` propagates through `QuorumController`, causing the 
> controller to renounce and restart in a loop. Cluster becomes unavailable. 
> `kafka-topics --describe` shows no anomaly.
> h3. *Proposed Fix*
> Add `.trim()` before each raw `Integer.parseInt()` call at the two identified 
> call sites. This is the minimum necessary change: it is surgical, has no 
> behaviour change for well-formed configs, and correctly tolerates all 
> whitespace variants (leading, trailing, both).
> *Fix 1 - ReplicationControlManager:*
> {code:java}
> // AFTER
> int getTopicEffectiveMinIsr(String topicName) {
>     String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG).value();
>     // trim() tolerates " 2", "2 ", and " 2 "
>     int currentMinIsr = Integer.parseInt(minIsrConfig.trim());
>     ...
> }{code}
> *Fix 2 - TopicCommand:*
> {code:java}
> // AFTER
> int minIsrCount() {
>     return Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().trim());
> }{code}
> No public APIs, wire formats, config keys, metric names, or CLI flags are 
> added, modified, or removed. This change is purely internal.
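A small standalone check of the trim-then-parse pattern may help when writing a regression test for the fix. The helper name parseMinIsr is hypothetical and is not part of the Kafka code base. One caveat worth keeping in mind: String.trim() only strips leading and trailing characters at or below U+0020, so other Unicode spaces such as the non-breaking space U+00A0 are not removed and would still fail to parse.

```java
/** Hypothetical helper mirroring the trim-then-parse shape of the proposed fix. */
final class TrimToleranceSketch {
    static int parseMinIsr(String stored) {
        return Integer.parseInt(stored.trim());
    }

    public static void main(String[] args) {
        // Leading, trailing, and mixed ASCII whitespace all parse to the same value.
        for (String value : new String[] {"2", " 2", "2 ", " 2 ", "\t2\n"}) {
            System.out.println(parseMinIsr(value));
        }
        // A non-breaking space is above U+0020, so trim() leaves it in place.
        try {
            parseMinIsr("\u00A02");
        } catch (NumberFormatException e) {
            System.out.println("non-breaking space is still rejected");
        }
        // Genuinely malformed values keep failing loudly.
        try {
            parseMinIsr("two");
        } catch (NumberFormatException e) {
            System.out.println("non-numeric value is still rejected");
        }
    }
}
```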
> h3. Compatibility
>  * Backward compatible: Clusters without whitespace in configs see identical 
> behaviour.
>  * Forward compatible: No protocol or format changes.
> h3. Operational Workaround
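> The following workaround replaces the corrupted config by rewriting it with a 
> clean value through the normal alterConfigs path. As described in the 
> analysis above, validation trims but the stored value is the raw string that 
> was sent, so the value supplied here must itself contain no whitespace: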
> {code:java}
> kafka-configs.sh --bootstrap-server <broker:port> \
> --entity-type topics \
> --entity-name <affected-topic> \
> --alter \
> --add-config min.insync.replicas=2{code}
> Important caveat: This workaround requires a functioning controller and 
> reachable Admin API. If the cluster is already in a failover loop when this 
> is attempted, the Admin API may be unreachable. In that scenario, use the 
> ZooKeeper-direct recovery path (applicable to clusters still running 
> ZooKeeper or in migration mode):
> {code:java}
> # In-incident ZooKeeper-direct remediation (no controller required)
> zkCli.sh set /config/topics/<affected-topic> \
> '{"version":1,"config":{"min.insync.replicas":"2"}}'{code}
> After this ZK write, trigger a controller restart to pick up the corrected 
> value. For pure-KRaft clusters in a failover loop, the fix must be deployed 
> (or a temporary leader forced) before the Admin API path becomes usable.
> h3. Rejected Alternatives
> h4. 1. Fix inside ConfigurationControlManager.getTopicConfig()
> Trim the value before returning the `ConfigEntry`. Rejected because:
>  * `getTopicConfig()` is the authoritative accessor for what is stored in the 
> metadata log. Silently mutating returned values would hide data quality 
> issues and make stored state unobservable.
>  * Does not fix the secondary `TopicCommand` call site.
>  * The targeted trim at each `parseInt` call site is the minimum necessary 
> change.
> h4. 2. Validate and reject configs with whitespace during alterConfigs
> Reject configs containing whitespace at write time. Rejected because:
>  * Does not help existing clusters already in migration or already carrying 
> corrupt ZK data.
>  * `ConfigDef.parseType()` already silently trims valid configs on the normal 
> write path; making whitespace a hard rejection would be a behavioural change 
> for clients that currently send padded values and expect them to be accepted.
> h4. 3. Normalise configs at the write path in `ConfigurationControlManager` - 
> deferred to follow-up (see below)
> Trim all string configs when written into the KRaft log (both via 
> `alterConfigs` and ZK migration ingestion). This is the preferred long-term 
> fix (see Follow-up section below) but is intentionally out of scope here 
> because:
>  * It is a broader, higher-risk change to the migration and write code paths.
>  * It alters the canonical stored form of all string config values and 
> warrants comprehensive test coverage across all config types.
>  * It does not fix the immediate availability risk any faster than the 
> targeted trim.
> h4. 4. Catch `NumberFormatException` and fall back to a default
> Wrap `parseInt` in a try-catch and return a default `min.insync.replicas`. 
> Rejected because:
>  * Silently discarding the operator's intended value is dangerous; the padded 
> `" 2"` encodes a real intent of `2` that should be honoured.
>  * This papers over the root cause rather than fixing it.
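To make the objection to alternative 4 concrete, here is a minimal sketch contrasting the two behaviours on the same padded input. Both method names are hypothetical, and the fallback constant is an assumption for illustration (1 is the documented default of min.insync.replicas).

```java
/** Hypothetical comparison of the rejected fallback and the proposed trim; not Kafka source. */
final class FallbackVersusTrimSketch {
    /** Assumed fallback value for illustration only. */
    static final int ASSUMED_DEFAULT_MIN_ISR = 1;

    /** Rejected alternative: swallow the parse failure and use the default. */
    static int parseWithFallback(String stored) {
        try {
            return Integer.parseInt(stored);
        } catch (NumberFormatException e) {
            return ASSUMED_DEFAULT_MIN_ISR;
        }
    }

    /** Proposed approach: trim, then parse; malformed values still throw. */
    static int parseWithTrim(String stored) {
        return Integer.parseInt(stored.trim());
    }

    public static void main(String[] args) {
        String stored = " 2";
        System.out.println("fallback: " + parseWithFallback(stored));
        System.out.println("trim:     " + parseWithTrim(stored));
    }
}
```

Under these assumptions the fallback quietly weakens the durability setting from 2 to 1, whereas the trim honours the value the operator intended.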
> ----
> h2. Follow-up Work
> h3. KAFKA-XXXB: Normalise config values at the KRaft write path to close the 
> validate-vs-store mismatch
> The root cause of this class of bugs is a store-raw-but-validate-trimmed 
> mismatch in ConfigurationControlManager:
> // ConfigurationControlManager.java:346 (current behaviour)
> // ConfigDef.parseType() trims " 2" → validates as 2, but stores the raw " 2"
> allConfigs.put(configRecord.name(), configRecord.value()); // raw, untrimmed
> `ControllerConfigurationValidator` delegates to `LogConfig.validate()` -> 
> ConfigDef.parse() -> `ConfigDef.parseType()`, which trims before validating. 
> But the raw `ConfigRecord` value is what is persisted. This means any client 
> sending a padded value will pass validation and permanently store a corrupted 
> value that the controller runtime later hits raw.
> The long-term fix normalises at the point of storage:
> // Proposed: trim at write time so validated form == persisted form
> allConfigs.put(configRecord.name(), configRecord.value().trim());
> This single change would prevent whitespace-padded values from entering the 
> metadata store through any path (normal alterConfigs or ZK migration), making 
> the .trim() guards at each parseInt call site belt-and-suspenders rather than 
> necessary runtime defences.
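The write-path normalisation can be sketched with a plain map standing in for the config store. The class and method names below are hypothetical, and passing null through unchanged is an assumption of this sketch rather than a statement about how ConfigurationControlManager treats null values.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of trimming at the point of storage; not Kafka source. */
final class NormaliseAtWriteSketch {
    private final Map<String, String> allConfigs = new HashMap<>();

    /** Persist the trimmed form so the validated value and the stored value match. */
    void put(String name, String rawValue) {
        // Assumption: null is passed through untouched.
        allConfigs.put(name, rawValue == null ? null : rawValue.trim());
    }

    String get(String name) {
        return allConfigs.get(name);
    }

    public static void main(String[] args) {
        NormaliseAtWriteSketch store = new NormaliseAtWriteSketch();
        store.put("min.insync.replicas", " 2 ");
        // A raw parse on the read side is now safe because the stored form is clean.
        System.out.println(Integer.parseInt(store.get("min.insync.replicas")));
    }
}
```

With the stored form normalised, readers that parse without trimming no longer depend on every call site remembering the guard.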
> Scope of follow-up also includes:
>  * Comprehensive audit of all controller-side numeric config reads to 
> identify any remaining raw parseInt call sites beyond the two fixed here.
>  * Assessment of whether the same write-path normalisation should be applied 
> during ZK migration ingestion in ZkMigrationClient.


