dajac commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r449603684



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion,
                         int replicaId,
                         IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, 
latestAllowedVersion);
-            this.replicaId = replicaId;
-            this.isolationLevel = isolationLevel;
+            data = new ListOffsetRequestData()
+                    .setIsolationLevel(isolationLevel.id())
+                    .setReplicaId(replicaId);
         }
 
-        public Builder setTargetTimes(Map<TopicPartition, PartitionData> 
partitionTimestamps) {
-            this.partitionTimestamps = partitionTimestamps;
+        public Builder setTargetTimes(List<ListOffsetTopic> topics) {
+            data.setTopics(topics);
             return this;
         }
 
         @Override
         public ListOffsetRequest build(short version) {
-            return new ListOffsetRequest(replicaId, partitionTimestamps, 
isolationLevel, version);
+            return new ListOffsetRequest(version, data);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=ListOffsetRequest")
-               .append(", replicaId=").append(replicaId);
-            if (partitionTimestamps != null) {
-                bld.append(", 
partitionTimestamps=").append(partitionTimestamps);
-            }
-            bld.append(", isolationLevel=").append(isolationLevel);
-            bld.append(")");
-            return bld.toString();
-        }
-    }
-
-    public static final class PartitionData {
-        public final long timestamp;
-        public final int maxNumOffsets; // only supported in v0
-        public final Optional<Integer> currentLeaderEpoch;
-
-        private PartitionData(long timestamp, int maxNumOffsets, 
Optional<Integer> currentLeaderEpoch) {
-            this.timestamp = timestamp;
-            this.maxNumOffsets = maxNumOffsets;
-            this.currentLeaderEpoch = currentLeaderEpoch;
-        }
-
-        // For V0
-        public PartitionData(long timestamp, int maxNumOffsets) {
-            this(timestamp, maxNumOffsets, Optional.empty());
-        }
-
-        public PartitionData(long timestamp, Optional<Integer> 
currentLeaderEpoch) {
-            this(timestamp, 1, currentLeaderEpoch);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof PartitionData)) return false;
-            PartitionData other = (PartitionData) obj;
-            return this.timestamp == other.timestamp &&
-                this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(timestamp, currentLeaderEpoch);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("{timestamp: ").append(timestamp).
-                    append(", maxNumOffsets: ").append(maxNumOffsets).
-                    append(", currentLeaderEpoch: 
").append(currentLeaderEpoch).
-                    append("}");
-            return bld.toString();
+            return data.toString();
         }
     }
 
     /**
      * Private constructor with a specified version.
      */
-    private ListOffsetRequest(int replicaId,
-                              Map<TopicPartition, PartitionData> targetTimes,
-                              IsolationLevel isolationLevel,
-                              short version) {
+    private ListOffsetRequest(short version, ListOffsetRequestData data) {
         super(ApiKeys.LIST_OFFSETS, version);
-        this.replicaId = replicaId;
-        this.isolationLevel = isolationLevel;
-        this.partitionTimestamps = targetTimes;
+        this.data = data;
         this.duplicatePartitions = Collections.emptySet();
     }
 
     public ListOffsetRequest(Struct struct, short version) {
         super(ApiKeys.LIST_OFFSETS, version);
-        Set<TopicPartition> duplicatePartitions = new HashSet<>();
-        replicaId = struct.get(REPLICA_ID);
-        isolationLevel = struct.hasField(ISOLATION_LEVEL) ?
-                IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) :
-                IsolationLevel.READ_UNCOMMITTED;
-        partitionTimestamps = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                long timestamp = partitionResponse.get(TIMESTAMP);
-                TopicPartition tp = new TopicPartition(topic, partition);
-
-                int maxNumOffsets = 
partitionResponse.getOrElse(MAX_NUM_OFFSETS, 1);
-                Optional<Integer> currentLeaderEpoch = 
RequestUtils.getLeaderEpoch(partitionResponse, CURRENT_LEADER_EPOCH);
-                PartitionData partitionData = new PartitionData(timestamp, 
maxNumOffsets, currentLeaderEpoch);
-                if (partitionTimestamps.put(tp, partitionData) != null)
+        data = new ListOffsetRequestData(struct, version);
+        duplicatePartitions = new HashSet<>();
+        Set<TopicPartition> partitions = new HashSet<>();
+        for (ListOffsetTopic topic : data.topics()) {
+            for (ListOffsetPartition partition : topic.partitions()) {
+                TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
+                if (!partitions.add(tp)) {
                     duplicatePartitions.add(tp);
+                }
             }
         }
-        this.duplicatePartitions = duplicatePartitions;
     }
 
     @Override
