[
https://issues.apache.org/jira/browse/KAFKA-20237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yin Lei updated KAFKA-20237:
----------------------------
Description:
I encountered a scenario where the `KafkaProducer` fails to recover if the
initial SSL handshake with the broker fails, even after the underlying SSL
configuration is corrected.
*Steps to Reproduce:*
1. Configure a `KafkaProducer` with SSL enabled, but use an incorrect/untrusted
certificate on the server side to trigger an `SSLHandshakeException`.
2. Start the Producer and attempt to send a message.
3. The Producer logs show recurring SSL handshake errors. At this point,
`TransactionManager` enters the `INITIALIZING` state.
4. Correct the SSL certificate configuration on the *server side*, so that the
broker is now reachable and the handshake can succeed.
5. Observe the Producer's behavior: messages still cannot be sent to the broker.
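The reproduction setup above can be sketched as a plain client configuration. This is a minimal sketch assuming standard Kafka client config keys; the broker address, truststore path, and password are placeholders, and the values mirror the environment reported below (retries=2, reconnect.backoff.ms=30000, SSL, no transactional.id, no enable.idempotence):

```java
import java.util.Properties;

public class ReproConfig {
    // Producer settings matching the reported environment. The host,
    // truststore path, and password are placeholders, not real values.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.0.10:9812");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("security.protocol", "SSL");
        props.setProperty("retries", "2");
        props.setProperty("reconnect.backoff.ms", "30000");
        // Step 1: point the client at a truststore that does not (yet) trust
        // the broker's certificate, so the first handshake fails.
        props.setProperty("ssl.truststore.location",
                "/path/to/client.truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        return props;
    }
}
```

With these settings, constructing a `KafkaProducer` and calling `send(...)` should trigger the recurring `SSLHandshakeException` of step 3; fixing the server certificate afterwards (step 4) is what fails to unstick the producer.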
*Expected Behavior:*
The Producer should successfully complete the SSL handshake, and the `Sender`
thread should retry the `InitProducerId` request, allowing the
`TransactionManager` to transition from `INITIALIZING` to `READY`.
*Actual Behavior:*
Even though the network/SSL layer has recovered, the `KafkaProducer` remains
unable to send messages. The `TransactionManager` stays stuck in `INITIALIZING`
because the initial failure to obtain a `ProducerId` is never properly
retried, or because the state machine doesn't recover from the handshake
exception encountered during the transition.
h3. *Potential Impact:*
In long-running microservices, if the initial connection to Kafka fails due to
temporary infrastructure or certificate issues, the Producer becomes
permanently "broken" and requires a full application restart to recover, which
is not ideal for high-availability systems.
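Until the root cause is addressed in the client, an application-side mitigation is to treat a stalled send as fatal for the current producer instance and rebuild just that instance, rather than restarting the whole application. Below is a hedged, hypothetical sketch of that pattern, with `KafkaProducer` abstracted behind a generic `AutoCloseable` resource; every name here is illustrative, not part of any real API:

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical mitigation (not a fix for the underlying bug): if a send
// future does not complete within a deadline, assume the client is wedged,
// close it, and build a fresh one. R stands in for KafkaProducer.
public class RecreatingClient<R extends AutoCloseable> {
    private final Supplier<R> factory;
    private R current;

    public RecreatingClient(Supplier<R> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    // Runs op against the current client and waits up to timeoutMs for the
    // returned future. On timeout, the client is closed and recreated (which
    // resets all internal state, including the TransactionManager), and the
    // operation is retried once against the fresh instance.
    public <T> T callWithRecreate(Function<R, Future<T>> op, long timeoutMs)
            throws Exception {
        try {
            return op.apply(current).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException stuck) {
            try {
                current.close();
            } catch (Exception ignored) {
                // best-effort close of the wedged client
            }
            current = factory.get();
            return op.apply(current).get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }
}
```

In practice `factory` would be something like `() -> new KafkaProducer<>(props)` and `op` would call `producer.send(record)`; the timeout should comfortably exceed the producer's delivery timeout so healthy-but-slow sends aren't misdiagnosed as stuck.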
h3. *PS: Log Snippet*
> The producer thread repeatedly prints the following log lines, and no record
> of any message actually being sent was found.
```
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][Sender 444] [Producer clientId=producer-4] Nodes with data ready to
send: [192.168.0.10:9812 (id: 0 rack: null)]
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][ProducerBatch 121] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7),
leader wasn't updated, currentLeaderEpoch: OptionalInt[25],
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current
attempt: 0
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will
not backoff, shouldWaitMore false, hasLeaderChanged false
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][BuiltInPartitioner 258] [Producer clientId=producer-4] The number
of partitions is too small: available=1, all=1, not using adaptive for topic
dte_nb_federation_receive
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][ProducerBatch 121] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7),
leader wasn't updated, currentLeaderEpoch: OptionalInt[25],
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current
attempt: 0
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will
not backoff, shouldWaitMore false, hasLeaderChanged false
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][Sender 444] [Producer clientId=producer-4] Nodes with data ready to
send: [192.168.0.10:9812 (id: 0 rack: null)]
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][ProducerBatch 121] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7),
leader wasn't updated, currentLeaderEpoch: OptionalInt[25],
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current
attempt: 0
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread |
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will
not backoff, shouldWaitMore false, hasLeaderChanged false
02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread |
producer-4][BuiltInPartitioner 258] [Producer clientId=producer-4] The number
of partitions is too small: available=1, all=1, not using adaptive for topic
dte_nb_federation_receive
02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread |
producer-4][ProducerBatch 121] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7),
leader wasn't updated, currentLeaderEpoch: OptionalInt[25],
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current
attempt: 0
02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread |
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will
not backoff, shouldWaitMore false, hasLeaderChanged false
```
> TransactionManager stuck in `INITIALIZING` state after initial SSL handshake
> failure
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-20237
> URL: https://issues.apache.org/jira/browse/KAFKA-20237
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.9.0
> Environment: - Operating System: Linux aarch64;
> - Kafka Version (Both Client and Server): 3.9.0;
> - security.protocol: SSL;
> - Some producer configurations: retries=2, reconnect.backoff.ms=30000,
> transactional.id not set, enable.idempotence not set;
> Reporter: Yin Lei
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)