This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dbb7d5fbcc OAK-11109 - Fix: parallel download may fail if the two download threads finish at the same time. (#1708)
dbb7d5fbcc is described below

commit dbb7d5fbcc66a3a3c0e8cf89c7966800932c8429
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Sep 11 13:51:47 2024 +0200

    OAK-11109 - Fix: parallel download may fail if the two download threads finish at the same time. (#1708)
---
 .../pipelined/PipelinedMongoDownloadTask.java      | 43 +++++++++++++++-------
 1 file changed, 30 insertions(+), 13 deletions(-)

diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index c63a6f10b3..07940e15f0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -73,6 +73,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.INDEXING_PHASE_LOGGER;
@@ -482,16 +483,24 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
                         // One of the download tasks has completed. Cancel the other one.
                         if (completedTask == ascendingDownloadFuture) {
                             LOG.info("Ascending download task has completed. Cancelling descending download task.");
-                            // If the download thread is blocked on a socket read waiting for a Mongo response, it will
-                            // not immediately respond to the interrupt. It will only check the interrupt status when
-                            // it returns from the socket read. So in addition to interrupting the thread, we have to
-                            // close the Mongo cursor that is executing the query.
-                            descendingDownloadFuture.cancel(true);
+                            // This closes the Mongo cursor, which will cause the download task to abort next time it
+                            // performs an operation on the cursor or if it is blocked on the cursor.
                             descendingDownloadTask.cancelDownload();
+                            // In case the thread is not currently operating on the Mongo cursor, we interrupt the thread
+                            // to ensure that it terminates quickly.
+                            descendingDownloadFuture.cancel(true);
+                            // Notes:
+                            // 1. Calling close() on a Mongo cursor will fail if the cursor was already interrupted. So
+                            //   we cancel the cursor before interrupting the thread. Any exception thrown by calling
+                            //   close() on the cursor will be ignored, but it's better if we avoid them.
+                            // 2. Interrupting the thread is not enough if the thread is blocked waiting on a socket.
+                            //   In that state, the thread does not check for interrupts, it will only check when it
+                            //   finishes the I/O operation, which can take a long time. So we need to close the cursor,
+                            //   which will abort the I/O operation.
                         } else if (completedTask == descendingDownloadFuture) {
                             LOG.info("Descending download task has completed. Cancelling ascending download task.");
-                            ascendingDownloadFuture.cancel(true);
                             ascendingDownloadTask.cancelDownload();
+                            ascendingDownloadFuture.cancel(true);
                         } else {
                             throw new IllegalStateException("Unknown download task completed: " + completedTask);
                         }
@@ -614,7 +623,7 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
         // Accessed from the main download thread
         private volatile long firstModifiedValueSeen = -1;
         private volatile MongoCursor<NodeDocument> mongoCursor = null;
-        private volatile boolean cancelled = false;
+        private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
         DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics downloadStatics) {
             this(downloadOrder, downloadStatics, null);
@@ -678,8 +687,8 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
                         }
                         downloadCompleted = true;
                     } catch (IllegalStateException | InterruptedException | MongoInterruptedException e) {
-                        if (cancelled) {
-                            LOG.info("Download task was cancelled.");
+                        if (cancelled.get()) {
+                            LOG.info("Download task was cancelled: {}", e.toString());
                             return;
                         } else {
                             throw e;
@@ -744,10 +753,18 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
         }
 
         public void cancelDownload() {
-            LOG.info("{} Cancelling download, closing Mongo cursor: {}", downloadOrder, mongoCursor);
-            cancelled = true;
-            if (mongoCursor != null) {
-                mongoCursor.close();
+            boolean alreadyCancelled = cancelled.getAndSet(true);
+            if (alreadyCancelled) {
+                LOG.info("Download task was already cancelled.");
+            } else {
+                LOG.info("Cancelling download for {} order task, closing Mongo cursor.", downloadOrder);
+                if (mongoCursor != null) {
+                    try {
+                        mongoCursor.close();
+                    } catch (Throwable e) {
+                        LOG.info("Error closing Mongo cursor, the cursor may already be closed: {}", e.toString());
+                    }
+                }
             }
         }
 

Reply via email to