-    @SuppressWarnings("deprecation")
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {

Review comment:
       It would be great if we could add unit tests for this method and perhaps 
others as well.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3883,21 +3886,24 @@ void handleResponse(AbstractResponse abstractResponse) {
                     ListOffsetResponse response = (ListOffsetResponse) 
abstractResponse;
                     Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets 
= new HashMap<>();
 
-                    for (Entry<TopicPartition, PartitionData> result : 
response.responseData().entrySet()) {
-                        TopicPartition tp = result.getKey();
-                        PartitionData partitionData = result.getValue();
-
-                        KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
-                        Errors error = partitionData.error;
-                        OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
-                        if (offsetRequestSpec == null) {
-                            future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));
-                        } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
-                            retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
-                        } else if (error == Errors.NONE) {
-                            future.complete(new 
ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, 
partitionData.leaderEpoch));
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (ListOffsetTopicResponse topic : 
response.responseData()) {
+                        for (ListOffsetPartitionResponse partition : 
topic.partitions()) {
+                            TopicPartition tp = new 
TopicPartition(topic.name(), partition.partitionIndex());
+                            KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
+                            Errors error = 
Errors.forCode(partition.errorCode());
+                            OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
+                            if (offsetRequestSpec == null) {
+                                future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));
+                            } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
+                                retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
+                            } else if (error == Errors.NONE) {
+                                Optional<Integer> leaderEpoch = 
(partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH) 
+                                        ? Optional.empty() 
+                                        : Optional.of(partition.leaderEpoch());
+                                future.complete(new 
ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
+                            } else {
+                                
future.completeExceptionally(error.exception());
+                            }
                         }
                     }

Review comment:
       I just noticed that we don't ensure that all futures of the current 
broker are completed. It would be great to ensure it by using 
`completeUnrealizedFutures` method if `retryTopicPartitionOffsets` is empty. We 
already do this in `alterReplicaLogDirs()` if you want to see an example.
   

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, 
RequestFuture<ListOffsetResult> f
      *               value of each partition may be null only for v0. In v1 
and later the ListOffset API would not
      *               return a null timestamp (-1 is returned instead when 
necessary).
      */
