abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r490509357
##########
File path:
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -2496,46 +2501,81 @@ public void
testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
client.updateMetadata(initialUpdateResponse);
final long fetchTimestamp = 10L;
- Map<TopicPartition, ListOffsetResponse.PartitionData>
allPartitionData = new HashMap<>();
- allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
- Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
- allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
- retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L,
Optional.empty()));
+ List<ListOffsetTopicResponse> topics = Collections.singletonList(
+ new ListOffsetTopicResponse()
+ .setName(tp0.topic())
+ .setPartitions(Arrays.asList(
+ new ListOffsetPartitionResponse()
+ .setPartitionIndex(tp0.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setTimestamp(fetchTimestamp)
+ .setOffset(4L),
+ new ListOffsetPartitionResponse()
+ .setPartitionIndex(tp1.partition())
+ .setErrorCode(retriableError.code())
+
.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)
+ .setOffset(-1L))));
+ ListOffsetResponseData data = new ListOffsetResponseData()
+ .setThrottleTimeMs(0)
+ .setTopics(topics);
client.prepareResponseFrom(body -> {
boolean isListOffsetRequest = body instanceof
ListOffsetRequest;
if (isListOffsetRequest) {
ListOffsetRequest request = (ListOffsetRequest) body;
- Map<TopicPartition, ListOffsetRequest.PartitionData>
expectedTopicPartitions = new HashMap<>();
- expectedTopicPartitions.put(tp0, new
ListOffsetRequest.PartitionData(
- fetchTimestamp, Optional.empty()));
- expectedTopicPartitions.put(tp1, new
ListOffsetRequest.PartitionData(
- fetchTimestamp, Optional.empty()));
-
- return
request.partitionTimestamps().equals(expectedTopicPartitions);
+ List<ListOffsetTopic> expectedTopics =
Collections.singletonList(
+ new ListOffsetTopic()
+ .setName(tp0.topic())
+ .setPartitions(Arrays.asList(
+ new ListOffsetPartition()
+ .setPartitionIndex(tp1.partition())
+ .setTimestamp(fetchTimestamp)
+
.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH),
+ new ListOffsetPartition()
+ .setPartitionIndex(tp0.partition())
+ .setTimestamp(fetchTimestamp)
+
.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH))));
+ return request.topics().equals(expectedTopics);
} else {
return false;
}
- }, new ListOffsetResponse(allPartitionData), originalLeader);
+ }, new ListOffsetResponse(data), originalLeader);
client.prepareMetadataUpdate(updatedMetadata);
// If the metadata wasn't updated before retrying, the fetcher
would consult the original leader and hit a NOT_LEADER exception.
// We will count the answered future response in the end to verify
if this is the case.
- Map<TopicPartition, ListOffsetResponse.PartitionData>
paritionDataWithFatalError = new HashMap<>(allPartitionData);
- paritionDataWithFatalError.put(tp1, new
ListOffsetResponse.PartitionData(
- Errors.NOT_LEADER_OR_FOLLOWER,
ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
- client.prepareResponseFrom(new
ListOffsetResponse(paritionDataWithFatalError), originalLeader);
+ List<ListOffsetTopicResponse> topicsWithFatalError =
Collections.singletonList(
+ new ListOffsetTopicResponse()
+ .setName(tp0.topic())
+ .setPartitions(Arrays.asList(
+ new ListOffsetPartitionResponse()
Review comment:
Could we reuse the struct in L2508?
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -910,136 +913,161 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
- val mergedResponseMap = if (version == 0)
+ val topics = if (version == 0)
handleListOffsetRequestV0(request)
else
handleListOffsetRequestV1AndAbove(request)
- sendResponseMaybeThrottle(request, requestThrottleMs => new
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+ sendResponseMaybeThrottle(request, requestThrottleMs => new
ListOffsetResponse(new ListOffsetResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setTopics(topics.asJava)))
}
- private def handleListOffsetRequestV0(request : RequestChannel.Request) :
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+ private def handleListOffsetRequestV0(request : RequestChannel.Request) :
List[ListOffsetTopicResponse] = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetRequest]
- val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
- val (authorizedRequestInfo, unauthorizedRequestInfo) =
partitionMapByAuthorized(request.context,
- DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
- val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _)
=>
- k -> new
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
Seq.empty[JLong].asJava)
- }
-
- val responseMap = authorizedRequestInfo.map { case (topicPartition,
partitionData) =>
- try {
- val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
- topicPartition = topicPartition,
- timestamp = partitionData.timestamp,
- maxNumOffsets = partitionData.maxNumOffsets,
- isFromConsumer = offsetRequest.replicaId ==
ListOffsetRequest.CONSUMER_REPLICA_ID,
- fetchOnlyFromLeader = offsetRequest.replicaId !=
ListOffsetRequest.DEBUGGING_REPLICA_ID)
- (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE,
offsets.map(JLong.valueOf).asJava))
- } catch {
- // NOTE: UnknownTopicOrPartitionException and
NotLeaderOrFollowerException are special cased since these error messages
- // are typically transient and there is no value in logging the entire
stack trace for the same
- case e @ (_ : UnknownTopicOrPartitionException |
- _ : NotLeaderOrFollowerException |
- _ : KafkaStorageException) =>
- debug("Offset request with correlation id %d from client %s on
partition %s failed due to %s".format(
- correlationId, clientId, topicPartition, e.getMessage))
- (topicPartition, new
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
- case e: Throwable =>
- error("Error while responding to offset request", e)
- (topicPartition, new
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
- }
- }
- responseMap ++ unauthorizedResponseStatus
- }
-
- private def handleListOffsetRequestV1AndAbove(request :
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]
= {
- val correlationId = request.header.correlationId
- val clientId = request.header.clientId
- val offsetRequest = request.body[ListOffsetRequest]
-
- val (authorizedRequestInfo, unauthorizedRequestInfo) =
partitionMapByAuthorized(request.context,
- DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
+ val (authorizedRequestInfo, unauthorizedRequestInfo) =
partitionSeqByAuthorized(request.context,
+ DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
- val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _)
=>
- k -> new
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- ListOffsetResponse.UNKNOWN_TIMESTAMP,
- ListOffsetResponse.UNKNOWN_OFFSET,
- Optional.empty())
- }
-
- val responseMap = authorizedRequestInfo.map { case (topicPartition,
partitionData) =>
- if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
- debug(s"OffsetRequest with correlation id $correlationId from client
$clientId on partition $topicPartition " +
- s"failed because the partition is duplicated in the request.")
- (topicPartition, new
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
- ListOffsetResponse.UNKNOWN_TIMESTAMP,
- ListOffsetResponse.UNKNOWN_OFFSET,
- Optional.empty()))
- } else {
+ val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+ new ListOffsetTopicResponse()
+ .setName(topic.name)
+ .setPartitions(topic.partitions.asScala.map(partition =>
+ new ListOffsetPartitionResponse()
+ .setPartitionIndex(partition.partitionIndex)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava)
+ )
- def buildErrorResponse(e: Errors): (TopicPartition,
ListOffsetResponse.PartitionData) = {
- (topicPartition, new ListOffsetResponse.PartitionData(
- e,
- ListOffsetResponse.UNKNOWN_TIMESTAMP,
- ListOffsetResponse.UNKNOWN_OFFSET,
- Optional.empty()))
- }
+ val responseTopics = authorizedRequestInfo.map { topic =>
+ val responsePartitions = topic.partitions.asScala.map { partition =>
+ val topicPartition = new TopicPartition(topic.name,
partition.partitionIndex)
try {
- val fetchOnlyFromLeader = offsetRequest.replicaId !=
ListOffsetRequest.DEBUGGING_REPLICA_ID
- val isClientRequest = offsetRequest.replicaId ==
ListOffsetRequest.CONSUMER_REPLICA_ID
- val isolationLevelOpt = if (isClientRequest)
- Some(offsetRequest.isolationLevel)
- else
- None
-
- val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
- partitionData.timestamp,
- isolationLevelOpt,
- partitionData.currentLeaderEpoch,
- fetchOnlyFromLeader)
-
- val response = foundOpt match {
- case Some(found) =>
- new ListOffsetResponse.PartitionData(Errors.NONE,
found.timestamp, found.offset, found.leaderEpoch)
- case None =>
- new ListOffsetResponse.PartitionData(Errors.NONE,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
- ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
- }
- (topicPartition, response)
+ val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+ topicPartition = topicPartition,
+ timestamp = partition.timestamp,
+ maxNumOffsets = partition.maxNumOffsets,
+ isFromConsumer = offsetRequest.replicaId ==
ListOffsetRequest.CONSUMER_REPLICA_ID,
+ fetchOnlyFromLeader = offsetRequest.replicaId !=
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+ new ListOffsetPartitionResponse()
+ .setPartitionIndex(partition.partitionIndex)
+ .setErrorCode(Errors.NONE.code)
+ .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
} catch {
- // NOTE: These exceptions are special cased since these error
messages are typically transient or the client
- // would have received a clear exception and there is no value in
logging the entire stack trace for the same
+ // NOTE: UnknownTopicOrPartitionException and
NotLeaderOrFollowerException are special cases since these error messages
Review comment:
Are we missing storage exception here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]