[ 
https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042614#comment-18042614
 ] 

Jun Rao commented on KAFKA-19012:
---------------------------------

[~dnadolny] : Thanks for the detailed analysis! I believe you identified the 
bug correctly. Great find, and I really appreciate your effort!

 

Just to add a bit more detail: in Sender, we have the following code in 
sendProducerData().
{code:java}
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, new TimeoutException(errorMessage), false);
}
{code}
We identify all expired batches in expiredBatches and call failBatch() on each 
of them, which eventually calls BufferPool.deallocate() and allows the buffer 
to be reused. Batches expired via this.accumulator.expiredBatches() haven't 
been handed to NetworkClient, so their buffers can be deallocated immediately. 
The problem is the expired batches from getExpiredInflightBatches(): those 
batches have already been sent to NetworkClient, but their bytes may not have 
been completely written to the socket layer yet, so it is incorrect to 
deallocate their buffers at this point.
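
To make the hazard concrete, here is a minimal, self-contained sketch 
(hypothetical stand-ins, not the real BufferPool/Sender/NetworkClient classes) 
of how returning a buffer to the pool while the network layer still references 
it can make an in-flight request carry another batch's bytes:
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

public class BufferReuseRaceSketch {
    // Hypothetical stand-in for BufferPool: hands out and takes back ByteBuffers.
    static final ArrayDeque<ByteBuffer> pool = new ArrayDeque<>();

    static ByteBuffer allocate() {
        ByteBuffer b = pool.poll();
        return b != null ? b : ByteBuffer.allocate(64);
    }

    static void deallocate(ByteBuffer b) {
        b.clear();
        pool.add(b); // the buffer becomes reusable immediately
    }

    public static void main(String[] args) {
        // A batch for topicA is "in flight": the send path still references its buffer.
        ByteBuffer inFlightTopicA = allocate();
        inFlightTopicA.put("topicA:payload-for-A".getBytes(StandardCharsets.UTF_8));

        // The delivery timeout fires and the batch is failed, deallocating the
        // buffer even though the socket write has not completed yet.
        deallocate(inFlightTopicA);

        // A new batch for topicB grabs the same buffer from the pool and fills it.
        ByteBuffer nextTopicB = allocate();
        nextTopicB.put("topicB:payload-for-B".getBytes(StandardCharsets.UTF_8));

        // The network layer finally writes the topicA request, but the bytes it
        // references now belong to topicB's batch.
        inFlightTopicA.flip();
        byte[] sent = new byte[inFlightTopicA.remaining()];
        inFlightTopicA.get(sent);
        System.out.println("bytes written for the topicA request: "
            + new String(sent, StandardCharsets.UTF_8));
    }
}
{code}
The printed bytes belong to topicB even though the request was built for 
topicA, which matches the misrouted-message symptom reported in this ticket.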

Regarding the fix, one simple option is to avoid calling 
getExpiredInflightBatches(now) in sendProducerData(). Those batches are already 
in the NetworkClient, and there is existing logic that eventually times them 
out. At that point it is safe to deallocate their buffers, since they will 
never again need to be sent through the network (a rough sketch is below). cc 
[~chia7712] .
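
For illustration only, a rough sketch of that direction (not an actual patch; 
it simply drops the in-flight expiry from the excerpt above and leaves those 
batches to NetworkClient's existing request-timeout handling):
{code:java}
// Sketch only: expire (and deallocate) only batches that are still in the
// accumulator; in-flight batches are not touched here and get failed later by
// the existing timeout handling, once the network layer is done with their buffers.
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// no call to getExpiredInflightBatches(now) and no addAll(...) here

if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, new TimeoutException(errorMessage), false);
}
{code}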

> Messages ending up on the wrong topic
> -------------------------------------
>
>                 Key: KAFKA-19012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19012
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 3.2.3, 3.8.1
>            Reporter: Donny Nadolny
>            Assignee: Kirk True
>            Priority: Blocker
>         Attachments: image-2025-08-06-13-34-30-830.png, rawnotes.txt
>
>
> We're experiencing messages very occasionally ending up on a different topic 
> than what they were published to. That is, we publish a message to topicA and 
> consumers of topicB see it and fail to parse it because the message contents 
> are meant for topicA. This has happened for various topics. 
> We've begun adding a header with the intended topic (which we get just by 
> reading the topic from the record that we're about to pass to the OSS client) 
> right before we call producer.send, this header shows the correct topic 
> (which also matches up with the message contents itself). Similarly we're 
> able to use this header and compare it to the actual topic to prevent 
> consuming these misrouted messages, but this is still concerning.
> Some details:
>  - This happens rarely: it happened approximately once per 10 trillion 
> messages for a few months, though there was a period of a week or so where it 
> happened more frequently (once per 1 trillion messages or so)
>  - It often happens in a small burst, eg 2 or 3 messages very close in time 
> (but from different hosts) will be misrouted
>  - It often but not always coincides with some sort of event in the cluster 
> (a broker restarting or being replaced, network issues causing errors, etc). 
> Also these cluster events happen quite often with no misrouted messages
>  - We run many clusters, it has happened for several of them
>  - There is no pattern between intended and actual topic, other than the 
> intended topic tends to be higher volume ones (but I'd attribute that to 
> there being more messages published -> more occurrences affecting it rather 
> than it being more likely per-message)
>  - It only occurs with clients that are using a non-zero linger
>  - Once it happened with two sequential messages, both were intended for 
> topicA but both ended up on topicB, published by the same host (presumably 
> within the same linger batch)
>  - Most of our clients are 3.2.3 and it has only affected those; most of our 
> brokers are 3.2.3, but it has also happened with a cluster running 3.8.1 (I 
> suspect a client rather than a broker problem because it never happens with 
> clients that use 0 linger)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