-    private void handleListOffsetResponse(Map<TopicPartition, 
ListOffsetRequest.PartitionData> timestampsToSearch,
+    private void handleListOffsetResponse(Map<TopicPartition, 
ListOffsetPartition> timestampsToSearch,
                                           ListOffsetResponse 
listOffsetResponse,
                                           RequestFuture<ListOffsetResult> 
future) {
         Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>();
         Set<TopicPartition> partitionsToRetry = new HashSet<>();
         Set<String> unauthorizedTopics = new HashSet<>();
 
-        for (Map.Entry<TopicPartition, ListOffsetRequest.PartitionData> entry 
: timestampsToSearch.entrySet()) {
+        Map<TopicPartition, ListOffsetPartitionResponse> partitionsData = 
byTopicPartitions(listOffsetResponse.responseData());

Review comment:
       I wonder if grouping by `TopicPartition` is really necessary here. We 
iterate over `timestampsToSearch` to get the `ListOffsetPartitionResponse` for 
the current `TopicPartition` but we could also iterate over the response set 
directly and thus avoid grouping. Moreover, we always assume that the result 
set contains the `TopicPartition` that we are interested in so it would not 
change the semantic. Am I missing something? What do you think?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
##########
@@ -58,239 +47,54 @@
 public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
+    public static final int UNKNOWN_EPOCH = 
RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
-    // top level fields
-    private static final Field.ComplexArray TOPICS = new 
Field.ComplexArray("responses",
-            "The listed offsets by topic");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partition_responses",
-            "The listed offsets by partition");
-
-    // partition level fields
-    // This key is only used by ListOffsetResponse v0
-    @Deprecated
-    private static final Field.Array OFFSETS = new Field.Array("offsets", 
INT64, "A list of offsets.");
-    private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp",
-            "The timestamp associated with the returned offset");
-    private static final Field.Int64 OFFSET = new Field.Int64("offset",
-            "The offset found");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            ERROR_CODE,
-            OFFSETS);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(
-            TOPICS_V0);
-
-    // V1 bumped for the removal of the offsets array
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            PARTITION_ID,
-            ERROR_CODE,
-            TIMESTAMP,
-            OFFSET);
-
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-
-    private static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(
-            TOPICS_V1);
-
-    // V2 bumped for the addition of the throttle time
-    private static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V1);
-
-    // V3 bumped to indicate that on quota violation brokers send out 
responses before throttling.
-    private static final Schema LIST_OFFSET_RESPONSE_V3 = 
LIST_OFFSET_RESPONSE_V2;
-
-    // V4 bumped for the addition of the current leader epoch in the request 
schema and the
-    // leader epoch in the response partition data
-    private static final Field PARTITIONS_V4 = PARTITIONS.withFields(
-            PARTITION_ID,
-            ERROR_CODE,
-            TIMESTAMP,
-            OFFSET,
-            LEADER_EPOCH);
+    private final ListOffsetResponseData data;
 
