[ 
https://issues.apache.org/jira/browse/KAFKA-20237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yin Lei updated KAFKA-20237:
----------------------------
    Description: 
I encountered a scenario where the `KafkaProducer` fails to recover if the 
initial SSL handshake with the broker fails, even after the underlying SSL 
configuration is corrected.

 

*Steps to Reproduce:*

1. Configure a `KafkaProducer` with SSL enabled, but use an incorrect/untrusted 
certificate on the server side to trigger an `SSLHandshakeException`.
2. Start the Producer and attempt to send a message.
3. The Producer logs show recurring SSL handshake errors. At this point, 
`TransactionManager` enters the `INITIALIZING` state.
4. Correct the SSL certificate configuration on the *server side* so that the 
broker is now reachable and the handshake can succeed.
5. Observe the Producer's behavior: messages still cannot be sent to the broker.
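For reference, the reproduction setup can be sketched as a producer configuration like the one below. The broker address and the `retries`/`reconnect.backoff.ms` values mirror the Environment section of this issue; the truststore path and password are placeholders, and in step 1 the truststore deliberately does not trust the broker's certificate.

```java
import java.util.Properties;

public class ReproConfig {
    // Builds the producer configuration used to reproduce the issue.
    // Keystore/truststore values are placeholders; retries and
    // reconnect.backoff.ms mirror the Environment section.
    static Properties buildProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.10:9812");
        props.put("security.protocol", "SSL");
        // Point this at a truststore that does NOT trust the broker's
        // certificate, so the first connection fails with SSLHandshakeException.
        props.put("ssl.truststore.location", "/path/to/wrong-truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        props.put("retries", "2");
        props.put("reconnect.backoff.ms", "30000");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProducerProps().getProperty("security.protocol"));
    }
}
```

After fixing the server-side certificate (step 4), the same configuration is expected to connect; the bug is that the already-running producer never recovers.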

 

*Expected Behavior:*

The Producer should successfully complete the SSL handshake, and the `Sender` 
thread should retry the `InitProducerId` request, allowing the 
`TransactionManager` to transition from `INITIALIZING` to `READY`.

 

*Actual Behavior:*

Even though the network/SSL layer has recovered, the `KafkaProducer` remains 
unable to send messages. The `TransactionManager` stays stuck in `INITIALIZING` 
because the `InitProducerId` request is not properly retried after the initial 
failure, or the state machine does not recover from the handshake exception 
raised during the transition.
*Potential Impact:*

In long-running microservices, if the initial connection to Kafka fails due to 
temporary infrastructure or certificate issues, the Producer becomes 
permanently "broken" and requires a full application restart to recover, which 
is not ideal for high-availability systems.
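Until the state machine recovers on its own, the only service-level mitigation seems to be replacing the stuck producer instance. A minimal sketch of such a rebuild-on-repeated-failure wrapper, deliberately kept independent of the Kafka API (the `Supplier` would create a fresh `KafkaProducer`; the threshold value is an assumption, not something prescribed by this issue):

```java
import java.util.function.Supplier;

// Generic rebuild-on-failure holder: callers report send outcomes, and after
// a threshold of consecutive failures (e.g. a producer stuck in INITIALIZING)
// the wrapped client is closed and recreated, avoiding a full application
// restart.
public class RebuildingClient<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private final int maxConsecutiveFailures;
    private T current;
    private int consecutiveFailures = 0;

    public RebuildingClient(Supplier<T> factory, int maxConsecutiveFailures) {
        this.factory = factory;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.current = factory.get();
    }

    public synchronized T get() { return current; }

    // Called after each successful send: reset the failure streak.
    public synchronized void reportSuccess() { consecutiveFailures = 0; }

    // Called after each failed or timed-out send: past the threshold,
    // close the old instance and build a new one.
    public synchronized void reportFailure() {
        if (++consecutiveFailures >= maxConsecutiveFailures) {
            try { current.close(); } catch (Exception ignored) { }
            current = factory.get();
            consecutiveFailures = 0;
        }
    }
}
```

This is a workaround sketch only; the expected fix is for the `Sender` thread itself to retry `InitProducerId` once the connection is healthy.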

 
*PS: Log Snippet*

> The producer network thread repeatedly prints the following log lines, and no 
> record of any message actually being sent was found.

```

02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][Sender 444] [Producer clientId=producer-4] Nodes with data ready to 
send: [192.168.0.10:9812 (id: 0 rack: null)]  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][ProducerBatch 121] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
attempt: 0  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will 
not backoff, shouldWaitMore false, hasLeaderChanged false  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][BuiltInPartitioner 258] [Producer clientId=producer-4] The number 
of partitions is too small: available=1, all=1, not using adaptive for topic 
dte_nb_federation_receive  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][ProducerBatch 121] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
attempt: 0  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will 
not backoff, shouldWaitMore false, hasLeaderChanged false  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][Sender 444] [Producer clientId=producer-4] Nodes with data ready to 
send: [192.168.0.10:9812 (id: 0 rack: null)]  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][ProducerBatch 121] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
attempt: 0  
02-25 21:19:33.716+0800[TRACE][kafka-producer-network-thread | 
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will 
not backoff, shouldWaitMore false, hasLeaderChanged false  
02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread | 
producer-4][BuiltInPartitioner 258] [Producer clientId=producer-4] The number 
of partitions is too small: available=1, all=1, not using adaptive for topic 
dte_nb_federation_receive  
02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread | 
producer-4][ProducerBatch 121] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), 
leader wasn't updated, currentLeaderEpoch: OptionalInt[25], 
attemptsWhenLeaderLastChanged:0, latestLeaderEpoch: OptionalInt[25], current 
attempt: 0  
02-25 21:19:33.717+0800[TRACE][kafka-producer-network-thread | 
producer-4][RecordAccumulator 823] [Producer clientId=producer-4] For 
ProducerBatch(topicPartition=dte_nb_federation_receive-0, recordCount=7), will 
not backoff, shouldWaitMore false, hasLeaderChanged false 

```

 

 


>  TransactionManager stuck in `INITIALIZING` state after initial SSL handshake 
> failure
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20237
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20237
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 3.9.0
>         Environment: - Operating System: Linux aarch64;
> - Kafka Version (Both Client and Server): 3.9.0;
> - security.protocol: SSL;
> - Some producer configurations: retries=2, reconnect.backoff.ms=30000, 
> transactional.id not set, enable.idempotence not set;
>            Reporter: Yin Lei
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
