AndrewJSchofield commented on code in PR #17660:
URL: https://github.com/apache/kafka/pull/17660#discussion_r1826110811


##########
share/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -679,9 +679,9 @@ protected void handleRequestResponse(ClientResponse 
response) {
                             // check retryable errors
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
-                                log.warn("Received retryable error in read 
state RPC: {}", error.message());
+                                log.warn("Received retryable error in read 
state RPC for key {}: {}", partitionKey(), error.message());

Review Comment:
   I know it's not quite a real word, but Kafka uses "retriable" not 
"retryable" such as in `RetriableException`. Please could you change the 
spelling here in the comments and log messages too to make searching easier?



##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java:
##########
@@ -637,4 +640,35 @@ public void testTopicPartitionFor() {
         assertEquals(Topic.SHARE_GROUP_STATE_TOPIC_NAME, tp.topic());
         assertEquals(expectedPartition, tp.partition());
     }
+
+    @Test
+    public void testPartitionFor() {
+        CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        ShareCoordinatorService service = new ShareCoordinatorService(
+                new LogContext(),
+                
ShareCoordinatorConfigTest.createConfig(ShareCoordinatorConfigTest.testConfigMap()),
+                runtime,
+                new ShareCoordinatorMetrics(),
+                Time.SYSTEM
+        );
+
+        String groupId = "group1";
+        Uuid topicId = Uuid.randomUuid();
+        int partition = 0;
+
+        // inactive shard should throw exception
+        assertThrows(CoordinatorNotAvailableException.class, () -> 
service.partitionFor(SharePartitionKey.getInstance(groupId, topicId, 
partition)));
+
+        final int numPartitions = 50;
+        service.startup(() -> numPartitions);
+
+        final SharePartitionKey key1 = SharePartitionKey.getInstance(groupId, 
new TopicIdPartition(topicId, partition, null));
+        int sharePartitionKey = service.partitionFor(key1);
+        assertEquals(Utils.abs(key1.asCoordinatorKey().hashCode()) % 
numPartitions, sharePartitionKey);
+
+        // The presence of a topic name should not affect the choice of 
partition
+        final SharePartitionKey key2 = new SharePartitionKey(groupId, new 
TopicIdPartition(topicId, partition, "whatever"));
+        sharePartitionKey = service.partitionFor(key2);
+        assertEquals(Utils.abs(key2.asCoordinatorKey().hashCode()) % 
numPartitions, sharePartitionKey);
+    }

Review Comment:
   Isn't it also true that `assertEquals(key1.asCoordinatorKey(), 
key2.asCoordinatorKey())`? That seems like a sensible assertion too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to