mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r452140714
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion - val mergedResponseMap = if (version == 0) + val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) - sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) + sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] - val partitionTimestamps = offsetRequest.partitionTimestamps.asScala - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { - val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { - // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages - // are typically transient and there is no value in logging the entire stack trace for the same - case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( - correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } - } - responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { - val correlationId = request.header.correlationId - val clientId = request.header.clientId - val offsetRequest = request.body[ListOffsetRequest] - - val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - - val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty()) - } - - val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { - debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + - s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - - def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( - e, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } - + val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name) + + val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic => + new ListOffsetTopicResponse() + .setName(topic.name) + .setPartitions(topic.partitions.asScala.map(partition => + new ListOffsetPartitionResponse() Review comment: It looks weird but it's 2 to the right which should be "correct" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org