This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95d28d9823 OAK-10788 - Indexing job downloader: shutdown gracefully all threads in case of failure (#1456)
95d28d9823 is described below
commit 95d28d9823e66c3a04e47dca903c2a0c9abadaf6
Author: Nuno Santos <[email protected]>
AuthorDate: Wed May 15 11:38:23 2024 +0200
OAK-10788 - Indexing job downloader: shutdown gracefully all threads in case of failure (#1456)
---
.../pipelined/PipelinedMongoDownloadTask.java | 20 +-
.../flatfile/pipelined/PipelinedStrategy.java | 210 ++++++++++-----------
2 files changed, 105 insertions(+), 125 deletions(-)
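
Background for readers of the patch: both files below replace a hand-rolled shutdown sequence (shutdown, bounded awaitTermination, then shutdownNow) with oak-commons' ExecutorCloser. A plain-JDK sketch of roughly that pattern follows; it is illustrative only and not part of the patch, and the grace period passed by the caller is an assumption rather than ExecutorCloser's documented default.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorCloserSketch {
    // Shut down an executor the way the patch delegates to ExecutorCloser:
    // stop accepting new tasks, wait a bounded time, then force termination.
    static void closeQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                // Grace period expired: interrupt whatever is still running.
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and force the shutdown anyway.
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
}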
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index 3a3fcfac36..8425375a66 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -38,6 +38,7 @@ import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.MongoRegexPathFilterFactory.MongoFilterPaths;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
@@ -441,23 +442,17 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
                 // One of the download tasks finished with an exception. Cancel the other one. Trying to cancel the
                 // task that already failed has no effect
                 LOG.info("Error during download. Canceling download threads. Error: {}", e.toString());
-                ascendingDownloadFuture.cancel(true);
-                descendingDownloadFuture.cancel(true);
+                downloadThreadPool.shutdownNow();
                 throw new RuntimeException(e);
             } catch (InterruptedException e) {
                 LOG.info("Thread interrupted. Cancelling download threads.");
                 // The parent thread was interrupted. Shutdown the download threads.
-                ascendingDownloadFuture.cancel(true);
-                descendingDownloadFuture.cancel(true);
+                downloadThreadPool.shutdownNow();
                 throw e;
             } finally {
                 LOG.info("Shutting down download thread pool.");
-                downloadThreadPool.shutdown();
-                boolean success = downloadThreadPool.awaitTermination(5, TimeUnit.SECONDS);
-                if (!success) {
-                    LOG.warn("Download thread pool did not shut down in 5 seconds. Forcing shutdown.");
-                    downloadThreadPool.shutdownNow();
-                }
+                new ExecutorCloser(downloadThreadPool).close();
+                LOG.info("Download thread pool shutdown complete.");
             }
         } else {
             // Single threaded dump
@@ -473,9 +468,12 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
             try {
                 downloadTask.download(mongoFilter);
                 downloadTask.reportFinalResults();
-            } catch (InterruptedException | TimeoutException e) {
+            } catch (InterruptedException e) {
                 LOG.warn("Thread interrupted.");
                 throw new RuntimeException(e);
+            } catch (TimeoutException e) {
+                LOG.warn("Timeout: {}", e.toString());
+                throw new RuntimeException(e);
             } finally {
                 mongoServerSelector.threadFinished();
                 Thread.currentThread().setName(originalName);
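
For context, a standalone approximation of the control flow PipelinedMongoDownloadTask now uses for the parallel ascending/descending download: if either download fails or the parent thread is interrupted, the whole download pool is interrupted with shutdownNow() instead of cancelling individual futures, and the finally block performs the bounded shutdown that the patch delegates to ExecutorCloser. Illustrative only; the Callable bodies and class name are placeholders, not the actual task structure.

import java.util.concurrent.*;

public class ParallelDownloadShutdownSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService downloadThreadPool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<Long> ecs = new ExecutorCompletionService<>(downloadThreadPool);
        ecs.submit(() -> { throw new IllegalStateException("ascending download failed"); });
        ecs.submit(() -> { Thread.sleep(60_000); return 42L; }); // stand-in for the descending download
        try {
            for (int i = 0; i < 2; i++) {
                ecs.take().get(); // propagates the first failure
            }
        } catch (ExecutionException e) {
            // One download failed: interrupt the other one instead of waiting for it.
            downloadThreadPool.shutdownNow();
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            downloadThreadPool.shutdownNow();
            throw e;
        } finally {
            // Equivalent of ExecutorCloser: orderly shutdown with a bounded wait,
            // falling back to shutdownNow() if tasks do not finish in time.
            downloadThreadPool.shutdown();
            if (!downloadThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                downloadThreadPool.shutdownNow();
            }
        }
    }
}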
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 672be2c574..195922f12b 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.Compression;
 import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -48,6 +49,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -56,8 +58,6 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -147,35 +147,6 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
     private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 32;
     private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000;

-    private static class MonitorTask<T> implements Runnable {
-        private final ArrayBlockingQueue<T[]> mongoDocQueue;
-        private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
-        private final ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue;
-        private final ArrayBlockingQueue<Path> sortedFilesQueue;
-        private final TransformStageStatistics transformStageStatistics;
-
-        public MonitorTask(ArrayBlockingQueue<T[]> mongoDocQueue,
-                           ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue,
-                           ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue,
-                           ArrayBlockingQueue<Path> sortedFilesQueue,
-                           TransformStageStatistics transformStageStatistics) {
-            this.mongoDocQueue = mongoDocQueue;
-            this.emptyBatchesQueue = emptyBatchesQueue;
-            this.nonEmptyBatchesQueue = nonEmptyBatchesQueue;
-            this.sortedFilesQueue = sortedFilesQueue;
-            this.transformStageStatistics = transformStageStatistics;
-        }
-
-        @Override
-        public void run() {
-            try {
-                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, false);
-            } catch (Exception e) {
-                LOG.error("Error while logging queue sizes", e);
-            }
-        }
-    }
-
     private static <T> void printStatistics(ArrayBlockingQueue<T[]> mongoDocQueue,
                                             ArrayBlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                             ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
@@ -382,10 +353,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
         // all the tasks, so that if one of them fails, we can abort the whole pipeline. Otherwise, if we wait on
         // Future instances, we can only wait on one of them, so that if any of the others fail, we have no easy way
         // to detect this failure.
-        ExecutorCompletionService ecs = new ExecutorCompletionService(threadPool);
-        ScheduledExecutorService monitorThreadPool = Executors.newScheduledThreadPool(1,
-                new ThreadFactoryBuilder().setNameFormat("monitor").setDaemon(true).build()
-        );
+        ExecutorCompletionService ecs = new ExecutorCompletionService<>(threadPool);
         try {
             // download -> transform thread.
             ArrayBlockingQueue<NodeDocument[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocQueueSize);
@@ -401,10 +369,6 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {

             TransformStageStatistics transformStageStatistics = new TransformStageStatistics();

-            ScheduledFuture<?> monitorFuture = monitorThreadPool.scheduleWithFixedDelay(
-                    new MonitorTask(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics),
-                    10, 30, TimeUnit.SECONDS);
-
             // Create empty buffers
             for (int i = 0; i < nseBuffersCount; i++) {
                 // No limits on the number of entries, only on their total size. This might be revised later.
@@ -414,7 +378,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {

             LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
             Stopwatch start = Stopwatch.createStarted();
-            ecs.submit(new PipelinedMongoDownloadTask(
+            Future<PipelinedMongoDownloadTask.Result> downloadFuture = ecs.submit(new PipelinedMongoDownloadTask(
                     mongoClientURI,
                     docStore,
                     (int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
@@ -425,9 +389,10 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
                     indexingReporter
             ));

+            ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
             for (int i = 0; i < numberOfTransformThreads; i++) {
                 NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
-                ecs.submit(new PipelinedTransformTask(
+                transformFutures.add(ecs.submit(new PipelinedTransformTask(
                         docStore,
                         documentNodeStore,
                         rootRevision,
@@ -437,10 +402,10 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
                         emptyBatchesQueue,
                         nonEmptyBatchesQueue,
                         transformStageStatistics
-                ));
+                )));
             }

-            ecs.submit(new PipelinedSortBatchTask(
+            Future<PipelinedSortBatchTask.Result> sortBatchFuture = ecs.submit(new PipelinedSortBatchTask(
                     this.getStoreDir().toPath(),
                     pathComparator,
                     this.getAlgorithm(),
@@ -459,81 +424,86 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
                     statisticsProvider,
                     indexingReporter);
-            ecs.submit(mergeSortTask);
+            Future<PipelinedMergeSortTask.Result> mergeSortFuture = ecs.submit(mergeSortTask);

             Path flatFileStore = null;

             try {
                 LOG.info("Waiting for tasks to complete");
                 int tasksFinished = 0;
                 int transformTasksFinished = 0;
+                boolean monitorQueues = true;
                 while (tasksFinished < numberOfThreads) {
-                    Future<?> completedTask = ecs.take();
-                    try {
-                        Object result = completedTask.get();
-                        if (result instanceof PipelinedMongoDownloadTask.Result) {
-                            PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
-                            LOG.info("Download finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
-                            // Signal the end of documents to the transform threads.
-                            for (int i = 0; i < numberOfTransformThreads; i++) {
-                                mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
-                            }
-                            mergeSortTask.stopEagerMerging();
-
-                        } else if (result instanceof PipelinedTransformTask.Result) {
-                            PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
-                            transformTasksFinished++;
-                            nodeStateEntriesExtracted += transformResult.getEntryCount();
-                            LOG.info("Transform task {} finished. Entries processed: {}",
-                                    transformResult.getThreadId(), transformResult.getEntryCount());
-                            if (transformTasksFinished == numberOfTransformThreads) {
-                                LOG.info("All transform tasks finished. Total entries processed: {}", nodeStateEntriesExtracted);
-                                // No need to keep monitoring the queues, the download and transform threads are done.
-                                monitorFuture.cancel(false);
-                                // Terminate the sort thread.
-                                nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
-                                transformStageStatistics.publishStatistics(statisticsProvider, indexingReporter);
+                    // Wait with a timeout to print statistics periodically
+                    Future<?> completedTask = ecs.poll(30, TimeUnit.SECONDS);
+                    if (completedTask == null) {
+                        // Timeout waiting for a task to complete
+                        if (monitorQueues) {
+                            try {
+                                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, false);
+                            } catch (Exception e) {
+                                LOG.warn("Error while logging queue sizes", e);
                             }
-
-                        } else if (result instanceof PipelinedSortBatchTask.Result) {
-                            PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
-                            LOG.info("Sort batch task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
-                            sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
-                            // The buffers between transform and merge sort tasks are no longer needed, so remove them
-                            // from the queues so they can be garbage collected.
-                            // These buffers can be very large, so this is important to avoid running out of memory in
-                            // the merge-sort phase
-                            if (!nonEmptyBatchesQueue.isEmpty()) {
-                                LOG.warn("emptyBatchesQueue is not empty. Size: {}", emptyBatchesQueue.size());
-                            }
-                            emptyBatchesQueue.clear();
-                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
-
-                        } else if (result instanceof PipelinedMergeSortTask.Result) {
-                            PipelinedMergeSortTask.Result mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
-                            Path ffs = mergeSortedFilesTask.getFlatFileStoreFile();
-                            LOG.info("Merge-sort sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
-                            flatFileStore = mergeSortedFilesTask.getFlatFileStoreFile();
-
-                        } else {
-                            throw new RuntimeException("Unknown result type: " + result);
-                        }
-                        tasksFinished++;
-                    } catch (ExecutionException ex) {
-                        LOG.warn("Execution error dumping from MongoDB: " + ex + ". Shutting down all threads");
-                        threadPool.shutdownNow();
-                        boolean terminated = threadPool.awaitTermination(5, TimeUnit.SECONDS);
-                        if (!terminated) {
-                            LOG.warn("Thread pool failed to terminate");
                         }
-                        throw new RuntimeException(ex.getCause());
-                    } catch (Throwable ex) {
-                        LOG.warn("Error dumping from MongoDB: " + ex);
-                        threadPool.shutdownNow();
-                        boolean terminated = threadPool.awaitTermination(5, TimeUnit.SECONDS);
-                        if (!terminated) {
-                            LOG.warn("Thread pool failed to terminate");
+                    } else {
+                        try {
+                            Object result = completedTask.get();
+                            if (result instanceof PipelinedMongoDownloadTask.Result) {
+                                PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
+                                LOG.info("Download finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
+                                // Signal the end of documents to the transform threads.
+                                for (int i = 0; i < numberOfTransformThreads; i++) {
+                                    mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
+                                }
+                                mergeSortTask.stopEagerMerging();
+                                downloadFuture = null;
+
+                            } else if (result instanceof PipelinedTransformTask.Result) {
+                                PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
+                                transformTasksFinished++;
+                                nodeStateEntriesExtracted += transformResult.getEntryCount();
+                                LOG.info("Transform task {} finished. Entries processed: {}",
+                                        transformResult.getThreadId(), transformResult.getEntryCount());
+                                if (transformTasksFinished == numberOfTransformThreads) {
+                                    LOG.info("All transform tasks finished. Total entries processed: {}", nodeStateEntriesExtracted);
+                                    // No need to keep monitoring the queues, the download and transform threads are done.
+                                    monitorQueues = false;
+                                    // Terminate the sort thread.
+                                    nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
+                                    transformStageStatistics.publishStatistics(statisticsProvider, indexingReporter);
+                                    transformFutures.clear();
+                                }
+
+                            } else if (result instanceof PipelinedSortBatchTask.Result) {
+                                PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
+                                LOG.info("Sort batch task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
+                                sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
+                                // The buffers between transform and merge sort tasks are no longer needed, so remove them
+                                // from the queues so they can be garbage collected.
+                                // These buffers can be very large, so this is important to avoid running out of memory in
+                                // the merge-sort phase
+                                if (!nonEmptyBatchesQueue.isEmpty()) {
+                                    LOG.warn("emptyBatchesQueue is not empty. Size: {}", emptyBatchesQueue.size());
+                                }
+                                emptyBatchesQueue.clear();
+                                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
+                                sortBatchFuture = null;
+
+                            } else if (result instanceof PipelinedMergeSortTask.Result) {
+                                PipelinedMergeSortTask.Result mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
+                                Path ffs = mergeSortedFilesTask.getFlatFileStoreFile();
+                                LOG.info("Merge-sort sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
+                                flatFileStore = mergeSortedFilesTask.getFlatFileStoreFile();
+                                mergeSortFuture = null;
+
+                            } else {
+                                throw new RuntimeException("Unknown result type: " + result);
+                            }
+                            tasksFinished++;
+                        } catch (ExecutionException ex) {
+                            throw new RuntimeException(ex.getCause());
+                        } catch (Throwable ex) {
+                            throw new RuntimeException(ex);
                         }
-                        throw new RuntimeException(ex);
                     }
                 }
                 long elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
@@ -545,16 +515,28 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {

                 indexingReporter.addTiming("Build FFS (Dump+Merge)", FormattingUtils.formatToSeconds(elapsedSeconds));
                 LOG.info("[INDEXING_REPORT:BUILD_FFS]\n{}", indexingReporter.generateReport());
-            } catch (InterruptedException e) {
+            } catch (Throwable e) {
+                LOG.warn("Error dumping from MongoDB. Cancelling all tasks. Error: {}", e.toString());
+                // Cancel in order
+                cancelFuture(downloadFuture);
+                for (Future<?> transformTask : transformFutures) {
+                    cancelFuture(transformTask);
+                }
+                cancelFuture(sortBatchFuture);
+                cancelFuture(mergeSortFuture);
                 throw new RuntimeException(e);
-            } finally {
-                // No longer need to monitor the size of the queues,
-                monitorFuture.cancel(true);
             }
             return flatFileStore.toFile();
         } finally {
-            threadPool.shutdown();
-            monitorThreadPool.shutdown();
+            LOG.info("Shutting down build FFS thread pool");
+            new ExecutorCloser(threadPool).close();
+        }
+    }
+
+    private void cancelFuture(Future<?> future) {
+        if (future != null) {
+            LOG.info("Cancelling future: {}", future);
+            future.cancel(true);
         }
     }
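
The PipelinedStrategy changes combine two patterns: the dedicated monitor thread is replaced by polling the ExecutorCompletionService with a timeout and printing statistics whenever the wait times out, and each submitted Future is kept (and nulled out once its task completes) so that a failure cancels only the tasks still pending, in pipeline order. The following is a standalone sketch of both patterns, illustrative only; the task payloads, the printStats() body, and the class name are placeholders.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class PollAndCancelSketch {
    private static void printStats() {
        System.out.println("queue sizes would be logged here");
    }

    private static void cancelIfPending(Future<?> f) {
        if (f != null) {
            f.cancel(true); // interrupt the task if it is still running
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        List<Future<Integer>> pending = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            pending.add(ecs.submit(() -> { Thread.sleep(1_000L * (id + 1)); return id; }));
        }
        int finished = 0;
        try {
            while (finished < 3) {
                Future<Integer> done = ecs.poll(500, TimeUnit.MILLISECONDS);
                if (done == null) {
                    printStats(); // nothing finished within the timeout: log progress instead
                    continue;
                }
                int id = done.get();   // rethrows ExecutionException if the task failed
                pending.set(id, null); // this future no longer needs cancelling
                finished++;
            }
        } catch (Exception e) {
            for (Future<Integer> f : pending) {
                cancelIfPending(f); // cancel only the tasks that have not completed yet
            }
            throw e;
        } finally {
            pool.shutdownNow();
        }
    }
}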