lianetm commented on code in PR #17755:
URL: https://github.com/apache/kafka/pull/17755#discussion_r1837153202


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##########
@@ -189,169 +143,188 @@ public static void assertRecordEquals(
         }
     }
 
+    private static void assertConsumerGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ConsumerGroupHeartbeatResponseData expected = 
(ConsumerGroupHeartbeatResponseData) exp.duplicate();
+        ConsumerGroupHeartbeatResponseData actual = 
(ConsumerGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ConsumerGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ConsumerGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertShareGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ShareGroupHeartbeatResponseData expected = 
(ShareGroupHeartbeatResponseData) exp.duplicate();
+        ShareGroupHeartbeatResponseData actual = 
(ShareGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ShareGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ShareGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
     private static void assertApiMessageAndVersionEquals(
         ApiMessageAndVersion expected,
         ApiMessageAndVersion actual
     ) {
         if (expected == actual) return;
-
         assertEquals(expected.version(), actual.version());

Review Comment:
   Couldn't a null record value land here and cause an NPE instead of a 
failed assertion (i.e., when only one of actual or expected is null, not both)? 



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##########
@@ -189,169 +143,188 @@ public static void assertRecordEquals(
         }
     }
 
+    private static void assertConsumerGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ConsumerGroupHeartbeatResponseData expected = 
(ConsumerGroupHeartbeatResponseData) exp.duplicate();
+        ConsumerGroupHeartbeatResponseData actual = 
(ConsumerGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ConsumerGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ConsumerGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertShareGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ShareGroupHeartbeatResponseData expected = 
(ShareGroupHeartbeatResponseData) exp.duplicate();
+        ShareGroupHeartbeatResponseData actual = 
(ShareGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ShareGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ShareGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
     private static void assertApiMessageAndVersionEquals(
         ApiMessageAndVersion expected,
         ApiMessageAndVersion actual
     ) {
         if (expected == actual) return;
-
         assertEquals(expected.version(), actual.version());
+        BiConsumer<ApiMessage, ApiMessage> asserter = API_MESSAGE_COMPARATORS
+            .getOrDefault(expected.message().getClass(), DEFAULT_COMPARATOR);
+        asserter.accept(expected.message(), actual.message());
+    }
 
-        if (actual.message() instanceof 
ConsumerGroupCurrentMemberAssignmentValue) {
-            // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
-            // always guaranteed. Therefore, we need a special comparator.
-            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
-            ConsumerGroupCurrentMemberAssignmentValue actualValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
-
-            assertEquals(expectedValue.memberEpoch(), 
actualValue.memberEpoch());
-            assertEquals(expectedValue.previousMemberEpoch(), 
actualValue.previousMemberEpoch());
-
-            // We transform those to Maps before comparing them.
-            
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
-                fromTopicPartitions(actualValue.assignedPartitions()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
-                
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
-        } else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
-            // The order of the racks stored in the PartitionMetadata of the 
ConsumerGroupPartitionMetadataValue
-            // is not always guaranteed. Therefore, we need a special 
comparator.
-            ConsumerGroupPartitionMetadataValue expectedValue =
-                (ConsumerGroupPartitionMetadataValue) 
expected.message().duplicate();
-            ConsumerGroupPartitionMetadataValue actualValue =
-                (ConsumerGroupPartitionMetadataValue) 
actual.message().duplicate();
-
-            List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
-                expectedValue.topics();
-            List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
-                actualValue.topics();
-
-            if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
-                fail("Topic metadata lists have different sizes");
-            }
+    private static void assertConsumerGroupCurrentMemberAssignmentValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
+        // always guaranteed. Therefore, we need a special comparator.
+        ConsumerGroupCurrentMemberAssignmentValue expected = 
(ConsumerGroupCurrentMemberAssignmentValue) exp.duplicate();
+        ConsumerGroupCurrentMemberAssignmentValue actual = 
(ConsumerGroupCurrentMemberAssignmentValue) act.duplicate();
+
+        Consumer<ConsumerGroupCurrentMemberAssignmentValue> normalize = 
message -> {
+            
message.assignedPartitions().sort(Comparator.comparing(ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId));
+            message.assignedPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));

Review Comment:
   What about moving this into a helper with a clear name, so that it can be 
reused for both assignedPartitions and partitionsPendingRevocation?
   ```
   sortByTopicIdAndPartition(message.assignedPartitions())
   sortByTopicIdAndPartition(message.partitionsPendingRevocation())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to