This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new dbb7d5fbcc OAK-11109 - Fix: parallel download may fail if the two download threads finish at the same time. (#1708)
dbb7d5fbcc is described below
commit dbb7d5fbcc66a3a3c0e8cf89c7966800932c8429
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Sep 11 13:51:47 2024 +0200
OAK-11109 - Fix: parallel download may fail if the two download threads finish at the same time. (#1708)
---
.../pipelined/PipelinedMongoDownloadTask.java | 43 +++++++++++++++-------
1 file changed, 30 insertions(+), 13 deletions(-)
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index c63a6f10b3..07940e15f0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -73,6 +73,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static
org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_LOGGER;
@@ -482,16 +483,24 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
// One of the download tasks has completed. Cancel the
other one.
if (completedTask == ascendingDownloadFuture) {
LOG.info("Ascending download task has completed.
Cancelling descending download task.");
- // If the download thread is blocked on a socket
read waiting for a Mongo response, it will
- // not immediately respond to the interrupt. It
will only check the interrupt status when
- // it returns from the socket read. So in addition
to interrupting the thread, we have to
- // close the Mongo cursor that is executing the
query.
- descendingDownloadFuture.cancel(true);
+ // This closes the Mongo cursor, which will cause
the download task to abort next time it
+ // performs an operation on the cursor or if it is
blocked on the cursor.
descendingDownloadTask.cancelDownload();
+ // In case the thread is not currently operating
on the Mongo cursor, we interrupt the thread
+ // to ensure that it terminates quickly.
+ descendingDownloadFuture.cancel(true);
+ // Notes:
+ // 1. Calling close() on a Mongo cursor will fail
if the cursor was already interrupted. So
+ // we cancel the cursor before interrupting the
thread. Any exception thrown by calling
+ // close() on the cursor will be ignored, but
it's better if we avoid them.
+ // 2. Interrupting the thread is not enough if the
thread is blocked waiting on a socket.
+ // In that state, the thread does not check for
interrupts, it will only check when it
+ // finishes the I/O operation, which can take a
long time. So we need to close the cursor,
+ // which will abort the I/O operation.
} else if (completedTask == descendingDownloadFuture) {
LOG.info("Descending download task has completed.
Cancelling ascending download task.");
- ascendingDownloadFuture.cancel(true);
ascendingDownloadTask.cancelDownload();
+ ascendingDownloadFuture.cancel(true);
} else {
throw new IllegalStateException("Unknown download
task completed: " + completedTask);
}
@@ -614,7 +623,7 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
// Accessed from the main download thread
private volatile long firstModifiedValueSeen = -1;
private volatile MongoCursor<NodeDocument> mongoCursor = null;
- private volatile boolean cancelled = false;
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics
downloadStatics) {
this(downloadOrder, downloadStatics, null);
@@ -678,8 +687,8 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
}
downloadCompleted = true;
} catch (IllegalStateException | InterruptedException |
MongoInterruptedException e) {
- if (cancelled) {
- LOG.info("Download task was cancelled.");
+ if (cancelled.get()) {
+ LOG.info("Download task was cancelled: {}",
e.toString());
return;
} else {
throw e;
@@ -744,10 +753,18 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
}
public void cancelDownload() {
- LOG.info("{} Cancelling download, closing Mongo cursor: {}",
downloadOrder, mongoCursor);
- cancelled = true;
- if (mongoCursor != null) {
- mongoCursor.close();
+ boolean alreadyCancelled = cancelled.getAndSet(true);
+ if (alreadyCancelled) {
+ LOG.info("Download task was already cancelled.");
+ } else {
+ LOG.info("Cancelling download for {} order task, closing Mongo
cursor.", downloadOrder);
+ if (mongoCursor != null) {
+ try {
+ mongoCursor.close();
+ } catch (Throwable e) {
+ LOG.info("Error closing Mongo cursor, the cursor may
already be closed: {}", e.toString());
+ }
+ }
}
}