jeqo commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1994541530


##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   I'm failing to see how passing `cancelled` to `rlm.read()` would work here.
   
   IIUC, once `rlm.read()` is running it's already too late, and we should let the task complete. Checking `cancelled` before `rlm.read()` is called, here in `RemoteLogReader.call()`, is the only place where the task could complete early. Though I may be missing something.
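
   Something like this is what I had in mind, just a sketch (the `volatile` flag and the early return are illustrative, not a concrete proposal; it reuses the existing `callback` field):
   
   ```java
   // Sketch only: cancel() flips a flag, and call() checks it once before the remote read
   // starts. Once rlm.read() is in flight, the task simply runs to completion.
   private volatile boolean cancelled = false;

   public void cancel() {
       cancelled = true;
   }

   @Override
   public Void call() {
       if (cancelled) {
           // Complete early with an "interrupted" result instead of touching remote storage.
           callback.accept(new RemoteLogReadResult(Optional.empty(),
               Optional.of(new InterruptedException("Cancelled remote log reader"))));
           return null;
       }
       // ... existing path: rlm.read(fetchInfo), timing, and the callback on success/failure ...
       return null;
   }
   ```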



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));

Review Comment:
   Thanks for pointing this out. I'm moving this callback call out of `cancel()`.
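
   Roughly the revised shape I'm going for (sketch only; `cancelled` is the existing flag and the callback now fires from `call()`):
   
   ```java
   // cancel() only logs and flips the flag; the callback carrying the
   // InterruptedException result is invoked from call() when the flag is observed.
   public void cancel() {
       LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
       cancelled = true;
   }
   ```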



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -36,6 +37,7 @@ import scala.collection._
  * in the remote fetch operation purgatory
  */
 class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteLogReader: RemoteLogReader,

Review Comment:
   Good idea! I'm giving this a try by defining a `RemoteFetchFuture` that bundles both futures and the `RemoteLogReader`, so it can handle the whole cancel procedure.
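
   A rough sketch of what I mean (the class name comes from the comment above; the field types and the result future are assumptions about the final shape, and the Kafka types are taken to be in scope):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;

   // Tentative holder that keeps the pool-level task future, the result future, and the
   // reader together, so DelayedRemoteFetch has a single place to drive cancellation.
   public class RemoteFetchFuture {
       private final Future<Void> remoteFetchTask;
       private final CompletableFuture<RemoteLogReadResult> remoteFetchResult;
       private final RemoteLogReader remoteLogReader;

       public RemoteFetchFuture(Future<Void> remoteFetchTask,
                                CompletableFuture<RemoteLogReadResult> remoteFetchResult,
                                RemoteLogReader remoteLogReader) {
           this.remoteFetchTask = remoteFetchTask;
           this.remoteFetchResult = remoteFetchResult;
           this.remoteLogReader = remoteLogReader;
       }

       // Cancel without interrupting: flag the reader so it can exit early if it has not
       // started reading yet, and cancel the pool-level future if it is still queued.
       public boolean cancel() {
           remoteLogReader.cancel();
           return remoteFetchTask.cancel(false);
       }
   }
   ```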



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 
   override def onExpiration(): Unit = {
     // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+    val cancelled = remoteFetchTask.cancel(false)

Review Comment:
   Aha, thanks for the clarification.
   
   Agreed, we could cancel the future and use `!future.isCancelled` for logging in `onExpiration()` instead.
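
   To illustrate the `Future` semantics we're relying on, a standalone sketch (plain JDK, not the Kafka classes): `cancel(false)` never interrupts a running task, it only prevents a queued one from starting, and `isCancelled()` reports whether the cancel took effect, so the expiration path can log only when `!isCancelled`.
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public class CancelOnExpirationSketch {
       public static void main(String[] args) throws Exception {
           ExecutorService pool = Executors.newSingleThreadExecutor();
           // Keep the single worker busy so the "remote fetch task" below stays queued.
           pool.submit(() -> { Thread.sleep(500); return null; });
           Future<?> remoteFetchTask = pool.submit(() -> System.out.println("remote read ran"));

           // Cancel without interrupting; log only when the cancel did not take effect.
           remoteFetchTask.cancel(false);
           if (!remoteFetchTask.isCancelled()) {
               System.out.println("Task could not be cancelled, isDone=" + remoteFetchTask.isDone());
           }

           pool.shutdown();
           pool.awaitTermination(5, TimeUnit.SECONDS);
       }
   }
   ```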



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   Another question: would it make sense to make `RemoteLogReader` extend `ShutdownableThread`? I wonder if that was the initial suggestion when pointing to `CleanerThread` and other non-interruptible threads. I haven't applied it yet, as those threads are usually started manually rather than managed via thread pools, IIUC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
