guozhangwang commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r561449314



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2041,6 +2052,118 @@ public void testInvalidGroupMetadata() throws 
InterruptedException {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
+    @Test
+    public void testPollMetadata() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, 
groupInstanceId);
+
+        consumer.assign(singleton(tp0));
+        consumer.seek(tp0, 50L);
+
+        final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
+        client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+        final ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        // verify that the consumer computes the correct metadata based on the 
fetch response
+        final ConsumerRecords.Metadata actualMetadata = 
records.metadata().get(tp0);
+        assertEquals(100L, (long) actualMetadata.endOffset());
+        assertEquals(55L, (long) actualMetadata.position());
+        assertEquals(45L, (long) actualMetadata.lag());
+        consumer.close(Duration.ZERO);
+    }
+
+
+    @Test
+    public void testPollMetadataWithExtraPartitions() {

Review comment:
       Does the test cover the 1) stale epoch and 2) no previous value cases?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +636,32 @@ private ListOffsetResult 
fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = 
fetchRecords(nextInLineFetch, recordsRemaining);
 
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = 
fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we 
only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare 
cases (such as partition leader changes).
-                            // we have to copy to a new list because the old 
one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {
+                        // initializeCompletedFetch, above, has already 
persisted the metadata from the fetch in the
+                        // SubscriptionState, so we can just read it out, 
which in particular lets us re-use the logic
+                        // for determining the end offset
+                        final long receivedTimestamp = 
nextInLineFetch.receivedTimestamp;
+                        final Long beginningOffset = 
subscriptions.logStartOffset(partition);
+                        final Long endOffset = 
subscriptions.logEndOffset(partition, isolationLevel);
+                        final FetchPosition fetchPosition = 
subscriptions.position(partition);
+
+                        final FetchedRecords.FetchMetadata fetchMetadata = 
fetched.metadata().get(partition);
+                        if (fetchMetadata == null
+                            || 
!fetchMetadata.position().offsetEpoch.isPresent()
+                            || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= 
fetchPosition.offsetEpoch.get()) {

Review comment:
       Interesting, why do we not want to update the metadata if the epoch is stale?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +636,32 @@ private ListOffsetResult 
fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = 
fetchRecords(nextInLineFetch, recordsRemaining);
 
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = 
fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we 
only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare 
cases (such as partition leader changes).
-                            // we have to copy to a new list because the old 
one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {
+                        // initializeCompletedFetch, above, has already 
persisted the metadata from the fetch in the
+                        // SubscriptionState, so we can just read it out, 
which in particular lets us re-use the logic
+                        // for determining the end offset
+                        final long receivedTimestamp = 
nextInLineFetch.receivedTimestamp;
+                        final Long beginningOffset = 
subscriptions.logStartOffset(partition);
+                        final Long endOffset = 
subscriptions.logEndOffset(partition, isolationLevel);
+                        final FetchPosition fetchPosition = 
subscriptions.position(partition);
+
+                        final FetchedRecords.FetchMetadata fetchMetadata = 
fetched.metadata().get(partition);
+                        if (fetchMetadata == null
+                            || 
!fetchMetadata.position().offsetEpoch.isPresent()
+                            || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= 
fetchPosition.offsetEpoch.get()) {

Review comment:
       Also, would `<=` accept `null` from 
`fetchMetadata.position().offsetEpoch.get()` on the left-hand side?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +636,32 @@ private ListOffsetResult 
fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 } else {
                     List<ConsumerRecord<K, V>> records = 
fetchRecords(nextInLineFetch, recordsRemaining);
 
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = 
fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we 
only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare 
cases (such as partition leader changes).
-                            // we have to copy to a new list because the old 
one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {

Review comment:
       Could this condition ever fail outside of mocking tests?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -218,7 +218,21 @@ public synchronized void unsubscribe() {
         }
 
         toClear.forEach(p -> this.records.remove(p));
-        return new ConsumerRecords<>(results);
+
+        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new 
HashMap<>();
+        for (final TopicPartition partition : 
subscriptions.assignedPartitions()) {
+            if (subscriptions.hasValidPosition(partition) && 
beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {

Review comment:
       Why also check `beginningOffsets` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to