-    private static final Field TOPICS_V4 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V4);
-
-    private static final Schema LIST_OFFSET_RESPONSE_V4 = new Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V4);
-
-    private static final Schema LIST_OFFSET_RESPONSE_V5 = 
LIST_OFFSET_RESPONSE_V4;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, 
LIST_OFFSET_RESPONSE_V2,
-            LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4, 
LIST_OFFSET_RESPONSE_V5};
-    }
-
-    public static final class PartitionData {
-        public final Errors error;
-        // The offsets list is only used in ListOffsetResponse v0.
-        public final List<Long> offsets;
-        public final Long timestamp;
-        public final Long offset;
-        public final Optional<Integer> leaderEpoch;
-
-        /**
-         * Constructor for ListOffsetResponse v0
-         */
-        public PartitionData(Errors error, List<Long> offsets) {
-            this.error = error;
-            this.offsets = offsets;
-            this.timestamp = null;
-            this.offset = null;
-            this.leaderEpoch = Optional.empty();
-        }
-
-        /**
-         * Constructor for ListOffsetResponse v1
-         */
-        public PartitionData(Errors error, long timestamp, long offset, 
Optional<Integer> leaderEpoch) {
-            this.error = error;
-            this.timestamp = timestamp;
-            this.offset = offset;
-            this.offsets = null;
-            this.leaderEpoch = leaderEpoch;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("PartitionData(").
-                    append("errorCode: ").append(error.code());
-
-            if (offsets == null) {
-                bld.append(", timestamp: ").append(timestamp).
-                        append(", offset: ").append(offset).
-                        append(", leaderEpoch: ").append(leaderEpoch);
-            } else {
-                bld.append(", offsets: ").
-                        append("[").
-                        append(Utils.join(this.offsets, ",")).
-                        append("]");
-            }
-            bld.append(")");
-            return bld.toString();
-        }
+    public ListOffsetResponse(ListOffsetResponseData data) {
+        this.data = data;
     }
 
-    private final int throttleTimeMs;
-    private final Map<TopicPartition, PartitionData> responseData;
-
-    /**
-     * Constructor for all versions without throttle time
-     */
-    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) 
{
-        this(DEFAULT_THROTTLE_TIME, responseData);
-    }
-
-    public ListOffsetResponse(int throttleTimeMs, Map<TopicPartition, 
PartitionData> responseData) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.responseData = responseData;
-    }
-
-    public ListOffsetResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
-        responseData = new HashMap<>();
-        for (Object topicResponseObj : struct.get(TOPICS)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                Errors error = 
Errors.forCode(partitionResponse.get(ERROR_CODE));
-                PartitionData partitionData;
-                if (partitionResponse.hasField(OFFSETS)) {
-                    Object[] offsets = partitionResponse.get(OFFSETS);
-                    List<Long> offsetsList = new ArrayList<>();
-                    for (Object offset : offsets)
-                        offsetsList.add((Long) offset);
-                    partitionData = new PartitionData(error, offsetsList);
-                } else {
-                    long timestamp = partitionResponse.get(TIMESTAMP);
-                    long offset = partitionResponse.get(OFFSET);
-                    Optional<Integer> leaderEpoch = 
RequestUtils.getLeaderEpoch(partitionResponse, LEADER_EPOCH);
-                    partitionData = new PartitionData(error, timestamp, 
offset, leaderEpoch);
-                }
-                responseData.put(new TopicPartition(topic, partition), 
partitionData);
-            }
-        }
+    public ListOffsetResponse(Struct struct, short version) {
+        data = new ListOffsetResponseData(struct, version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
-    public Map<TopicPartition, PartitionData> responseData() {
-        return responseData;
+    public ListOffsetResponseData data() {
+        return data;
+    }
+
+    public List<ListOffsetTopicResponse> responseData() {

Review comment:
       I would call this one `topics()` as you did already in the request.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)

Review comment:
       Same comment as above.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)

Review comment:
       I think `oldStyleOffsets` is an empty array by default.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3883,21 +3886,24 @@ void handleResponse(AbstractResponse abstractResponse) {
                     ListOffsetResponse response = (ListOffsetResponse) 
abstractResponse;
                     Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets 
= new HashMap<>();
 
-                    for (Entry<TopicPartition, PartitionData> result : 
response.responseData().entrySet()) {
-                        TopicPartition tp = result.getKey();
-                        PartitionData partitionData = result.getValue();
-
-                        KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
-                        Errors error = partitionData.error;
-                        OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
-                        if (offsetRequestSpec == null) {
-                            future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));
-                        } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
-                            retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
-                        } else if (error == Errors.NONE) {
-                            future.complete(new 
ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, 
partitionData.leaderEpoch));
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (ListOffsetTopicResponse topic : 
response.responseData()) {
+                        for (ListOffsetPartitionResponse partition : 
topic.partitions()) {
+                            TopicPartition tp = new 
TopicPartition(topic.name(), partition.partitionIndex());
+                            KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
+                            Errors error = 
Errors.forCode(partition.errorCode());
+                            OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
+                            if (offsetRequestSpec == null) {
+                                future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));

Review comment:
       This is not related to your PR at all. It seems that if 
`offsetRequestSpec` is `null` here, `future` will be `null` as well cause 
`futures` is initialised based on `topicPartitionOffsets`. If it turns out to 
be correct, it may be better to just log a warning here like we do in 
`createTopics()`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -965,11 +994,11 @@ public void onFailure(RuntimeException e) {
      * @return A response which can be polled to obtain the corresponding 
timestamps and offsets.
      */
     private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node 
node,
-                                                                  final 
Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
+                                                                  final 
Map<TopicPartition, ListOffsetPartition> timestampsToSearch,
                                                                   boolean 
requireTimestamp) {
         ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
                 .forConsumer(requireTimestamp, isolationLevel)
-                .setTargetTimes(timestampsToSearch);
+                .setTargetTimes(toListOffsetTopics(timestampsToSearch));

Review comment:
       This conversion is a bit unfortunate as we have to traverse all the 
partitions again to build the `List<ListOffsetTopic>`. Instead, we could 
compute it directly within `groupListOffsetRequests` and could receive 
`Map<Node, List<ListOffsetTopic>` directly here. That seems doable but I may 
have missed something.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
##########
@@ -1581,8 +1582,19 @@ SaslClient createSaslClient() {
 
     @Test
     public void testConvertListOffsetResponseToSaslHandshakeResponse() {
-        ListOffsetResponse response = new ListOffsetResponse(0, 
Collections.singletonMap(new TopicPartition("topic", 0),
-            new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, 
Optional.empty())));
+        ListOffsetResponseData data = new ListOffsetResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Collections.singletonList(new 
ListOffsetTopicResponse()
+                        .setName("topic")
+                        .setPartitions(Collections.singletonList(new 
ListOffsetPartitionResponse()
+                                .setErrorCode(Errors.NONE.code())
+                                
.setLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH)
+                                .setPartitionIndex(0)
+                                .setOffset(0)
+                                .setTimestamp(0)))));
+        ListOffsetResponse response = new ListOffsetResponse(data);
+//        ListOffsetResponse response = new ListOffsetResponse(0, 
Collections.singletonMap(new TopicPartition("topic", 0),
+//            new ListOffsetResponse.PartitionData(Errors.NONE, 0, 0, 
Optional.empty())));

