mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r469964306
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -3546,12 +3594,54 @@ private void testGetOffsetsForTimesWithUnknownOffset() { MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); client.updateMetadata(initialMetadataUpdate); - Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); - partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE, - ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(Collections.singletonList(new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Collections.singletonList(new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP) + .setOffset(ListOffsetResponse.UNKNOWN_OFFSET))))); + + client.prepareResponseFrom(new ListOffsetResponse(data), + metadata.fetch().leaderFor(tp0)); + + Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, 0L); + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = + fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE)); - client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), + assertTrue(offsetAndTimestampMap.containsKey(tp0)); + assertNull(offsetAndTimestampMap.get(tp0)); + } + + @Test + public void testGetOffsetsForTimesWithUnknownOffsetV0() { Review comment: I think so. I used the following test on trunk and it passes: ``` @Test public void testGetOffsetsForTimesWithUnknownOffsetV0() { buildFetcher(); client.reset(); // Ensure metadata has both partition. MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1)); client.updateMetadata(initialMetadataUpdate); Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE, Collections.emptyList())); client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), metadata.fetch().leaderFor(tp0)); Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); timestampToSearch.put(tp0, 0L); Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.offsetsForTimes(timestampToSearch, time.timer(Long.MAX_VALUE)); assertTrue(offsetAndTimestampMap.containsKey(tp0)); assertNull(offsetAndTimestampMap.get(tp0)); } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org