junrao commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1731530754


##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -82,8 +82,13 @@ else if (requireTimestamp)
             return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
         }
 
+        public static Builder forReplica(short allowedVersion, int replicaId, boolean requireEarliestLocalTimestamp) {
+            short minVersion = requireEarliestLocalTimestamp ? (short) 8 : (short) 0;

Review Comment:
   I understand the intention of this logic, but it doesn't seem to do what you
   want. The client determines the version for a request using the following
   logic in NetworkClient:
   
   ```
           try {
               NodeApiVersions versionInfo = apiVersions.get(nodeId);
               short version;
               // Note: if versionInfo is null, we have no server version information. This would be
               // the case when sending the initial ApiVersionRequest which fetches the version
               // information itself.  It is also the case when discoverBrokerVersions is set to false.
               if (versionInfo == null) {
                   version = builder.latestAllowedVersion();
                   if (discoverBrokerVersions && log.isTraceEnabled())
                       log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                               "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
               } else {
                   version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                           builder.latestAllowedVersion());
               }
   ```
   As you can see, `builder.oldestAllowedVersion()` is only consulted when
   `versionInfo` is not null. However, BrokerBlockingSender creates its
   NetworkClient with `discoverBrokerVersions=false`, so `apiVersions` is never
   populated and `versionInfo` is always null. The request is therefore always
   sent at `builder.latestAllowedVersion()`, and the `minVersion` computed here
   has no effect.
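
   A simplified model of the two branches may make this concrete. It is only a
   sketch, not the real NetworkClient: `VersionSelectionSketch`, `BrokerRange`,
   and `chooseVersion` are hypothetical names, the exception type is a
   stand-in, and it assumes the builder is created with `minVersion` as the
   oldest and `allowedVersion` as the latest allowed version.

   ```java
   /**
    * Simplified model of the version selection quoted above.
    * All names here are hypothetical; this is not Kafka's NetworkClient.
    */
   final class VersionSelectionSketch {

       /** Hypothetical stand-in for the broker's supported range of one API. */
       static final class BrokerRange {
           final short min;
           final short max;

           BrokerRange(short min, short max) {
               this.min = min;
               this.max = max;
           }
       }

       /**
        * Mirrors the two branches of the quoted snippet.
        * A null brokerRange models versionInfo == null, which is what happens
        * when discoverBrokerVersions is false.
        */
       static short chooseVersion(BrokerRange brokerRange, short oldestAllowed, short latestAllowed) {
           if (brokerRange == null) {
               // No broker version information: oldestAllowed is never consulted.
               return latestAllowed;
           }
           // Intersect the builder's range with the broker's range.
           short usableMin = (short) Math.max(oldestAllowed, brokerRange.min);
           short usableMax = (short) Math.min(latestAllowed, brokerRange.max);
           if (usableMin > usableMax) {
               // Stand-in for the unsupported-version failure in the real client.
               throw new IllegalStateException(
                   "No usable version in [" + oldestAllowed + ", " + latestAllowed + "]");
           }
           return usableMax;
       }
   }
   ```

   In this model, `chooseVersion(null, (short) 8, (short) 7)` returns 7: with
   no broker version information, a floor of 8 is silently ignored. The floor
   only takes effect in the non-null branch, which BrokerBlockingSender never
   reaches.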
   
   To me, if we can gate tiered storage on MV, gating ListOffsets here is less
   important.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(

Review Comment:
   Regarding gating tiered storage on MV 3.6, we could pass a
   remoteLogManagerSupplier to ReplicaManager instead of a remoteLogManager
   instance. The supplier would instantiate remoteLogManager on first use,
   based on the remote storage config and the MV setting at that time. MV
   initialization happens before the socket server is opened, so by the time
   the supplier is called, we can be sure that MV has been initialized. This
   still doesn't support enabling remote storage dynamically, but is probably
   good enough for 3.9.
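
   A rough sketch of such a supplier, written in Java for brevity even though
   ReplicaManager is Scala. Everything here is hypothetical:
   `LazyRemoteLogManagerSketch`, its constructor parameters, and the two
   boolean checks are illustrative names, not existing Kafka APIs.

   ```java
   import java.util.Optional;
   import java.util.function.BooleanSupplier;
   import java.util.function.Supplier;

   /**
    * Hypothetical lazy supplier: decides on first use whether to create the
    * remote log manager, then caches that decision.
    */
   final class LazyRemoteLogManagerSketch<T> implements Supplier<Optional<T>> {
       private final BooleanSupplier remoteStorageEnabled;     // assumed config check
       private final BooleanSupplier mvSupportsTieredStorage;  // assumed MV check
       private final Supplier<T> factory;                      // creates the manager
       private Optional<T> cached;                             // null until first get()

       LazyRemoteLogManagerSketch(BooleanSupplier remoteStorageEnabled,
                                  BooleanSupplier mvSupportsTieredStorage,
                                  Supplier<T> factory) {
           this.remoteStorageEnabled = remoteStorageEnabled;
           this.mvSupportsTieredStorage = mvSupportsTieredStorage;
           this.factory = factory;
       }

       @Override
       public synchronized Optional<T> get() {
           if (cached == null) {
               // Both checks are evaluated here, at first use, not at construction.
               boolean enabled = remoteStorageEnabled.getAsBoolean()
                       && mvSupportsTieredStorage.getAsBoolean();
               cached = enabled ? Optional.of(factory.get()) : Optional.empty();
           }
           return cached;
       }
   }
   ```

   Because the decision is cached after the first call, a later change to the
   config or MV has no effect, which matches the limitation noted above about
   not enabling remote storage dynamically.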


