chia7712 commented on code in PR #22264:
URL: https://github.com/apache/kafka/pull/22264#discussion_r3283232737


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:
##########
@@ -356,4 +359,86 @@ private ByteBuffer generateFutureSubscriptionVersionData() 
{
 
         return buffer;
     }
+
+    @Test
+    public void 
deserializeSubscriptionThrowsSchemaExceptionForEveryTruncation() {
+        Subscription subscription = new Subscription(
+            Arrays.asList("foo", "bar"),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}),
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            DEFAULT_GENERATION,
+            Optional.of("rack"));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeSubscription(subscription,
+            ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(truncated)),
+                "Expected SchemaException for subscription truncated to length 
" + len);
+        }
+        assertDoesNotThrow(() -> 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    @Test
+    public void deserializeAssignmentThrowsSchemaExceptionForEveryTruncation() 
{
+        Assignment assignment = new Assignment(
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeAssignment(assignment,
+            ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(truncated)),
+                "Expected SchemaException for assignment truncated to length " 
+ len);
+        }
+        assertDoesNotThrow(() -> 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    @Test
+    public void 
deserializeConsumerProtocolSubscriptionThrowsSchemaExceptionForEveryTruncation()
 {
+        Subscription subscription = new Subscription(
+            Arrays.asList("foo", "bar"),
+            ByteBuffer.wrap(new byte[]{0x01, 0x02}),
+            Arrays.asList(new TopicPartition("foo", 0), new 
TopicPartition("bar", 1)),
+            DEFAULT_GENERATION,
+            Optional.of("rack"));
+        ByteBuffer serialized = 
ConsumerProtocol.serializeSubscription(subscription,
+            ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION);
+        byte[] serializedBytes = new byte[serialized.remaining()];
+        serialized.duplicate().get(serializedBytes);
+
+        for (int len = 0; len < serializedBytes.length; len++) {
+            byte[] truncated = Arrays.copyOf(serializedBytes, len);
+            assertThrows(SchemaException.class,
+                () -> 
ConsumerProtocol.deserializeConsumerProtocolSubscription(ByteBuffer.wrap(truncated)),

Review Comment:
   Out of curiosity, in the `classicGroupJoinToConsumerGroup` path, 
`SchemaException` is converted to `IllegalStateException`, which is pretty
inconsistent with other cases that throw `GroupIdNotFoundException`. Could you 
share more details or the rationale behind this design?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to