Review comment:
       Can we remove these two?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)

Review comment:
       nit: Could we split the line as before?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()

Review comment:
       nit: The indentation looks weird.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3220,12 +3286,30 @@ public void testListOffsetsMetadataRetriableErrors() 
throws Exception {
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
 
             // listoffsets response from broker 0
-            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
-            responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, 
Optional.of(543)));
+            ListOffsetTopicResponse t0 = new ListOffsetTopicResponse()
+                    .setName(tp0.topic())
+                    .setPartitions(Collections.singletonList(new 
ListOffsetPartitionResponse()
+                            .setPartitionIndex(tp0.partition())
+                            .setErrorCode(Errors.NONE.code())
+                            .setTimestamp(-1L)
+                            .setOffset(345L)
+                            .setLeaderEpoch(543)));

Review comment:
       nit: What about creating a small helper to create a 
`ListOffsetTopicResponse` for a given `TopicPartition` & co? That would reduce 
the boilerplate code.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)

Review comment:
       nit: Could we split the line?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.INVALID_REQUEST.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+        } else {
+  
+          def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = {
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(e.code)
+              .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+              .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+          }
+  
+          try {
+            val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
+            val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
+            val isolationLevelOpt = if (isClientRequest)
+              Some(offsetRequest.isolationLevel)
+            else
+              None
+  
+            val foundOpt = 
replicaManager.fetchOffsetForTimestamp(topicPartition,
+              partition.timestamp,
+              isolationLevelOpt,
+              if (partition.currentLeaderEpoch == 
ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else 
Optional.of(partition.currentLeaderEpoch),
+              fetchOnlyFromLeader)
+  
+            val response = foundOpt match {
+              case Some(found) => {
+                val partitionResponse = new ListOffsetPartitionResponse()
+                  .setPartitionIndex(partition.partitionIndex)
+                  .setErrorCode(Errors.NONE.code)
+                  .setTimestamp(found.timestamp)
+                  .setOffset(found.offset)
+                if (found.leaderEpoch.isPresent)
+                  partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
+                partitionResponse
+              }
+              case None =>
+                new ListOffsetPartitionResponse()

Review comment:
       Replace by `buildErrorResponse`.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.INVALID_REQUEST.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+        } else {
+  
+          def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = {
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(e.code)
+              .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+              .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+          }
+  
+          try {
+            val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
+            val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
+            val isolationLevelOpt = if (isClientRequest)
+              Some(offsetRequest.isolationLevel)
+            else
+              None
+  
+            val foundOpt = 
replicaManager.fetchOffsetForTimestamp(topicPartition,
+              partition.timestamp,
+              isolationLevelOpt,
+              if (partition.currentLeaderEpoch == 
ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else 
Optional.of(partition.currentLeaderEpoch),

Review comment:
       We could use a scala `Option` now.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.INVALID_REQUEST.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+        } else {
+  
+          def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = {
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(e.code)
+              .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+              .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+          }

Review comment:
       I suggest to move this one up and ensure it is used everywhere.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()

Review comment:
       Replace by `buildErrorResponse`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to