jim0987795064 commented on code in PR #20170:
URL: https://github.com/apache/kafka/pull/20170#discussion_r2250078157


##########
clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java:
##########
@@ -96,4 +96,24 @@ public void testRoundRobinWithKeyBytes() {
         assertEquals(10, partitionCount.get(1).intValue());
         assertEquals(10, partitionCount.get(2).intValue());
     }
+
+    @Test
+    public void testRoundRobinWithAbortForNewBatch() throws Exception {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0]), 
Collections.emptyList(),
+                Collections.<String>emptySet(), 
Collections.<String>emptySet());
+
+        Partitioner partitioner = new RoundRobinPartitioner();
+
+        //abort for new batch - previous partition should be returned on 
subsequent call
+        //simulate three threads producing to two topics, with race condition 
in producer
+        partitioner.onNewBatch(topicA, testCluster, 7);
+        partitioner.onNewBatch(topicA, testCluster, 8);
+        partitioner.onNewBatch(topicB, testCluster, 1);
+        assertEquals(7, partitioner.partition(topicA, null, null, null, null, 
testCluster));
+        assertEquals(8, partitioner.partition(topicA, null, null, null, null, 
testCluster));
+        assertEquals(1, partitioner.partition(topicB, null, null, null, null, 
testCluster));

Review Comment:
   Hello @ash-at-github, 
   Thanks for this suggestion. I've added a test case for the empty queue 
scenario.
   Let me know if you have more questions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to