[
https://issues.apache.org/jira/browse/KAFKA-19880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18064344#comment-18064344
]
Travis Bischel commented on KAFKA-19880:
----------------------------------------
Likely related: KAFKA-14312
> First batch in producer epoch can be appended out-of-order even for
> idempotent producer
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-19880
> URL: https://issues.apache.org/jira/browse/KAFKA-19880
> Project: Kafka
> Issue Type: Bug
> Components: core, producer
> Affects Versions: 4.0.1, 4.1.0
> Reporter: Ivan Yurchenko
> Priority: Minor
>
> I observe the following behavior. A Java producer with
> enable.idempotence=true and max.in.flight.requests.per.connection=5 sends a
> number of records, enough to form several batches in several Produce
> requests. The topic is empty, freshly created.
> {code:java}
> [2025-11-11 11:41:19,125] DEBUG [Producer clientId=producer-1] Assigned
> producerId 0 and producerEpoch 0 to batch with base sequence 0 being sent to
> partition input-topic-0
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,132] DEBUG [Producer clientId=producer-1] Sending
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
> clientId=producer-1, correlationId=21, headerVersion=2) and timeout 30000 to
> node 3:
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16383]}
> (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned
> producerId 0 and producerEpoch 0 to batch with base sequence 1458 being sent
> to partition input-topic-0
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
> clientId=producer-1, correlationId=22, headerVersion=2) and timeout 30000 to
> node 3:
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
> (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned
> producerId 0 and producerEpoch 0 to batch with base sequence 2823 being sent
> to partition input-topic-0
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
> clientId=producer-1, correlationId=23, headerVersion=2) and timeout 30000 to
> node 3:
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
> (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned
> producerId 0 and producerEpoch 0 to batch with base sequence 4188 being sent
> to partition input-topic-0
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
> clientId=producer-1, correlationId=24, headerVersion=2) and timeout 30000 to
> node 3:
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
> (org.apache.kafka.clients.NetworkClient)
> [2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Assigned
> producerId 0 and producerEpoch 0 to batch with base sequence 5553 being sent
> to partition input-topic-0
> (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending
> PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
> clientId=producer-1, correlationId=25, headerVersion=2) and timeout 30000 to
> node 3:
> {acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
> (org.apache.kafka.clients.NetworkClient){code}
>
> The cluster and the target broker are experiencing a connectivity issue
> affecting metadata propagation. For some time the target broker doesn't
> believe it's the leader of the partition, so it rejects the first requests:
>
> {code:java}
> [2025-11-11 11:41:20,456] DEBUG [KafkaApi-3] Produce request with correlation
> id 21 from client producer-1 on partition
> 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 failed due to
> org.apache.kafka.common.errors.NotLeaderOrFollowerException
> (kafka.server.KafkaApis)
> [2025-11-11 11:41:24,876] DEBUG [KafkaApi-3] Produce request with correlation
> id 22 from client producer-1 on partition
> 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 failed due to
> org.apache.kafka.common.errors.NotLeaderOrFollowerException
> (kafka.server.KafkaApis)
> {code}
> Then it learns that it's the leader of the partition, but doesn't yet know
> about the other replica:
>
> {code:java}
> [2025-11-11 11:41:45,498] ERROR [ReplicaManager broker=3] Error processing
> append operation on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the
> current ISR : 1 is insufficient to satisfy the min.isr requirement of 2 for
> partition input-topic-0, live replica(s) broker.id are : Set(3)
> [2025-11-11 11:41:45,498] DEBUG [KafkaApi-3] Produce request with correlation
> id 23 from client producer-1 on partition
> 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 failed due to
> org.apache.kafka.common.errors.NotEnoughReplicasException
> (kafka.server.KafkaApis){code}
> Finally, the request with correlation ID 24 (base sequence 4188) succeeds.
> So the first batch written to the log is logically out of sequence, but accepted.
>
> The previously failed batches may be retried later with a new producer epoch
> (as happened in my case), but that doesn't help: the intended order of
> records is still violated.
>
> I'm able to reproduce this live quite reliably under
> [Antithesis|https://antithesis.com/]. It probably won't be easy to catch this
> with good frequency in a local setup, but I boiled it down to the following
> unit test (in ReplicaManagerTest):
> {noformat}
> @Test
> def testX(): Unit = {
>   val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
>   val node = new Node(0, "host0", 0)
>   mockGetAliveBrokerFunctions(metadataCache, Seq(node))
>   val rm = new ReplicaManager(
>     metrics = metrics,
>     config = config,
>     time = time,
>     scheduler = new MockScheduler(time),
>     logManager = mockLogMgr,
>     quotaManagers = quotaManager,
>     metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
>     logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
>     alterPartitionManager = alterPartitionManager,
>     threadNamePrefix = Option(this.getClass.getName))
>   try {
>     val tp = new TopicPartition(topic, 0)
>     val partition = rm.createPartition(tp)
>     partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
>       new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
>     rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
>       Seq(new LeaderAndIsrRequest.PartitionState()
>         .setTopicName(topic)
>         .setPartitionIndex(0)
>         .setControllerEpoch(0)
>         .setLeader(config.brokerId())
>         .setLeaderEpoch(0)
>         .setIsr(Seq[Integer](config.brokerId()).asJava)
>         .setPartitionEpoch(0)
>         .setReplicas(Seq[Integer](config.brokerId()).asJava)
>         .setIsNew(false)).asJava,
>       Collections.singletonMap(topic, topicId),
>       Set(node).asJava).build(), (_, _) => ())
>     rm.getPartitionOrException(tp).localLogOrException
>     val topicIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, partition.topicPartition)
>     val producerId = 10L
>     val producerEpoch: Short = 10
>     val r0 = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, 0,
>       new SimpleRecord("record 0".getBytes()))
>     val r1 = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, 1,
>       new SimpleRecord("record 1".getBytes()))
>     // val order = List(r0, r1) // normal order -- no ordering violation
>     val order = List(r1, r0) // r1 will be accepted out of order; r0 will be retried later with newer epoch and accepted
>     def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]): Unit = {
>       println(responseStatus(topicIdPartition).error)
>     }
>     for (r <- order) {
>       rm.appendRecords(
>         timeout = 1000,
>         requiredAcks = -1,
>         internalTopicsAllowed = false,
>         origin = AppendOrigin.CLIENT,
>         entriesPerPartition = Map(topicIdPartition -> r),
>         responseCallback = callback,
>       )
>     }
>   } finally {
>     rm.shutdown(checkpointHW = false)
>   }
> }
> {noformat}
>
> As far as I understand, the partition doesn't need to be empty. It's enough
> for the producer state to be empty and for the first requests in the producer
> epoch to fail as above.
> If the first request in the producer epoch succeeds, the situation cannot
> arise. For example, we send r1, r2, r3. r1 succeeds, r2 fails with
> NotLeaderOrFollowerException, and r3 won't be accepted: it fails with
> OutOfOrderSequenceException.
> I attribute this difference to [this validation on the broker
> side|https://github.com/apache/kafka/blob/3479ce793bafc6a1c42e6afa77e3fbfc3a36c80c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java#L144-L148].
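For illustration, the acceptance behavior described above can be sketched as follows. This is a minimal, hypothetical model (the class and method names are not the actual ProducerAppendInfo code), assuming the broker tracks only the last appended sequence per producer epoch:

```java
import java.util.OptionalInt;

public class SequenceCheckSketch {
    // Returns true if a batch with the given base sequence would be accepted.
    // lastSeq is empty when the broker has no state for this producer epoch.
    static boolean inSequence(OptionalInt lastSeq, int incomingSeq) {
        if (!lastSeq.isPresent()) {
            // No prior state for this producer epoch: any first sequence
            // passes the check -- the gap this issue describes.
            return true;
        }
        // With state present, the batch must continue the sequence
        // (integer wraparound omitted for brevity).
        return incomingSeq == lastSeq.getAsInt() + 1;
    }

    public static void main(String[] args) {
        // Fresh producer state: base sequence 1458 is accepted even though
        // sequence 0 was never written (the reported out-of-order append).
        System.out.println(inSequence(OptionalInt.empty(), 1458)); // true
        // Once state exists, a gap is rejected (OutOfOrderSequenceException).
        System.out.println(inSequence(OptionalInt.of(0), 2));      // false
        System.out.println(inSequence(OptionalInt.of(0), 1));      // true
    }
}
```

This matches the r1/r2/r3 example above: once r1 establishes state, the gap left by the failed r2 makes r3 fail; but with no state at all, any first batch slips through.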
>
> AFAIU, this is a bug, at least according to how I read the documentation: I
> would expect enable.idempotence to prevent this from happening. Am I right, or
> is this acceptable and we should improve the documentation instead?
> Can we require on the broker side that the first batch in a producer epoch
> always starts with the sequence 0?
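The proposed stricter rule could be sketched like this (again a hypothetical model, not real broker code): when no producer state exists for the epoch, accept only base sequence 0.

```java
import java.util.OptionalInt;

public class StrictSequenceCheckSketch {
    // Proposed variant: the first batch in a producer epoch must start at 0.
    static boolean inSequence(OptionalInt lastSeq, int incomingSeq) {
        if (!lastSeq.isPresent()) {
            // No state for this producer epoch: reject anything but
            // sequence 0, so a late-arriving first batch can't jump ahead.
            return incomingSeq == 0;
        }
        return incomingSeq == lastSeq.getAsInt() + 1;
    }

    public static void main(String[] args) {
        // Under this rule, the batch with base sequence 1458 from the logs
        // above would be rejected until sequence 0 has been appended.
        System.out.println(inSequence(OptionalInt.empty(), 1458)); // false
        System.out.println(inSequence(OptionalInt.empty(), 0));    // true
    }
}
```

A caveat worth noting: producer state can also be lost legitimately (e.g. state expiry or retention truncation for a long-lived producer), in which case a non-zero first sequence is valid, so a real change would have to distinguish those cases.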
--
This message was sent by Atlassian Jira
(v8.20.10#820010)