jeqo commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1994541530
########## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }

+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
I'm failing to see how passing `cancelled` to `rlm.read()` would work here. IIUC, once `rlm.read()` is running it's already too late, and we should let the task complete. Checking `cancelled` here in `RemoteLogReader.call()`, before `rlm.read()` is called, is the only place where the task could complete early. Though I may be missing something.

########## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }

+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));

Review Comment:
Thanks for pointing this out. I'm moving this callback call out of `cancel()`.

########## core/src/main/scala/kafka/server/DelayedRemoteFetch.scala: ##########
@@ -36,6 +37,7 @@ import scala.collection._
   * in the remote fetch operation purgatory
   */
 class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteLogReader: RemoteLogReader,

Review Comment:
Good idea! I'm giving this a try by defining a `RemoteFetchFuture` that wraps both futures and the `RemoteLogReader`, so it can handle the whole cancellation procedure.
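The following is a minimal, JDK-only sketch of how a wrapper along the lines of the proposed `RemoteFetchFuture` might combine the two early-exit points discussed in this thread:

- the `cancelled` flag checked at the top of `call()`;
- `Future.cancel(false)` on the executor's future.

It assumes the reader is submitted to a plain `ExecutorService`. `SketchReader`, `SketchFetchFuture`, and the `Runnable` standing in for `rlm.read()` are hypothetical names, not the PR's code. The result callback is left out on purpose, since it is being moved out of `cancel()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RemoteFetchCancelSketch {

    // Hypothetical stand-in for RemoteLogReader; `read` plays the role of rlm.read().
    static final class SketchReader implements Callable<Void> {
        private final Runnable read;
        private volatile boolean cancelled;

        SketchReader(Runnable read) { this.read = read; }

        void cancel() { cancelled = true; }

        @Override
        public Void call() {
            // The only early exit: once read.run() has started, the read is allowed to finish.
            if (!cancelled) {
                read.run();
            }
            return null;
        }
    }

    // Hypothetical wrapper in the spirit of the proposed RemoteFetchFuture:
    // it owns both the reader and the executor's future, so one call cancels both.
    static final class SketchFetchFuture {
        private final SketchReader reader;
        private final Future<Void> task;

        SketchFetchFuture(SketchReader reader, Future<Void> task) {
            this.reader = reader;
            this.task = task;
        }

        void cancel() {
            reader.cancel();    // skips the read if call() has not checked the flag yet
            task.cancel(false); // stops a still-queued task from ever running call(); never interrupts
        }
    }

    /** Cancels while the reader is still queued; returns how many reads ran. */
    static int cancelWhileQueued() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger reads = new AtomicInteger();
        try {
            pool.submit(() -> { release.await(); return null; }); // occupies the only worker
            SketchReader reader = new SketchReader(reads::incrementAndGet);
            new SketchFetchFuture(reader, pool.submit(reader)).cancel();
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return reads.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Cancels after the read has started; returns how many reads ran. */
    static int cancelWhileReading() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch readStarted = new CountDownLatch(1);
        CountDownLatch finishRead = new CountDownLatch(1);
        AtomicInteger reads = new AtomicInteger();
        SketchReader reader = new SketchReader(() -> {
            readStarted.countDown();
            try {
                finishRead.await(); // cancel(false) does not interrupt this wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            reads.incrementAndGet();
        });
        try {
            SketchFetchFuture fetch = new SketchFetchFuture(reader, pool.submit(reader));
            readStarted.await();
            fetch.cancel(); // too late: the read is already in progress
            finishRead.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return reads.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("reads after cancelling while queued:  " + cancelWhileQueued());  // 0
        System.out.println("reads after cancelling while reading: " + cancelWhileReading()); // 1
    }
}
```

The two scenarios give different results:

- Cancelling while the reader is still queued means the read never runs.
- Cancelling once the read is in progress leaves it to complete.

This matches the point above that checking before `rlm.read()` is the only place the task can end early.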
########## core/src/main/scala/kafka/server/DelayedRemoteFetch.scala: ##########
@@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   override def onExpiration(): Unit = {
     // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+    val cancelled = remoteFetchTask.cancel(false)

Review Comment:
Aha, thanks for the clarification. Agreed: we could cancel the future and use `!future.isCancelled` for the logging in `onExpiration()` instead.

########## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }

+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
Another question: would it make sense to make `RemoteLogReader` extend `ShutdownableThread`? I wonder whether that was the original suggestion when pointing to `CleanerThread` and other non-interruptible threads. I haven't applied it yet because, IIUC, those threads are usually started manually rather than managed through thread pools.
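On the `onExpiration()` logging point, this JDK-only sketch shows what `Future.cancel(false)` reports. It assumes `remoteFetchTask` is the standard `FutureTask` returned by `ExecutorService.submit`. The class, record, and method names are hypothetical, not from the PR.

For such a task, `cancel(false)` returns `false` (and `isCancelled()` stays `false`) only when the task has already completed or been cancelled. For a task whose body is already running, it returns `true` without stopping the body. A debug line keyed on `!cancelled` or `!future.isCancelled` after `cancel(false)` would therefore fire only for tasks that had already finished, not for reads still in progress.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelFalseLoggingDemo {

    // What cancel(false) returned, plus isCancelled()/isDone() observed right after it.
    record CancelOutcome(boolean cancelReturned, boolean isCancelled, boolean isDone) {}

    static CancelOutcome cancelWithoutInterrupt(Future<?> task) {
        boolean returned = task.cancel(false);
        return new CancelOutcome(returned, task.isCancelled(), task.isDone());
    }

    /** Expiration after the task has already completed. */
    static CancelOutcome cancelCompleted() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = pool.submit(() -> { });
            task.get(); // block until the task has finished
            return cancelWithoutInterrupt(task);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Expiration while the task body is still running. */
    static CancelOutcome cancelRunning() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<?> task = pool.submit(() -> {
                started.countDown();
                release.await(); // cancel(false) does not interrupt this wait
                return null;
            });
            started.await();
            return cancelWithoutInterrupt(task);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the body run to completion
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // CancelOutcome[cancelReturned=false, isCancelled=false, isDone=true]
        System.out.println(cancelCompleted());
        // CancelOutcome[cancelReturned=true, isCancelled=true, isDone=true]
        System.out.println(cancelRunning());
    }
}
```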