[ 
https://issues.apache.org/jira/browse/KAFKA-19880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18064344#comment-18064344
 ] 

Travis Bischel commented on KAFKA-19880:
----------------------------------------

Likely related: KAFKA-14312

> First batch in producer epoch can be appended out-of-order even for 
> idempotent producer
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19880
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 4.0.1, 4.1.0
>            Reporter: Ivan Yurchenko
>            Priority: Minor
>
> I observe the following behavior. A Java producer with 
> enable.idempotence=true and max.in.flight.requests.per.connection=5 sends a 
> number of records, enough to form several batches in several Produce 
> requests. The topic is empty, freshly created.
> {code:java}
> [2025-11-11 11:41:19,125] DEBUG [Producer clientId=producer-1] Assigned 
> producerId 0 and producerEpoch 0 to batch with base sequence 0 being sent to 
> partition input-topic-0 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,132] DEBUG [Producer clientId=producer-1] Sending 
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
> clientId=producer-1, correlationId=21, headerVersion=2) and timeout 30000 to 
> node 3: 
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16383]}
>  (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned 
> producerId 0 and producerEpoch 0 to batch with base sequence 1458 being sent 
> to partition input-topic-0 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending 
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
> clientId=producer-1, correlationId=22, headerVersion=2) and timeout 30000 to 
> node 3: 
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
>  (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned 
> producerId 0 and producerEpoch 0 to batch with base sequence 2823 being sent 
> to partition input-topic-0 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending 
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
> clientId=producer-1, correlationId=23, headerVersion=2) and timeout 30000 to 
> node 3: 
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
>  (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned 
> producerId 0 and producerEpoch 0 to batch with base sequence 4188 being sent 
> to partition input-topic-0 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending 
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
> clientId=producer-1, correlationId=24, headerVersion=2) and timeout 30000 to 
> node 3: 
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
>  (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Assigned 
> producerId 0 and producerEpoch 0 to batch with base sequence 5553 being sent 
> to partition input-topic-0 
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending 
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
> clientId=producer-1, correlationId=25, headerVersion=2) and timeout 30000 to 
> node 3: 
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
>  (org.apache.kafka.clients.NetworkClient){code}
>  
> The cluster and the target broker are experiencing a connectivity issue 
> affecting metadata propagation. For some time the target broker doesn't think 
> it's the leader of the partition, so it rejects the first requests:
>  
> {code:java}
> [2025-11-11 11:41:20,456] DEBUG [KafkaApi-3] Produce request with correlation 
> id 21 from client producer-1 on partition 
> 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 failed due to 
> org.apache.kafka.common.errors.NotLeaderOrFollowerException 
> (kafka.server.KafkaApis)
> [2025-11-11 11:41:24,876] DEBUG [KafkaApi-3] Produce request with correlation 
> id 22 from client producer-1 on partition 
> 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 failed due to 
> org.apache.kafka.common.errors.NotLeaderOrFollowerException 
> (kafka.server.KafkaApis)
> {code}
> Then it finds out it's the leader of the partition, but doesn't yet learn 
> about the other replica:
>  
> {code:java}
> [2025-11-11 11:41:45,498] ERROR [ReplicaManager broker=3] Error processing 
> append operation on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR : 1 is insufficient to satisfy the min.isr requirement of 2 for 
> partition input-topic-0, live replica(s) broker.id are : Set(3)
> [2025-11-11 11:41:45,498] DEBUG [KafkaApi-3] Produce request with correlation 
> id 23 from client producer-1 on partition 
> 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 failed due to 
> org.apache.kafka.common.errors.NotEnoughReplicasException 
> (kafka.server.KafkaApis){code}
> And finally when it comes to the request with correlation ID 24, it succeeds. 
> So the first written batch is logically out-of-sequence, but accepted.
>  
> The previously failed batches may be retried later with a new producer epoch 
> (as happened in my case), but this doesn't matter: the intended order of 
> records is violated.
>  
> I'm able to reproduce this quite reliably under 
> [Antithesis|https://antithesis.com/]. It probably won't be easy to build a 
> local setup that catches this with good frequency, but I boiled it down to the 
> following unit test (in ReplicaManagerTest):
> {noformat}
>   @Test
>   def testX(): Unit = {
>     val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
>     val node = new Node(0, "host0", 0)
>     mockGetAliveBrokerFunctions(metadataCache, Seq(node))
>     val rm = new ReplicaManager(
>       metrics = metrics,
>       config = config,
>       time = time,
>       scheduler = new MockScheduler(time),
>       logManager = mockLogMgr,
>       quotaManagers = quotaManager,
>       metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
>       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
>       alterPartitionManager = alterPartitionManager,
>       threadNamePrefix = Option(this.getClass.getName))
>     try {
>       val tp = new TopicPartition(topic, 0)
>       val partition = rm.createPartition(tp)
>       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
>         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
>       rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
>         Seq(new LeaderAndIsrRequest.PartitionState()
>           .setTopicName(topic)
>           .setPartitionIndex(0)
>           .setControllerEpoch(0)
>           .setLeader(config.brokerId())
>           .setLeaderEpoch(0)
>           .setIsr(Seq[Integer](config.brokerId()).asJava)
>           .setPartitionEpoch(0)
>           .setReplicas(Seq[Integer](config.brokerId()).asJava)
>           .setIsNew(false)).asJava,
>         Collections.singletonMap(topic, topicId),
>         Set(node).asJava).build(), (_, _) => ())
>       rm.getPartitionOrException(tp)
>         .localLogOrException
>       val topicIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, partition.topicPartition)
>       val producerId = 10L
>       val producerEpoch: Short = 10
>       val r0 = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, 0, new SimpleRecord("record 0".getBytes()))
>       val r1 = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, 1, new SimpleRecord("record 1".getBytes()))
> //      val order = List(r0, r1)  // normal order -- no ordering violation
>       val order = List(r1, r0)  // r1 will be accepted out of order; r0 will be retried later with newer epoch and accepted
>       def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]): Unit = {
>         println(responseStatus(topicIdPartition).error)
>       }
>       for (r <- order) {
>         rm.appendRecords(
>           timeout = 1000,
>           requiredAcks = -1,
>           internalTopicsAllowed = false,
>           origin = AppendOrigin.CLIENT,
>           entriesPerPartition = Map(topicIdPartition -> r),
>           responseCallback = callback,
>         )
>       }
>     } finally {
>       rm.shutdown(checkpointHW = false)
>     }
>   }
> {noformat}
>  
> As far as I understand, it isn't necessary for the partition to be empty. 
> It's enough for the producer state to be empty and the first requests in the 
> producer epoch to fail like above.
> If the first request in the producer epoch succeeds, this situation is not 
> possible. For example, we send r1, r2, r3: r1 succeeds, r2 fails with 
> NotLeaderOrFollowerException, and r3 is not accepted and fails with 
> OutOfOrderSequenceException.
> I attribute this difference to [this validation on the broker 
> side|https://github.com/apache/kafka/blob/3479ce793bafc6a1c42e6afa77e3fbfc3a36c80c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java#L144-L148].
>  
> AFAIU, this is a bug, at least according to how I read the documentation. I 
> would expect enable.idempotence to prevent this from happening. Am I right, or 
> is this acceptable and we should improve the documentation instead?
> Can we require on the broker side that the first batch in a producer epoch 
> always starts with sequence 0?

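To make the behavior described in the issue easier to follow, here is a toy
model of a per-producer sequence check. It is only a sketch and not the
broker's code: the class SequenceCheckSketch, the method tryAppend and the flag
requireZeroOnEmptyState are hypothetical names invented for illustration. It
assumes that requests failing with NotLeaderOrFollowerException or
NotEnoughReplicasException never update producer state, and it ignores
sequence wraparound, transactions and duplicate detection. With the flag off
it mimics the reported behavior (any first sequence is accepted when there is
no producer state); with the flag on it mimics the check proposed at the end
of the issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-producer sequence check; not Kafka's actual broker code. */
public class SequenceCheckSketch {
    /** Last accepted epoch and sequence for one producer id (hypothetical type). */
    private static final class ProducerState {
        final short epoch;
        final int lastSeq;

        ProducerState(short epoch, int lastSeq) {
            this.epoch = epoch;
            this.lastSeq = lastSeq;
        }
    }

    private final Map<Long, ProducerState> states = new HashMap<>();
    private final boolean requireZeroOnEmptyState;

    public SequenceCheckSketch(boolean requireZeroOnEmptyState) {
        this.requireZeroOnEmptyState = requireZeroOnEmptyState;
    }

    /** Returns true and records the batch if it is accepted, false if it is rejected. */
    public boolean tryAppend(long producerId, short epoch, int firstSeq, int lastSeq) {
        ProducerState state = states.get(producerId);
        boolean accepted;
        if (state == null) {
            // No state for this producer: the lenient variant accepts any first sequence.
            accepted = !requireZeroOnEmptyState || firstSeq == 0;
        } else if (epoch > state.epoch) {
            // Newer epoch on top of existing state: must restart at sequence 0.
            accepted = firstSeq == 0;
        } else if (epoch == state.epoch) {
            // Same epoch: must continue directly after the last accepted sequence.
            accepted = firstSeq == state.lastSeq + 1;
        } else {
            // Older epoch: rejected in this model.
            accepted = false;
        }
        if (accepted) {
            states.put(producerId, new ProducerState(epoch, lastSeq));
        }
        return accepted;
    }

    public static void main(String[] args) {
        SequenceCheckSketch lenient = new SequenceCheckSketch(false);
        SequenceCheckSketch strict = new SequenceCheckSketch(true);
        // Base sequence 4188 is the first batch to reach the log in the report;
        // its last sequence is 5552 because the next batch starts at 5553.
        System.out.println("lenient, first batch 4188: " + lenient.tryAppend(0L, (short) 0, 4188, 5552));
        System.out.println("strict, first batch 4188: " + strict.tryAppend(0L, (short) 0, 4188, 5552));
        // The earlier batch is retried under a new epoch and lands after later records.
        System.out.println("lenient, retry of 0 in epoch 1: " + lenient.tryAppend(0L, (short) 1, 0, 1457));
    }
}
```

In this model the lenient variant accepts the batch with base sequence 4188
first and then the retried batch with base sequence 0 under epoch 1, which is
the reordering the issue reports. The strict variant rejects the 4188 batch
until a batch starting at sequence 0 has been accepted.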


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
