junrao commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1715685719


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1170,6 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
               s"failed because the partition is duplicated in the request.")
           buildErrorResponse(Errors.INVALID_REQUEST, partition)
+        } else if (partition.timestamp() == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && version < 8) {
+          throw new UnsupportedVersionException(s"apiVersion must be >=8 for 
EARLIEST_LOCAL_TIMESTAMP")
+        } else if (partition.timestamp() == 
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP && version < 9) {
+          throw new UnsupportedVersionException(s"apiVersion must be >=9 for 
LATEST_TIERED_TIMESTAMP")

Review Comment:
   This covers the case when a client sets a timestamp constant not supported 
in a version. It only happens if the client is incorrectly implemented. It 
could happen, but is less common.
   
   A more common case is that a correctly implemented client sends a new 
version of the request to an old server that doesn't understand the new 
version. To cover this case, we probably could change the following method in 
UnifiedLog.
   
   ```
     def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: 
Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
       maybeHandleIOException(s"Error while fetching offset by timestamp for 
$topicPartition in dir ${dir.getParent}") {
        ...
         } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
        ...
         } else {
           // We need to search the first segment whose largest timestamp is >= 
the target timestamp if there is one.
             }
           } else {
           ...
           }
         }
   ```
   
   Currently, if the code falls into the last `else`, it assumes that the 
timestamp is positive. However, this won't be the case if the request is on a 
newer version including a new negative constant. To be more defensive, we could 
throw an UnsupportedVersionException if the timestamp is negative in the `else` 
clause.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to