junrao commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1995965819


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1544,12 +1544,14 @@ class ReplicaManager(val config: KafkaConfig,
                                  fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
     val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
     val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
-    var remoteFetchTask: Future[Void] = null
+    var remoteFetchFuture: RemoteFetchFuture = null
     try {
-      remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo, (result: RemoteLogReadResult) => {
+      val remoteLogReader = remoteLogManager.get.remoteLogReaderTask(remoteFetchInfo, (result: RemoteLogReadResult) => {
         remoteFetchResult.complete(result)
         delayedRemoteFetchPurgatory.checkAndComplete(key)
       })
+      val remoteFetchTask = remoteLogManager.get.asyncRead(remoteLogReader)
+      remoteFetchFuture = new RemoteFetchFuture(remoteLogReader, remoteFetchTask)

Review Comment:
   Hmm, is this correct? We need to pass in remoteFetchFuture to 
RemoteFetchFuture, right?
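   A minimal sketch of the pattern in this diff, in plain Java: the reader task is built first with a completion callback, then submitted separately, and the caller keeps the returned `Future`. `ReaderTask` and `CallbackSubmitSketch` are hypothetical stand-ins, not Kafka's `RemoteLogReader` or `RemoteLogManager`, and the "read" is simulated with a string.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for a reader task: it performs a (simulated) read
// and hands the result to a callback instead of returning it.
final class ReaderTask implements Callable<Void> {
    private final String partition;
    private final Consumer<String> callback;

    ReaderTask(String partition, Consumer<String> callback) {
        this.partition = partition;
        this.callback = callback;
    }

    @Override
    public Void call() {
        callback.accept("read:" + partition); // simulated read result
        return null;
    }
}

final class CallbackSubmitSketch {
    // Step 1: build the task with its completion callback.
    // Step 2: submit it separately and return the Future, which a caller
    // would keep alongside the task so the read can be cancelled later.
    static Future<Void> submit(ExecutorService pool, String partition,
                               CompletableFuture<String> result) {
        ReaderTask task = new ReaderTask(partition, result::complete);
        return pool.submit(task);
    }

    static String runOnce() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            submit(pool, "tp-0", result);
            // join() blocks until the task's callback has completed `result`.
            return result.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

   Splitting construction from submission is what lets the caller hold on to both the task object and its `Future`; the result itself still flows only through the callback.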



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -119,6 +118,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
             false)
         }
       } else {
+        // cancel the remote storage read task, if it has not been executed yet
+        if (!remoteFetchFuture.isDone)
+          remoteFetchFuture.cancel()

Review Comment:
   Sorry, I didn't realize this branch doesn't depend on the result from the 
future. So, if the future is not done, we simply return empty. Given this, it's 
better to call remoteFetchFuture.cancel() in onExpiration() as it was before.
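   A sketch of the placement suggested here, assuming a generic `ExecutorService`: the completion path only checks whether the read is done and never cancels, while the expiration path does the cancelling. `DelayedReadSketch` and its methods are hypothetical, not Kafka's `DelayedRemoteFetch`; the demo occupies the single worker thread so the read is still queued when the operation expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical delayed operation that only inspects the read Future and
// cancels it on expiration.
final class DelayedReadSketch {
    private final Future<?> readTask;

    DelayedReadSketch(Future<?> readTask) {
        this.readTask = readTask;
    }

    // Completion path: the response does not use the read's result here,
    // so an unfinished read simply yields an empty response.
    String onComplete() {
        return readTask.isDone() ? "data" : "empty";
    }

    // Expiration path: cancel(false) prevents a still-queued read from ever
    // running; it does not interrupt a read that has already started.
    boolean onExpiration() {
        return readTask.cancel(false);
    }

    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean readRan = new AtomicBoolean(false);
        try {
            // Occupy the only worker thread so the read below stays queued.
            pool.submit(() -> {
                release.await();
                return null;
            });
            Future<?> read = pool.submit(() -> readRan.set(true));
            DelayedReadSketch op = new DelayedReadSketch(read);
            String response = op.onComplete();
            boolean cancelled = op.onExpiration();
            release.countDown();
            pool.shutdown();
            // Let the worker drain the queue, including the cancelled read.
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return response + "," + cancelled + "," + readRan.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }
}
```

   `DelayedReadSketch.demo()` returns `"empty,true,false"`: the response is empty, the queued read is cancelled, and its body never runs because a cancelled `FutureTask` does nothing when the executor dequeues it.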



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", 
fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), 
Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   When a task is cancelled, it could be in the middle of the following code in 
`rlm.read()`. I was thinking that if we keep checking the cancelled flag during 
the iteration, we could short-circuit it.
   
   `Utils.readFully(remoteSegInputStream, buffer);`
   
   However, this is a rare situation and is probably an over-optimization. For 
now, I think just cancelling the future is enough. We could remove the 
`cancelled` flag in this class.
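   For reference, the short-circuit considered (and set aside) here would look roughly like this: a chunked read loop that re-checks a shared flag between chunks. The names are hypothetical and the loop is not `rlm.read()`; the flag is an `AtomicBoolean` so that a write from the cancelling thread is visible to the reading thread.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical chunked read that re-checks a shared cancellation flag
// between chunks, so a cancel arriving mid-read stops further I/O.
final class ChunkedReadSketch {
    static int readUntilCancelled(InputStream in, int chunkSize, AtomicBoolean cancelled)
            throws IOException {
        byte[] chunk = new byte[chunkSize];
        int total = 0;
        while (!cancelled.get()) {
            int n = in.read(chunk, 0, chunk.length);
            if (n == -1) {
                break; // end of stream
            }
            total += n;
        }
        return total;
    }

    static int demo() {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        // A 10-byte stream that sets the flag as soon as it serves a chunk,
        // simulating a cancel that arrives while the read is in progress.
        InputStream in = new ByteArrayInputStream(new byte[10]) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                int n = super.read(b, off, len);
                cancelled.set(true);
                return n;
            }
        };
        try {
            return readUntilCancelled(in, 4, cancelled);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   `demo()` stops after the first 4-byte chunk instead of reading all 10 bytes. If the `Future` were cancelled with `cancel(true)`, checking `Thread.currentThread().isInterrupted()` in the loop would give a similar effect without a separate flag.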
   
   > Another question: would it make sense to make RemoteLogReader extend 
   > ShutdownableThread? 

   We can't easily use that; we would need to implement some sort of task 
queue on top of it. It's simpler to just use the ExecutorService.
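   To illustrate the extra work mentioned here, a minimal sketch of a hand-rolled task queue served by one dedicated thread, roughly what a thread-based reader would need to add. `QueueWorkerSketch` is hypothetical and uses only `java.util.concurrent`; it is not Kafka's `ShutdownableThread`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical hand-rolled worker: a single thread draining a task queue.
final class QueueWorkerSketch {
    // Sentinel task that tells the worker loop to exit.
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::loop, "queue-worker-sketch");

    void start() {
        worker.start();
    }

    void submit(Runnable task) {
        queue.add(task);
    }

    // Enqueue the sentinel and wait until every earlier task has run.
    void shutdown() throws InterruptedException {
        queue.add(POISON);
        worker.join();
    }

    private void loop() {
        try {
            while (true) {
                Runnable task = queue.take();
                if (task == POISON) {
                    return;
                }
                task.run(); // a throwing task would end this loop
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<Integer> demo() {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        QueueWorkerSketch queueWorker = new QueueWorkerSketch();
        queueWorker.start();
        for (int i = 0; i < 3; i++) {
            int id = i;
            queueWorker.submit(() -> order.add(id));
        }
        try {
            queueWorker.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

   Even this version lacks per-task `Future`s, cancellation, multiple worker threads, and isolation from a task that throws, all of which an `ExecutorService` already provides through `submit`.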



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
