This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 775ad8573d OAK-11051- indexing-job: parallel download: when one of the 
download threads finishes, stop the other thread (#1661)
775ad8573d is described below

commit 775ad8573d3a63c26b3b2e0715a3220b03b14e74
Author: Nuno Santos <[email protected]>
AuthorDate: Thu Aug 22 16:11:36 2024 +0200

    OAK-11051- indexing-job: parallel download: when one of the download 
threads finishes, stop the other thread (#1661)
---
 .../pipelined/PipelinedMongoDownloadTask.java      | 141 +++++++++++++--------
 1 file changed, 89 insertions(+), 52 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index e8c5770abe..7a6f16fb06 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -67,6 +67,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -435,27 +436,65 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             DownloadTask descendingDownloadTask = new 
DownloadTask(DownloadOrder.DESCENDING, downloadStageStatistics, 
parallelDownloadCoordinator);
 
             ExecutorService downloadThreadPool = 
Executors.newFixedThreadPool(2, new 
ThreadFactoryBuilder().setDaemon(true).build());
-            Future<?> ascendingDownloadFuture = 
submitDownloadTask(downloadThreadPool, ascendingDownloadTask, mongoFilter, 
THREAD_NAME_PREFIX + "-ascending");
-            Future<?> descendingDownloadFuture = 
submitDownloadTask(downloadThreadPool, descendingDownloadTask, mongoFilter, 
THREAD_NAME_PREFIX + "-descending");
+            ExecutorCompletionService<Void> ecs = new 
ExecutorCompletionService<>(downloadThreadPool);
+            Future<?> ascendingDownloadFuture = submitDownloadTask(ecs, 
ascendingDownloadTask, mongoFilter, THREAD_NAME_PREFIX + "-ascending");
+            Future<?> descendingDownloadFuture = submitDownloadTask(ecs, 
descendingDownloadTask, mongoFilter, THREAD_NAME_PREFIX + "-descending");
+            // When using parallel download, we stop the download when the 
streams cross, that is, when they download
+            // a document that was already downloaded by the other. This works 
well if both ascending and descending
+            // queries return documents frequently, so that we can check for 
crossing frequently. But with regex filtering
+            // enabled, a Mongo query may go for a long time without returning 
any documents, because it may be traversing
+            // a part of the index that does not match the regex expression. 
In the extreme case, the query does not match
+            // any document in the collection, so both streams will scan the 
full index without ever realizing they have
+            // crossed, because this check is only done when one of the 
streams receives enough documents to fill up a batch.
+            // In this case, one of the queries will finish after completing 
its scan, while the other thread may still be
+            // ongoing for some time. I have observed that the ascending 
download thread completes faster than the
+            // descending thread, by a significant margin, so that the 
descending download thread may continue running
+            // for a long time even after the ascending completed the full 
scan. For this reason, it is important to
+            // cancel the other thread as soon as one thread completes the 
scan.
+            // Therefore, we use the following termination conditions:
+            // - the threads have crossed.
+            // - one of the threads finished the download.
+            // In both cases, when a thread finishes, we cancel the other 
thread because by then we have downloaded all
+            // documents.
             try {
-                boolean downloadFinished = false;
-                while (!downloadFinished) {
+                while (true) {
                     // The parent thread waits for the download to complete, 
reporting progress periodically
-                    try {
-                        ascendingDownloadFuture.get(10, TimeUnit.SECONDS);
-                        LOG.info("Ascending download finished. Waiting for 
descending download to finish.");
-                        // Once one of the downloads finishes, the other 
should also finish the next time it checks if
-                        // it intersected with the other stream.
-                        // TODO: One or both of the streams might be waiting 
for a Mongo response for a long time in case
-                        //  the query is traversing a region of the index that 
does not match the regex filter. In the
-                        //  extreme case, the query does not match any 
document, so both streams will scan the full index
-                        //  without ever realizing they have crossed, because 
this check is only done when one of the streams
-                        //  receives enough documents to fill up a batch. If 
neither of the streams receives documents, we
-                        //  have no way of knowing that they have intersected. 
but in the case that one of the streams
-                        //  detects that they have intersected, we can safely 
abort the other download, without waiting
-                        //  for it to continue until the next check.
-                        descendingDownloadFuture.get();
-                        LOG.info("Both ascending and descending download 
completed.");
+                    Future<Void> completedTask = ecs.poll(10, 
TimeUnit.SECONDS);
+                    if (completedTask == null) {
+                        // A null return means that the poll timed out, so the 
download tasks are still ongoing. Report
+                        // progress and then go back to waiting for the tasks 
to complete.
+                        long secondsElapsed = 
downloadStartWatch.elapsed(TimeUnit.SECONDS);
+                        long ascTaskDownloadTotal = 
ascendingDownloadTask.getDocumentsDownloadedTotal();
+                        long descTaskDownloadTotal = 
descendingDownloadTask.getDocumentsDownloadedTotal();
+                        String formattedRate;
+                        if (secondsElapsed == 0) {
+                            formattedRate = "N/A nodes/s, N/A nodes/hr, N/A 
/s";
+                        } else {
+                            double docRate = ((double) 
downloadStageStatistics.getDocumentsDownloadedTotal()) / secondsElapsed;
+                            double bytesRate = ((double) 
downloadStageStatistics.getDocumentsDownloadedTotalBytes()) / secondsElapsed;
+                            formattedRate = String.format(Locale.ROOT, "%1.2f 
nodes/s, %1.2f nodes/hr, %s/s",
+                                    docRate, docRate * 3600, 
IOUtils.humanReadableByteCountBin((long) bytesRate));
+                        }
+                        LOG.info("Total documents dumped from Mongo {} (asc: 
{}, desc: {}) [{}] (Elapsed {})",
+                                
downloadStageStatistics.getDocumentsDownloadedTotal(), ascTaskDownloadTotal, 
descTaskDownloadTotal,
+                                formattedRate, 
FormattingUtils.formatToSeconds(secondsElapsed));
+                    } else {
+                        // One of the download tasks has completed. Cancel the 
other one.
+                        if (completedTask == ascendingDownloadFuture) {
+                            LOG.info("Ascending download task has completed. 
Cancelling descending download task.");
+                            // If the download thread is blocked on a socket 
read waiting for a Mongo response, it will
+                            // not immediately respond to the interrupt. It 
will only check the interrupt status when
+                            // it returns from the socket read. So in addition 
to interrupting the thread, we have to
+                            // close the Mongo cursor that is executing the 
query.
+                            descendingDownloadFuture.cancel(true);
+                            descendingDownloadTask.cancelDownload();
+                        } else if (completedTask == descendingDownloadFuture) {
+                            LOG.info("Descending download task has completed. 
Cancelling ascending download task.");
+                            ascendingDownloadFuture.cancel(true);
+                            ascendingDownloadTask.cancelDownload();
+                        } else {
+                            throw new IllegalStateException("Unknown download 
task completed: " + completedTask);
+                        }
 
                         // Download any documents that may have been skipped 
by the descending thread because they were
                         // modified during the download. Any modification will 
change the _modified field to
@@ -474,44 +513,24 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
                             // de-duplicate the entries.
                             LOG.info("Downloading documents modified since the 
start of the download: _modified >= {}", highestModifiedSeen);
                             DownloadTask updatedDocsTask = new 
DownloadTask(DownloadOrder.ASCENDING, downloadStageStatistics, 
highestModifiedSeen, Long.MAX_VALUE, null);
-                            Future<?> updateDocsDownloadFuture = 
submitDownloadTask(downloadThreadPool, updatedDocsTask, mongoFilter, 
THREAD_NAME_PREFIX + "-updated-docs");
+                            Future<?> updateDocsDownloadFuture = 
submitDownloadTask(ecs, updatedDocsTask, mongoFilter, THREAD_NAME_PREFIX + 
"-updated-docs");
                             LOG.info("Waiting for download of updated 
documents to complete.");
                             updateDocsDownloadFuture.get();
                         }
-                        downloadFinished = true;
-                    } catch (TimeoutException e) {
-                        // Report progress
-                        long secondsElapsed = 
downloadStartWatch.elapsed(TimeUnit.SECONDS);
-                        long ascTaskDownloadTotal = 
ascendingDownloadTask.getDocumentsDownloadedTotal();
-                        long descTaskDownloadTotal = 
descendingDownloadTask.getDocumentsDownloadedTotal();
-                        String formattedRate;
-                        if (secondsElapsed == 0) {
-                            formattedRate = "N/A nodes/s, N/A nodes/hr, N/A 
/s";
-                        } else {
-                            double docRate = ((double) 
downloadStageStatistics.getDocumentsDownloadedTotal()) / secondsElapsed;
-                            double bytesRate = ((double) 
downloadStageStatistics.getDocumentsDownloadedTotalBytes()) / secondsElapsed;
-                            formattedRate = String.format(Locale.ROOT, "%1.2f 
nodes/s, %1.2f nodes/hr, %s/s",
-                                    docRate, docRate * 3600, 
IOUtils.humanReadableByteCountBin((long) bytesRate));
-                        }
-                        LOG.info("Total documents dumped from Mongo {} (asc: 
{}, desc: {}) [{}] (Elapsed {})",
-                                
downloadStageStatistics.getDocumentsDownloadedTotal(), ascTaskDownloadTotal, 
descTaskDownloadTotal,
-                                formattedRate, 
FormattingUtils.formatToSeconds(secondsElapsed));
+                        return;
                     }
                 }
             } catch (ExecutionException e) {
-                // One of the download tasks finished with an exception. 
Cancel the other one. Trying to cancel the
-                // task that already failed has no effect
-                LOG.info("Error during download. Canceling download threads. 
Error: {}", e.toString());
-                downloadThreadPool.shutdownNow();
+                // One of the download tasks finished with an exception. The 
finally block will cancel the other task.
+                LOG.info("Error during download: {}", e.toString());
                 throw new RuntimeException(e);
             } catch (InterruptedException e) {
-                LOG.info("Thread interrupted. Cancelling download threads.");
-                // The parent thread was interrupted. Shutdown the download 
threads.
-                downloadThreadPool.shutdownNow();
+                LOG.info("Thread interrupted.");
+                // The parent thread was interrupted. The finally block will 
cancel any task that may still be running.
                 throw e;
             } finally {
                 LOG.info("Shutting down download thread pool.");
-                new ExecutorCloser(downloadThreadPool).close();
+                new ExecutorCloser(downloadThreadPool, 1, 
TimeUnit.SECONDS).close();
                 LOG.info("Download thread pool shutdown complete.");
             }
         } else {
@@ -521,25 +540,29 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         }
     }
 
-    private Future<?> submitDownloadTask(ExecutorService executor, 
DownloadTask downloadTask, Bson mongoFilter, String name) {
+    private Future<?> submitDownloadTask(ExecutorCompletionService<Void> 
executor, DownloadTask downloadTask, Bson mongoFilter, String name) {
         return executor.submit(() -> {
             String originalName = Thread.currentThread().getName();
             Thread.currentThread().setName(name);
             try {
                 downloadTask.download(mongoFilter);
                 downloadTask.reportFinalResults();
-            } catch (InterruptedException e) {
-                LOG.warn("Thread interrupted.");
+            } catch (InterruptedException | MongoInterruptedException e) {
+                LOG.info("Thread interrupted: {}", e.toString());
                 throw new RuntimeException(e);
             } catch (TimeoutException e) {
                 LOG.warn("Timeout: {}", e.toString());
                 throw new RuntimeException(e);
+            } catch (Throwable t) {
+                LOG.error("Error during download: {}", t.toString());
+                throw t;
             } finally {
                 if (mongoServerSelector != null) {
                     mongoServerSelector.threadFinished();
                 }
                 Thread.currentThread().setName(originalName);
             }
+            return null;
         });
     }
 
@@ -594,7 +617,9 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
         private long documentsDownloadedTotal;
         private long nextLastModified;
         private String lastIdDownloaded;
-        private long firstModifiedValueSeen = -1;
+        // Accessed from the main download thread as well as from the task's own thread, hence volatile
+        private volatile long firstModifiedValueSeen = -1;
+        private volatile MongoCursor<NodeDocument> mongoCursor = null;
 
         DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics 
downloadStatics) {
             this(downloadOrder, downloadStatics, null);
@@ -716,12 +741,20 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             download(mongoIterable);
         }
 
+        public void cancelDownload() {
+            LOG.info("{} Cancelling download, closing Mongo cursor: {}", 
downloadOrder, mongoCursor);
+            if (mongoCursor != null) {
+                mongoCursor.close();
+            }
+        }
+
         void download(FindIterable<NodeDocument> mongoIterable) throws 
InterruptedException, TimeoutException {
             if (mongoBatchSize != -1) {
                 LOG.info("Setting Mongo batch size to {}", mongoBatchSize);
                 mongoIterable.batchSize(mongoBatchSize);
             }
             try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
+                this.mongoCursor = cursor;
                 NodeDocument[] batch = new 
NodeDocument[maxBatchNumberOfDocuments];
                 int nextIndex = 0;
                 int batchSize = 0;
@@ -743,7 +776,7 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
                         this.lastIdDownloaded = id;
                         this.documentsDownloadedTotal++;
                         downloadStatics.incrementDocumentsDownloadedTotal();
-                        if (this.documentsDownloadedTotal % 20000 == 0) {
+                        if (this.documentsDownloadedTotal % 20_000 == 0) {
                             reportProgress(id);
                         }
                         TRAVERSAL_LOG.trace(id);
@@ -784,7 +817,11 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
                     if (nextIndex > 0) {
                         LOG.info("Connection interrupted with recoverable 
failure. Enqueueing partial block with {} elements, estimated size: {}",
                                 nextIndex, 
IOUtils.humanReadableByteCountBin(batchSize));
-                        tryEnqueueCopy(batch, nextIndex);
+                        boolean downloadCompleted = tryEnqueueCopy(batch, 
nextIndex);
+                        if (downloadCompleted) {
+                            LOG.info("Download of range with download order {} 
completed, intersected with other download.", downloadOrder);
+                            return;
+                        }
                     }
                     throw e;
                 }

Reply via email to