jim0987795064 commented on code in PR #20170:
URL: https://github.com/apache/kafka/pull/20170#discussion_r2250078157

##########
clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java:
##########
@@ -96,4 +96,24 @@ public void testRoundRobinWithKeyBytes() {
         assertEquals(10, partitionCount.get(1).intValue());
         assertEquals(10, partitionCount.get(2).intValue());
     }
+
+    @Test
+    public void testRoundRobinWithAbortForNewBatch() throws Exception {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0]), Collections.emptyList(),
+            Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        Partitioner partitioner = new RoundRobinPartitioner();
+
+        //abort for new batch - previous partition should be returned on subsequent call
+        //simulate three threads producing to two topics, with race condition in producer
+        partitioner.onNewBatch(topicA, testCluster, 7);
+        partitioner.onNewBatch(topicA, testCluster, 8);
+        partitioner.onNewBatch(topicB, testCluster, 1);
+        assertEquals(7, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(8, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster));

Review Comment:
   Hello @ash-at-github,
   Thanks for this suggestion. I've added a test case for the empty queue scenario. Let me know if you have more questions.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org