This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95d28d9823 OAK-10788 - Indexing job downloader: shutdown gracefully 
all threads in case of failure (#1456)
95d28d9823 is described below

commit 95d28d9823e66c3a04e47dca903c2a0c9abadaf6
Author: Nuno Santos <[email protected]>
AuthorDate: Wed May 15 11:38:23 2024 +0200

    OAK-10788 - Indexing job downloader: shutdown gracefully all threads in 
case of failure (#1456)
---
 .../pipelined/PipelinedMongoDownloadTask.java      |  20 +-
 .../flatfile/pipelined/PipelinedStrategy.java      | 210 ++++++++++-----------
 2 files changed, 105 insertions(+), 125 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index 3a3fcfac36..8425375a66 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -38,6 +38,7 @@ import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.MongoRegexPathFilterFactory.MongoFilterPaths;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
@@ -441,23 +442,17 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
                 // One of the download tasks finished with an exception. 
Cancel the other one. Trying to cancel the
                 // task that already failed has no effect
                 LOG.info("Error during download. Canceling download threads. 
Error: {}", e.toString());
-                ascendingDownloadFuture.cancel(true);
-                descendingDownloadFuture.cancel(true);
+                downloadThreadPool.shutdownNow();
                 throw new RuntimeException(e);
             } catch (InterruptedException e) {
                 LOG.info("Thread interrupted. Cancelling download threads.");
                 // The parent thread was interrupted. Shutdown the download 
threads.
-                ascendingDownloadFuture.cancel(true);
-                descendingDownloadFuture.cancel(true);
+                downloadThreadPool.shutdownNow();
                 throw e;
             } finally {
                 LOG.info("Shutting down download thread pool.");
-                downloadThreadPool.shutdown();
-                boolean success = downloadThreadPool.awaitTermination(5, 
TimeUnit.SECONDS);
-                if (!success) {
-                    LOG.warn("Download thread pool did not shut down in 5 
seconds. Forcing shutdown.");
-                    downloadThreadPool.shutdownNow();
-                }
+                new ExecutorCloser(downloadThreadPool).close();
+                LOG.info("Download thread pool shutdown complete.");
             }
         } else {
             // Single threaded dump
@@ -473,9 +468,12 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             try {
                 downloadTask.download(mongoFilter);
                 downloadTask.reportFinalResults();
-            } catch (InterruptedException | TimeoutException e) {
+            } catch (InterruptedException e) {
                 LOG.warn("Thread interrupted.");
                 throw new RuntimeException(e);
+            } catch (TimeoutException e) {
+                LOG.warn("Timeout: {}", e.toString());
+                throw new RuntimeException(e);
             } finally {
                 mongoServerSelector.threadFinished();
                 Thread.currentThread().setName(originalName);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 672be2c574..195922f12b 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.Compression;
 import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -48,6 +49,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -56,8 +58,6 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
@@ -147,35 +147,6 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
     private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 32;
     private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000;
 
-    private static class MonitorTask<T> implements Runnable {
-        private final ArrayBlockingQueue<T[]> mongoDocQueue;
-        private final ArrayBlockingQueue<NodeStateEntryBatch> 
emptyBatchesQueue;
-        private final ArrayBlockingQueue<NodeStateEntryBatch> 
nonEmptyBatchesQueue;
-        private final ArrayBlockingQueue<Path> sortedFilesQueue;
-        private final TransformStageStatistics transformStageStatistics;
-
-        public MonitorTask(ArrayBlockingQueue<T[]> mongoDocQueue,
-                           ArrayBlockingQueue<NodeStateEntryBatch> 
emptyBatchesQueue,
-                           ArrayBlockingQueue<NodeStateEntryBatch> 
nonEmptyBatchesQueue,
-                           ArrayBlockingQueue<Path> sortedFilesQueue,
-                           TransformStageStatistics transformStageStatistics) {
-            this.mongoDocQueue = mongoDocQueue;
-            this.emptyBatchesQueue = emptyBatchesQueue;
-            this.nonEmptyBatchesQueue = nonEmptyBatchesQueue;
-            this.sortedFilesQueue = sortedFilesQueue;
-            this.transformStageStatistics = transformStageStatistics;
-        }
-
-        @Override
-        public void run() {
-            try {
-                printStatistics(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, false);
-            } catch (Exception e) {
-                LOG.error("Error while logging queue sizes", e);
-            }
-        }
-    }
-
     private static <T> void printStatistics(ArrayBlockingQueue<T[]> 
mongoDocQueue,
                                             
ArrayBlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                             
ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
@@ -382,10 +353,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
         // all the tasks, so that if one of them fails, we can abort the whole 
pipeline. Otherwise, if we wait on
         // Future instances, we can only wait on one of them, so that if any 
of the others fail, we have no easy way
         // to detect this failure.
-        ExecutorCompletionService ecs = new 
ExecutorCompletionService(threadPool);
-        ScheduledExecutorService monitorThreadPool = 
Executors.newScheduledThreadPool(1,
-                new 
ThreadFactoryBuilder().setNameFormat("monitor").setDaemon(true).build()
-        );
+        ExecutorCompletionService ecs = new 
ExecutorCompletionService<>(threadPool);
         try {
             // download -> transform thread.
             ArrayBlockingQueue<NodeDocument[]> mongoDocQueue = new 
ArrayBlockingQueue<>(mongoDocQueueSize);
@@ -401,10 +369,6 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
 
             TransformStageStatistics transformStageStatistics = new 
TransformStageStatistics();
 
-            ScheduledFuture<?> monitorFuture = 
monitorThreadPool.scheduleWithFixedDelay(
-                    new MonitorTask(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics),
-                    10, 30, TimeUnit.SECONDS);
-
             // Create empty buffers
             for (int i = 0; i < nseBuffersCount; i++) {
                 // No limits on the number of entries, only on their total 
size. This might be revised later.
@@ -414,7 +378,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
             LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
             Stopwatch start = Stopwatch.createStarted();
 
-            ecs.submit(new PipelinedMongoDownloadTask(
+            Future<PipelinedMongoDownloadTask.Result> downloadFuture = 
ecs.submit(new PipelinedMongoDownloadTask(
                     mongoClientURI,
                     docStore,
                     (int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
@@ -425,9 +389,10 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                     indexingReporter
             ));
 
+            ArrayList<Future<PipelinedTransformTask.Result>> transformFutures 
= new ArrayList<>(numberOfTransformThreads);
             for (int i = 0; i < numberOfTransformThreads; i++) {
                 NodeStateEntryWriter entryWriter = new 
NodeStateEntryWriter(blobStore);
-                ecs.submit(new PipelinedTransformTask(
+                transformFutures.add(ecs.submit(new PipelinedTransformTask(
                         docStore,
                         documentNodeStore,
                         rootRevision,
@@ -437,10 +402,10 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                         emptyBatchesQueue,
                         nonEmptyBatchesQueue,
                         transformStageStatistics
-                ));
+                )));
             }
 
-            ecs.submit(new PipelinedSortBatchTask(
+            Future<PipelinedSortBatchTask.Result> sortBatchFuture = 
ecs.submit(new PipelinedSortBatchTask(
                     this.getStoreDir().toPath(),
                     pathComparator,
                     this.getAlgorithm(),
@@ -459,81 +424,86 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                     statisticsProvider,
                     indexingReporter);
 
-            ecs.submit(mergeSortTask);
+            Future<PipelinedMergeSortTask.Result> mergeSortFuture = 
ecs.submit(mergeSortTask);
 
             Path flatFileStore = null;
             try {
                 LOG.info("Waiting for tasks to complete");
                 int tasksFinished = 0;
                 int transformTasksFinished = 0;
+                boolean monitorQueues = true;
                 while (tasksFinished < numberOfThreads) {
-                    Future<?> completedTask = ecs.take();
-                    try {
-                        Object result = completedTask.get();
-                        if (result instanceof 
PipelinedMongoDownloadTask.Result) {
-                            PipelinedMongoDownloadTask.Result downloadResult = 
(PipelinedMongoDownloadTask.Result) result;
-                            LOG.info("Download finished. Documents downloaded: 
{}", downloadResult.getDocumentsDownloaded());
-                            // Signal the end of documents to the transform 
threads.
-                            for (int i = 0; i < numberOfTransformThreads; i++) 
{
-                                mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
-                            }
-                            mergeSortTask.stopEagerMerging();
-
-                        } else if (result instanceof 
PipelinedTransformTask.Result) {
-                            PipelinedTransformTask.Result transformResult = 
(PipelinedTransformTask.Result) result;
-                            transformTasksFinished++;
-                            nodeStateEntriesExtracted += 
transformResult.getEntryCount();
-                            LOG.info("Transform task {} finished. Entries 
processed: {}",
-                                    transformResult.getThreadId(), 
transformResult.getEntryCount());
-                            if (transformTasksFinished == 
numberOfTransformThreads) {
-                                LOG.info("All transform tasks finished. Total 
entries processed: {}", nodeStateEntriesExtracted);
-                                // No need to keep monitoring the queues, the 
download and transform threads are done.
-                                monitorFuture.cancel(false);
-                                // Terminate the sort thread.
-                                nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
-                                
transformStageStatistics.publishStatistics(statisticsProvider, 
indexingReporter);
+                    // Wait with a timeout to print statistics periodically
+                    Future<?> completedTask = ecs.poll(30, TimeUnit.SECONDS);
+                    if (completedTask == null) {
+                        // Timeout waiting for a task to complete
+                        if (monitorQueues) {
+                            try {
+                                printStatistics(mongoDocQueue, 
emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, 
transformStageStatistics, false);
+                            } catch (Exception e) {
+                                LOG.warn("Error while logging queue sizes", e);
                             }
-
-                        } else if (result instanceof 
PipelinedSortBatchTask.Result) {
-                            PipelinedSortBatchTask.Result sortTaskResult = 
(PipelinedSortBatchTask.Result) result;
-                            LOG.info("Sort batch task finished. Entries 
processed: {}", sortTaskResult.getTotalEntries());
-                            sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
-                            // The buffers between transform and merge sort 
tasks are no longer needed, so remove them
-                            // from the queues so they can be garbage 
collected.
-                            // These buffers can be very large, so this is 
important to avoid running out of memory in
-                            // the merge-sort phase
-                            if (!nonEmptyBatchesQueue.isEmpty()) {
-                                LOG.warn("emptyBatchesQueue is not empty. 
Size: {}", emptyBatchesQueue.size());
-                            }
-                            emptyBatchesQueue.clear();
-                            printStatistics(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
-
-                        } else if (result instanceof 
PipelinedMergeSortTask.Result) {
-                            PipelinedMergeSortTask.Result mergeSortedFilesTask 
= (PipelinedMergeSortTask.Result) result;
-                            Path ffs = 
mergeSortedFilesTask.getFlatFileStoreFile();
-                            LOG.info("Merge-sort sort task finished. FFS: {}, 
Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
-                            flatFileStore = 
mergeSortedFilesTask.getFlatFileStoreFile();
-
-                        } else {
-                            throw new RuntimeException("Unknown result type: " 
+ result);
-                        }
-                        tasksFinished++;
-                    } catch (ExecutionException ex) {
-                        LOG.warn("Execution error dumping from MongoDB: " + ex 
+ ". Shutting down all threads");
-                        threadPool.shutdownNow();
-                        boolean terminated = threadPool.awaitTermination(5, 
TimeUnit.SECONDS);
-                        if (!terminated) {
-                            LOG.warn("Thread pool failed to terminate");
                         }
-                        throw new RuntimeException(ex.getCause());
-                    } catch (Throwable ex) {
-                        LOG.warn("Error dumping from MongoDB: " + ex);
-                        threadPool.shutdownNow();
-                        boolean terminated = threadPool.awaitTermination(5, 
TimeUnit.SECONDS);
-                        if (!terminated) {
-                            LOG.warn("Thread pool failed to terminate");
+                    } else {
+                        try {
+                            Object result = completedTask.get();
+                            if (result instanceof 
PipelinedMongoDownloadTask.Result) {
+                                PipelinedMongoDownloadTask.Result 
downloadResult = (PipelinedMongoDownloadTask.Result) result;
+                                LOG.info("Download finished. Documents 
downloaded: {}", downloadResult.getDocumentsDownloaded());
+                                // Signal the end of documents to the 
transform threads.
+                                for (int i = 0; i < numberOfTransformThreads; 
i++) {
+                                    mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
+                                }
+                                mergeSortTask.stopEagerMerging();
+                                downloadFuture = null;
+
+                            } else if (result instanceof 
PipelinedTransformTask.Result) {
+                                PipelinedTransformTask.Result transformResult 
= (PipelinedTransformTask.Result) result;
+                                transformTasksFinished++;
+                                nodeStateEntriesExtracted += 
transformResult.getEntryCount();
+                                LOG.info("Transform task {} finished. Entries 
processed: {}",
+                                        transformResult.getThreadId(), 
transformResult.getEntryCount());
+                                if (transformTasksFinished == 
numberOfTransformThreads) {
+                                    LOG.info("All transform tasks finished. 
Total entries processed: {}", nodeStateEntriesExtracted);
+                                    // No need to keep monitoring the queues, 
the download and transform threads are done.
+                                    monitorQueues = false;
+                                    // Terminate the sort thread.
+                                    
nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
+                                    
transformStageStatistics.publishStatistics(statisticsProvider, 
indexingReporter);
+                                    transformFutures.clear();
+                                }
+
+                            } else if (result instanceof 
PipelinedSortBatchTask.Result) {
+                                PipelinedSortBatchTask.Result sortTaskResult = 
(PipelinedSortBatchTask.Result) result;
+                                LOG.info("Sort batch task finished. Entries 
processed: {}", sortTaskResult.getTotalEntries());
+                                
sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
+                                // The buffers between transform and merge 
sort tasks are no longer needed, so remove them
+                                // from the queues so they can be garbage 
collected.
+                                // These buffers can be very large, so this is 
important to avoid running out of memory in
+                                // the merge-sort phase
+                                if (!nonEmptyBatchesQueue.isEmpty()) {
+                                    LOG.warn("emptyBatchesQueue is not empty. 
Size: {}", emptyBatchesQueue.size());
+                                }
+                                emptyBatchesQueue.clear();
+                                printStatistics(mongoDocQueue, 
emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, 
transformStageStatistics, true);
+                                sortBatchFuture = null;
+
+                            } else if (result instanceof 
PipelinedMergeSortTask.Result) {
+                                PipelinedMergeSortTask.Result 
mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
+                                Path ffs = 
mergeSortedFilesTask.getFlatFileStoreFile();
+                                LOG.info("Merge-sort sort task finished. FFS: 
{}, Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
+                                flatFileStore = 
mergeSortedFilesTask.getFlatFileStoreFile();
+                                mergeSortFuture = null;
+
+                            } else {
+                                throw new RuntimeException("Unknown result 
type: " + result);
+                            }
+                            tasksFinished++;
+                        } catch (ExecutionException ex) {
+                            throw new RuntimeException(ex.getCause());
+                        } catch (Throwable ex) {
+                            throw new RuntimeException(ex);
                         }
-                        throw new RuntimeException(ex);
                     }
                 }
                 long elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
@@ -545,16 +515,28 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                 indexingReporter.addTiming("Build FFS (Dump+Merge)", 
FormattingUtils.formatToSeconds(elapsedSeconds));
 
                 LOG.info("[INDEXING_REPORT:BUILD_FFS]\n{}", 
indexingReporter.generateReport());
-            } catch (InterruptedException e) {
+            } catch (Throwable e) {
+                LOG.warn("Error dumping from MongoDB. Cancelling all tasks. 
Error: {}", e.toString());
+                // Cancel in order
+                cancelFuture(downloadFuture);
+                for (Future<?> transformTask : transformFutures) {
+                    cancelFuture(transformTask);
+                }
+                cancelFuture(sortBatchFuture);
+                cancelFuture(mergeSortFuture);
                 throw new RuntimeException(e);
-            } finally {
-                // No longer need to monitor the size of the queues,
-                monitorFuture.cancel(true);
             }
             return flatFileStore.toFile();
         } finally {
-            threadPool.shutdown();
-            monitorThreadPool.shutdown();
+            LOG.info("Shutting down build FFS thread pool");
+            new ExecutorCloser(threadPool).close();
+        }
+    }
+
+    private void cancelFuture(Future<?> future) {
+        if (future != null) {
+            LOG.info("Cancelling future: {}", future);
+            future.cancel(true);
         }
     }
 

Reply via email to the mailing list.