mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r490941989
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -2496,46 +2501,81 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() { client.updateMetadata(initialUpdateResponse); final long fetchTimestamp = 10L; - Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>(); - allPartitionData.put(tp0, new ListOffsetResponse.PartitionData( - Errors.NONE, fetchTimestamp, 4L, Optional.empty())); - allPartitionData.put(tp1, new ListOffsetResponse.PartitionData( - retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty())); + List<ListOffsetTopicResponse> topics = Collections.singletonList( + new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + new ListOffsetPartitionResponse() + .setPartitionIndex(tp0.partition()) + .setErrorCode(Errors.NONE.code()) + .setTimestamp(fetchTimestamp) + .setOffset(4L), + new ListOffsetPartitionResponse() + .setPartitionIndex(tp1.partition()) + .setErrorCode(retriableError.code()) + .setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP) + .setOffset(-1L)))); + ListOffsetResponseData data = new ListOffsetResponseData() + .setThrottleTimeMs(0) + .setTopics(topics); client.prepareResponseFrom(body -> { boolean isListOffsetRequest = body instanceof ListOffsetRequest; if (isListOffsetRequest) { ListOffsetRequest request = (ListOffsetRequest) body; - Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>(); - expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData( - fetchTimestamp, Optional.empty())); - expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData( - fetchTimestamp, Optional.empty())); - - return request.partitionTimestamps().equals(expectedTopicPartitions); + List<ListOffsetTopic> expectedTopics = Collections.singletonList( + new ListOffsetTopic() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + new ListOffsetPartition() + .setPartitionIndex(tp1.partition()) + .setTimestamp(fetchTimestamp) + .setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH), + new ListOffsetPartition() + .setPartitionIndex(tp0.partition()) + .setTimestamp(fetchTimestamp) + .setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH)))); + return request.topics().equals(expectedTopics); } else { return false; } - }, new ListOffsetResponse(allPartitionData), originalLeader); + }, new ListOffsetResponse(data), originalLeader); client.prepareMetadataUpdate(updatedMetadata); // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception. // We will count the answered future response in the end to verify if this is the case. - Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData); - paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData( - Errors.NOT_LEADER_OR_FOLLOWER, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty())); - client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader); + List<ListOffsetTopicResponse> topicsWithFatalError = Collections.singletonList( + new ListOffsetTopicResponse() + .setName(tp0.topic()) + .setPartitions(Arrays.asList( + new ListOffsetPartitionResponse() Review comment: Done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org