guozhangwang commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r562288355



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws InterruptedException {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
+    @Test
+    public void testPollMetadata() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.assign(singleton(tp0));
+        consumer.seek(tp0, 50L);
+
+        final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
+        client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        // verify that the consumer computes the correct metadata based on the fetch response
+        final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0);
+        assertEquals(100L, (long) actualMetadata.endOffset());
+        assertEquals(55L, (long) actualMetadata.position());
+        assertEquals(45L, (long) actualMetadata.lag());
+        consumer.close(Duration.ZERO);
+    }
+
+
+    @Test
+    public void testPollMetadataWithExtraPartitions() {

Review comment:
       Cool, thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to