showuon commented on a change in pull request #11979:
URL: https://github.com/apache/kafka/pull/11979#discussion_r840353943



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##########
@@ -378,12 +378,12 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
     // producer id. We will not attempt to reorder messages if the producer id has changed, we will throw an
     // IllegalStateException instead.
     private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
-        // When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence.
+        // When we are re-enqueueing and have enabled idempotence, the re-enqueued batch must always have a sequence.
         if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
             throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " +
                 "though idempotency is enabled.");
 
-        if (transactionManager.nextBatchBySequence(batch.topicPartition) == null)
+        if (!transactionManager.hasInflightBatches(batch.topicPartition))

Review comment:
       Nice cleanup. This way we don't have to return the first element in
the queue.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -790,7 +781,6 @@ private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
                 throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
                         + " for partition " + batch.topicPartition + " is going to become negative: " + newSequence);
 
-            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", inFlightBatch.baseSequence(), batch.topicPartition, newSequence);

Review comment:
       Why should we remove this log?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -113,16 +113,7 @@ private TopicPartitionEntry getPartition(TopicPartition topicPartition) {
         }
 
         private TopicPartitionEntry getOrCreatePartition(TopicPartition topicPartition) {
-            TopicPartitionEntry ent = topicPartitions.get(topicPartition);
-            if (ent == null) {
-                ent = new TopicPartitionEntry();
-                topicPartitions.put(topicPartition, ent);
-            }
-            return ent;
-        }
-
-        private void addPartition(TopicPartition topicPartition) {
-            this.topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry());
+            return topicPartitions.putIfAbsent(topicPartition, new TopicPartitionEntry());

Review comment:
       I don't think this refactor is correct. `putIfAbsent` returns the
**previous** value associated with the key, not the newly inserted one. That
is, if there is no entry for the partition yet, `getOrCreatePartition` will
insert one but return `null`, which is not what we want.
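
       To illustrate the concern, here is a minimal standalone sketch (not the
actual Kafka code). It assumes a plain `HashMap` with `String` keys, and the
class `PutIfAbsentDemo`, its nested `Entry` type, and both helper methods are
hypothetical stand-ins for `TopicPartitionEntry` and `getOrCreatePartition`.
`computeIfAbsent` is shown as one possible alternative, not necessarily the fix
the PR should adopt.

```java
import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentDemo {
    // Hypothetical, simplified stand-in for TopicPartitionEntry.
    static final class Entry {}

    // Mirrors the refactor under review: putIfAbsent returns the value that was
    // mapped BEFORE the call, so this returns null when the key was absent.
    static Entry getOrCreateWithPutIfAbsent(Map<String, Entry> map, String key) {
        return map.putIfAbsent(key, new Entry());
    }

    // One possible alternative: computeIfAbsent returns the existing value,
    // or the newly created one if the key was absent.
    static Entry getOrCreateWithComputeIfAbsent(Map<String, Entry> map, String key) {
        return map.computeIfAbsent(key, k -> new Entry());
    }

    public static void main(String[] args) {
        Map<String, Entry> map = new HashMap<>();

        // First call for a new key: the entry is inserted, but null comes back.
        Entry first = getOrCreateWithPutIfAbsent(map, "topic-0");
        System.out.println("putIfAbsent, key absent, returned null: " + (first == null));

        // Second call for the same key: the existing entry is returned.
        Entry second = getOrCreateWithPutIfAbsent(map, "topic-0");
        System.out.println("putIfAbsent, key present, returned null: " + (second == null));

        // computeIfAbsent never returns null here, even for a new key.
        Entry third = getOrCreateWithComputeIfAbsent(map, "topic-1");
        System.out.println("computeIfAbsent, key absent, returned null: " + (third == null));
    }
}
```

       With the `putIfAbsent` variant, the first caller for a given partition
would get `null` back and likely hit a `NullPointerException` on the next
dereference, whereas the original get-then-put code always returned a usable
entry.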




