junrao commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1995965819

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1544,12 +1544,14 @@ class ReplicaManager(val config: KafkaConfig,
                                  fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
     val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
     val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
-    var remoteFetchTask: Future[Void] = null
+    var remoteFetchFuture: RemoteFetchFuture = null
     try {
-      remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo, (result: RemoteLogReadResult) => {
+      val remoteLogReader = remoteLogManager.get.remoteLogReaderTask(remoteFetchInfo, (result: RemoteLogReadResult) => {
         remoteFetchResult.complete(result)
         delayedRemoteFetchPurgatory.checkAndComplete(key)
       })
+      val remoteFetchTask = remoteLogManager.get.asyncRead(remoteLogReader)
+      remoteFetchFuture = new RemoteFetchFuture(remoteLogReader, remoteFetchTask)

Review Comment:
   Hmm, is this correct? We need to pass in remoteFetchFuture to RemoteFetchFuture, right?

##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -119,6 +118,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
             false)
         }
       } else {
+        // cancel the remote storage read task, if it has not been executed yet
+        if (!remoteFetchFuture.isDone)
+          remoteFetchFuture.cancel()

Review Comment:
   Sorry, I didn't realize this branch doesn't depend on the result from the future. So, if the future is not done, we simply return empty. Given this, it's better to call remoteFetchFuture.cancel() in onExpiration() as it was before.
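
For illustration only, here is a minimal, self-contained Java sketch of the "cancel in onExpiration()" idea, using plain `java.util.concurrent` types rather than the Kafka classes. The names `CancelOnExpirationSketch`, `DelayedFetch`, and `queuedReadRanAfterExpiration` are hypothetical, and the use of `cancel(false)` is an assumption made for the sketch, not something taken from the PR. It shows that a task still waiting in the executor's queue never runs once its `Future` has been cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelOnExpirationSketch {

    // Hypothetical stand-in for a delayed operation that owns a pending read task.
    static final class DelayedFetch {
        private final Future<Void> readTask;

        DelayedFetch(Future<Void> readTask) {
            this.readTask = readTask;
        }

        // Invoked when the delayed operation times out: drop the read if it has not finished.
        void onExpiration() {
            if (!readTask.isDone()) {
                // Assumption: do not interrupt a read that is already running.
                readTask.cancel(false);
            }
        }
    }

    // Returns true if the queued read task executed even though the operation expired first.
    static boolean queuedReadRanAfterExpiration() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch releaseBlocker = new CountDownLatch(1);
        AtomicBoolean readRan = new AtomicBoolean(false);
        try {
            // Occupy the only worker thread so that the read task stays queued.
            pool.submit(() -> {
                blockerStarted.countDown();
                releaseBlocker.await();
                return null;
            });
            blockerStarted.await();

            Future<Void> readTask = pool.submit(() -> {
                readRan.set(true);
                return null;
            });

            // The delayed operation expires while the read is still waiting in the queue.
            new DelayedFetch(readTask).onExpiration();

            // Free the worker and let the executor drain its queue.
            releaseBlocker.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            releaseBlocker.countDown();
            pool.shutdownNow();
        }
        return readRan.get();
    }

    public static void main(String[] args) {
        System.out.println("read ran after expiration: " + queuedReadRanAfterExpiration());
    }
}
```

Because the cancelled task is skipped when the worker dequeues it, no separate flag on the task is needed for the not-yet-started case in this sketch.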

##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   When a task is cancelled, it could be in the middle of the following code in `rlm.read()`. I was thinking that if we keep checking the cancelled flag during the iteration, we could short-circuit it.

   ` Utils.readFully(remoteSegInputStream, buffer);`

   However, this is a rare situation and is probably an over-optimization. For now, I think just cancelling the future is enough. We could remove the `cancelled` flag in this class.

   > Another question: would it make sense to make RemoteLogReader extend ShutdownableThread?

   We can't easily use that. We would need to implement some sort of task queue on top of it. It's simpler to just use the ExecutorService.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org