showuon commented on a change in pull request #11979: URL: https://github.com/apache/kafka/pull/11979#discussion_r840353943
##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -378,12 +378,12 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
     // producer id. We will not attempt to reorder messages if the producer id has changed, we will throw an
     // IllegalStateException instead.
     private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
-        // When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
+        // When we are re-enqueueing and have enabled idempotence, the re-enqueued batch must always have a sequence.
         if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
             throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " +
                 "though idempotency is enabled.");

-        if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
+        if (!transactionManager.hasInflightBatches(batch.topicPartition))

Review comment:
Nice cleanup. This way we don't have to return the first element in the queue.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -790,7 +781,6 @@ private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
                 throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
                         + " for partition " + batch.topicPartition + " is going to become negative: " + newSequence);

-            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", inFlightBatch.baseSequence(), batch.topicPartition, newSequence);

Review comment:
Why should we remove this log?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -113,16 +113,7 @@ private TopicPartitionEntry getPartition(TopicPartition topicPartition) {
     }

     private TopicPartitionEntry getOrCreatePartition(TopicPartition topicPartition) {
-        TopicPartitionEntry ent = topicPartitions.get(topicPartition);
-        if (ent == null) {
-            ent = new TopicPartitionEntry();
-            topicPartitions.put(topicPartition, ent);
-        }
-        return ent;
-    }
-
-    private void addPartition(TopicPartition topicPartition) {
-        this.topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry());
+        return topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry());

Review comment:
I don't think this refactor is correct. `putIfAbsent` returns the **previous** value associated with the key, or `null` if the key had no mapping. That is, if the partition has no entry yet, `getOrCreatePartition` will insert a new entry but return `null`, which is not what we want.
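
To illustrate the `putIfAbsent` concern in the review comment above, here is a minimal, self-contained sketch. It is not the Kafka code: `PutIfAbsentDemo`, `Entry`, and the `String` keys are hypothetical stand-ins for `TransactionManager`, `TopicPartitionEntry`, and `TopicPartition`. It shows that a get-or-create helper built on `Map.putIfAbsent` returns `null` on the first call, and that `Map.computeIfAbsent` is one possible alternative that returns the current value instead.

```java
import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentDemo {
    // Hypothetical stand-in for TopicPartitionEntry (assumption, not the Kafka class).
    static final class Entry { }

    // Mirrors the refactor under review: putIfAbsent returns the PREVIOUS mapping,
    // so the very first call for a key returns null even though an entry is stored.
    static Entry getOrCreateWithPutIfAbsent(Map<String, Entry> map, String key) {
        return map.putIfAbsent(key, new Entry());
    }

    // One possible alternative: computeIfAbsent returns the value now associated
    // with the key, whether it already existed or was just created.
    static Entry getOrCreateWithComputeIfAbsent(Map<String, Entry> map, String key) {
        return map.computeIfAbsent(key, k -> new Entry());
    }

    public static void main(String[] args) {
        Map<String, Entry> viaPut = new HashMap<>();
        Entry first = getOrCreateWithPutIfAbsent(viaPut, "topic-0");
        System.out.println(first == null);            // prints "true"
        System.out.println(viaPut.containsKey("topic-0")); // prints "true"

        Map<String, Entry> viaCompute = new HashMap<>();
        Entry created = getOrCreateWithComputeIfAbsent(viaCompute, "topic-0");
        Entry again = getOrCreateWithComputeIfAbsent(viaCompute, "topic-0");
        System.out.println(created != null);          // prints "true"
        System.out.println(created == again);         // prints "true"
    }
}
```

Whether `computeIfAbsent` or the original explicit `get`/`put` sequence is preferable here is a judgment call for the PR author; the sketch only demonstrates the difference in return values.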