dajac commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r454845616
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3888,25 +3892,40 @@ void handleResponse(AbstractResponse abstractResponse) { ListOffsetResponse response = (ListOffsetResponse) abstractResponse; Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>(); - for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) { - TopicPartition tp = result.getKey(); - PartitionData partitionData = result.getValue(); - - KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); - Errors error = partitionData.error; - OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); - if (offsetRequestSpec == null) { - future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!")); - } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { - retryTopicPartitionOffsets.put(tp, offsetRequestSpec); - } else if (error == Errors.NONE) { - future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch)); - } else { - future.completeExceptionally(error.exception()); + for (ListOffsetTopicResponse topic : response.topics()) { + for (ListOffsetPartitionResponse partition : topic.partitions()) { + TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); + KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp); + Errors error = Errors.forCode(partition.errorCode()); + OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp); + if (offsetRequestSpec == null) { + log.warn("Server response mentioned unknown topic partition {}", tp); + } else if (MetadataOperationContext.shouldRefreshMetadata(error)) { + retryTopicPartitionOffsets.put(tp, offsetRequestSpec); + } else if (error == Errors.NONE) { + Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH) + ? 
Optional.empty() + : Optional.of(partition.leaderEpoch()); + future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch)); + } else { + future.completeExceptionally(error.exception()); + } } } - if (!retryTopicPartitionOffsets.isEmpty()) { + if (retryTopicPartitionOffsets.isEmpty()) { + // The server should send back a response for every topic partition. But do a sanity check anyway. + Set<TopicPartition> tpsOnBroker = new HashSet<>(); + for (ListOffsetTopic topic : partitionsToQuery) { + for (ListOffsetPartition partition : topic.partitions()) { + tpsOnBroker.add(new TopicPartition(topic.name(), partition.partitionIndex())); + } + } + completeUnrealizedFutures( + futures.entrySet().stream().filter(entry -> tpsOnBroker.contains(entry.getKey())), + tp -> "The response from broker " + brokerId + + " did not contain a result for topic partition " + tp); Review comment: As we don't have the list of TopicPartition available to filter the list of futures, we could actually directly complete the future within the loop instead of populating the HashSet. It avoids building the HashSet and having to traverse the futures. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3870,12 +3873,13 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit } } - for (final Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry: leaders.entrySet()) { + for (final Map.Entry<Node, Map<String, ListOffsetTopic>> entry: leaders.entrySet()) { Review comment: nit: We usually put a space before and after `:`. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> f * value of each partition may be null only for v0. 
In v1 and later the ListOffset API would not * return a null timestamp (-1 is returned instead when necessary). */ - private void handleListOffsetResponse(Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch, ListOffsetResponse listOffsetResponse, RequestFuture<ListOffsetResult> future) { Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>(); Set<TopicPartition> partitionsToRetry = new HashSet<>(); Set<String> unauthorizedTopics = new HashSet<>(); - for (Map.Entry<TopicPartition, ListOffsetRequest.PartitionData> entry : timestampsToSearch.entrySet()) { + Map<TopicPartition, ListOffsetPartitionResponse> partitionsData = byTopicPartitions(listOffsetResponse.responseData()); Review comment: I agree that we should at minimum avoid hitting an NPE. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -965,11 +994,11 @@ public void onFailure(RuntimeException e) { * @return A response which can be polled to obtain the corresponding timestamps and offsets. */ private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node, - final Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch, + final Map<TopicPartition, ListOffsetPartition> timestampsToSearch, boolean requireTimestamp) { ListOffsetRequest.Builder builder = ListOffsetRequest.Builder .forConsumer(requireTimestamp, isolationLevel) - .setTargetTimes(timestampsToSearch); + .setTargetTimes(toListOffsetTopics(timestampsToSearch)); Review comment: I had a look at this and you are right. It seems that keeping `TopicPartition` is better and difficult to change. In this case, have you considered pushing the conversion to the `Builder` by providing an overload of `setTargetTimes` which accepts a `Map<TopicPartition, ListOffsetPartition>`? That could make the code in the `Fetcher` a bit cleaner. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org