This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 775ad8573d OAK-11051- indexing-job: parallel download: when one of the
download threads finishes, stop the other thread (#1661)
775ad8573d is described below
commit 775ad8573d3a63c26b3b2e0715a3220b03b14e74
Author: Nuno Santos <[email protected]>
AuthorDate: Thu Aug 22 16:11:36 2024 +0200
OAK-11051- indexing-job: parallel download: when one of the download
threads finishes, stop the other thread (#1661)
---
.../pipelined/PipelinedMongoDownloadTask.java | 141 +++++++++++++--------
1 file changed, 89 insertions(+), 52 deletions(-)
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index e8c5770abe..7a6f16fb06 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -67,6 +67,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -435,27 +436,65 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
DownloadTask descendingDownloadTask = new
DownloadTask(DownloadOrder.DESCENDING, downloadStageStatistics,
parallelDownloadCoordinator);
ExecutorService downloadThreadPool =
Executors.newFixedThreadPool(2, new
ThreadFactoryBuilder().setDaemon(true).build());
- Future<?> ascendingDownloadFuture =
submitDownloadTask(downloadThreadPool, ascendingDownloadTask, mongoFilter,
THREAD_NAME_PREFIX + "-ascending");
- Future<?> descendingDownloadFuture =
submitDownloadTask(downloadThreadPool, descendingDownloadTask, mongoFilter,
THREAD_NAME_PREFIX + "-descending");
+ ExecutorCompletionService<Void> ecs = new
ExecutorCompletionService<>(downloadThreadPool);
+ Future<?> ascendingDownloadFuture = submitDownloadTask(ecs,
ascendingDownloadTask, mongoFilter, THREAD_NAME_PREFIX + "-ascending");
+ Future<?> descendingDownloadFuture = submitDownloadTask(ecs,
descendingDownloadTask, mongoFilter, THREAD_NAME_PREFIX + "-descending");
+ // When using parallel download, we stop the download when the
streams cross, that is, when they download
+ // a document that was already downloaded by the other. This works
well if both ascending and descending
+ // queries return documents frequently, so that we can check for
crossing frequently. But with regex filtering
+ // enabled, a Mongo query may go for a long time without returning
any documents, because it may be traversing
+ // a part of the index that does not match the regex expression.
In the extreme case, the query does not match
+ // any document in the collection, so both streams will scan the
full index without ever realizing they have
+ // crossed, because this check is only done when one of the
streams receives enough documents to fill up a batch.
+ // In this case, one of the queries will finish after completing
its scan, while the other thread may still be
+ // ongoing for some time. I have observed that the ascending
download thread completes faster than the
+ // descending thread, by a significant margin, so that the
descending download thread may continue running
+        // for a long time even after the ascending thread completed the
full scan. For this reason, it is important to
+ // cancel the other thread as soon as one thread completes the
scan.
+ // Therefore, we use the following termination conditions:
+ // - the threads have crossed.
+ // - one of the threads finished the download.
+ // In both cases, when a thread finishes, we cancel the other
thread because by then we have downloaded all
+ // documents.
try {
- boolean downloadFinished = false;
- while (!downloadFinished) {
+ while (true) {
// The parent thread waits for the download to complete,
reporting progress periodically
- try {
- ascendingDownloadFuture.get(10, TimeUnit.SECONDS);
- LOG.info("Ascending download finished. Waiting for
descending download to finish.");
- // Once one of the downloads finishes, the other
should also finish the next time it checks if
- // it intersected with the other stream.
- // TODO: One or both of the streams might be waiting
for a Mongo response for a long time in case
- // the query is traversing a region of the index that
does not match the regex filter. In the
- // extreme case, the query does not match any
document, so both streams will scan the full index
- // without ever realizing they have crossed, because
this check is only done when one of the streams
- // receives enough documents to fill up a batch. If
neither of the streams receives documents, we
-                    // have no way of knowing that they have intersected.
But in the case that one of the streams
- // detects that they have intersected, we can safely
abort the other download, without waiting
- // for it to continue until the next check.
- descendingDownloadFuture.get();
- LOG.info("Both ascending and descending download
completed.");
+ Future<Void> completedTask = ecs.poll(10,
TimeUnit.SECONDS);
+ if (completedTask == null) {
+ // null return means that the poll timed-out, so the
download tasks are still ongoing. Report
+ // progress and then go back to waiting for the tasks
to complete
+ long secondsElapsed =
downloadStartWatch.elapsed(TimeUnit.SECONDS);
+ long ascTaskDownloadTotal =
ascendingDownloadTask.getDocumentsDownloadedTotal();
+ long descTaskDownloadTotal =
descendingDownloadTask.getDocumentsDownloadedTotal();
+ String formattedRate;
+ if (secondsElapsed == 0) {
+ formattedRate = "N/A nodes/s, N/A nodes/hr, N/A
/s";
+ } else {
+ double docRate = ((double)
downloadStageStatistics.getDocumentsDownloadedTotal()) / secondsElapsed;
+ double bytesRate = ((double)
downloadStageStatistics.getDocumentsDownloadedTotalBytes()) / secondsElapsed;
+ formattedRate = String.format(Locale.ROOT, "%1.2f
nodes/s, %1.2f nodes/hr, %s/s",
+ docRate, docRate * 3600,
IOUtils.humanReadableByteCountBin((long) bytesRate));
+ }
+ LOG.info("Total documents dumped from Mongo {} (asc:
{}, desc: {}) [{}] (Elapsed {})",
+
downloadStageStatistics.getDocumentsDownloadedTotal(), ascTaskDownloadTotal,
descTaskDownloadTotal,
+ formattedRate,
FormattingUtils.formatToSeconds(secondsElapsed));
+ } else {
+ // One of the download tasks has completed. Cancel the
other one.
+ if (completedTask == ascendingDownloadFuture) {
+ LOG.info("Ascending download task has completed.
Cancelling descending download task.");
+ // If the download thread is blocked on a socket
read waiting for a Mongo response, it will
+ // not immediately respond to the interrupt. It
will only check the interrupt status when
+ // it returns from the socket read. So in addition
to interrupting the thread, we have to
+ // close the Mongo cursor that is executing the
query.
+ descendingDownloadFuture.cancel(true);
+ descendingDownloadTask.cancelDownload();
+ } else if (completedTask == descendingDownloadFuture) {
+ LOG.info("Descending download task has completed.
Cancelling ascending download task.");
+ ascendingDownloadFuture.cancel(true);
+ ascendingDownloadTask.cancelDownload();
+ } else {
+ throw new IllegalStateException("Unknown download
task completed: " + completedTask);
+ }
// Download any documents that may have been skipped
by the descending thread because they were
// modified during the download. Any modification will
change the _modified field to
@@ -474,44 +513,24 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
// de-duplicate the entries.
LOG.info("Downloading documents modified since the
start of the download: _modified >= {}", highestModifiedSeen);
DownloadTask updatedDocsTask = new
DownloadTask(DownloadOrder.ASCENDING, downloadStageStatistics,
highestModifiedSeen, Long.MAX_VALUE, null);
- Future<?> updateDocsDownloadFuture =
submitDownloadTask(downloadThreadPool, updatedDocsTask, mongoFilter,
THREAD_NAME_PREFIX + "-updated-docs");
+ Future<?> updateDocsDownloadFuture =
submitDownloadTask(ecs, updatedDocsTask, mongoFilter, THREAD_NAME_PREFIX +
"-updated-docs");
LOG.info("Waiting for download of updated
documents to complete.");
updateDocsDownloadFuture.get();
}
- downloadFinished = true;
- } catch (TimeoutException e) {
- // Report progress
- long secondsElapsed =
downloadStartWatch.elapsed(TimeUnit.SECONDS);
- long ascTaskDownloadTotal =
ascendingDownloadTask.getDocumentsDownloadedTotal();
- long descTaskDownloadTotal =
descendingDownloadTask.getDocumentsDownloadedTotal();
- String formattedRate;
- if (secondsElapsed == 0) {
- formattedRate = "N/A nodes/s, N/A nodes/hr, N/A
/s";
- } else {
- double docRate = ((double)
downloadStageStatistics.getDocumentsDownloadedTotal()) / secondsElapsed;
- double bytesRate = ((double)
downloadStageStatistics.getDocumentsDownloadedTotalBytes()) / secondsElapsed;
- formattedRate = String.format(Locale.ROOT, "%1.2f
nodes/s, %1.2f nodes/hr, %s/s",
- docRate, docRate * 3600,
IOUtils.humanReadableByteCountBin((long) bytesRate));
- }
- LOG.info("Total documents dumped from Mongo {} (asc:
{}, desc: {}) [{}] (Elapsed {})",
-
downloadStageStatistics.getDocumentsDownloadedTotal(), ascTaskDownloadTotal,
descTaskDownloadTotal,
- formattedRate,
FormattingUtils.formatToSeconds(secondsElapsed));
+ return;
}
}
} catch (ExecutionException e) {
- // One of the download tasks finished with an exception.
Cancel the other one. Trying to cancel the
- // task that already failed has no effect
- LOG.info("Error during download. Canceling download threads.
Error: {}", e.toString());
- downloadThreadPool.shutdownNow();
+ // One of the download tasks finished with an exception. The
finally block will cancel the other task.
+ LOG.info("Error during download: {}", e.toString());
throw new RuntimeException(e);
} catch (InterruptedException e) {
- LOG.info("Thread interrupted. Cancelling download threads.");
- // The parent thread was interrupted. Shutdown the download
threads.
- downloadThreadPool.shutdownNow();
+ LOG.info("Thread interrupted.");
+ // The parent thread was interrupted. The finally block will
cancel any task that may still be running.
throw e;
} finally {
LOG.info("Shutting down download thread pool.");
- new ExecutorCloser(downloadThreadPool).close();
+ new ExecutorCloser(downloadThreadPool, 1,
TimeUnit.SECONDS).close();
LOG.info("Download thread pool shutdown complete.");
}
} else {
@@ -521,25 +540,29 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
}
}
- private Future<?> submitDownloadTask(ExecutorService executor,
DownloadTask downloadTask, Bson mongoFilter, String name) {
+ private Future<?> submitDownloadTask(ExecutorCompletionService<Void>
executor, DownloadTask downloadTask, Bson mongoFilter, String name) {
return executor.submit(() -> {
String originalName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
downloadTask.download(mongoFilter);
downloadTask.reportFinalResults();
- } catch (InterruptedException e) {
- LOG.warn("Thread interrupted.");
+ } catch (InterruptedException | MongoInterruptedException e) {
+ LOG.info("Thread interrupted: {}", e.toString());
throw new RuntimeException(e);
} catch (TimeoutException e) {
LOG.warn("Timeout: {}", e.toString());
throw new RuntimeException(e);
+ } catch (Throwable t) {
+ LOG.error("Error during download: {}", t.toString());
+ throw t;
} finally {
if (mongoServerSelector != null) {
mongoServerSelector.threadFinished();
}
Thread.currentThread().setName(originalName);
}
+ return null;
});
}
@@ -594,7 +617,9 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
private long documentsDownloadedTotal;
private long nextLastModified;
private String lastIdDownloaded;
- private long firstModifiedValueSeen = -1;
+ // Accessed from the main download thread
+ private volatile long firstModifiedValueSeen = -1;
+ private volatile MongoCursor<NodeDocument> mongoCursor = null;
DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics
downloadStatics) {
this(downloadOrder, downloadStatics, null);
@@ -716,12 +741,20 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
download(mongoIterable);
}
+ public void cancelDownload() {
+ LOG.info("{} Cancelling download, closing Mongo cursor: {}",
downloadOrder, mongoCursor);
+ if (mongoCursor != null) {
+ mongoCursor.close();
+ }
+ }
+
void download(FindIterable<NodeDocument> mongoIterable) throws
InterruptedException, TimeoutException {
if (mongoBatchSize != -1) {
LOG.info("Setting Mongo batch size to {}", mongoBatchSize);
mongoIterable.batchSize(mongoBatchSize);
}
try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
+ this.mongoCursor = cursor;
NodeDocument[] batch = new
NodeDocument[maxBatchNumberOfDocuments];
int nextIndex = 0;
int batchSize = 0;
@@ -743,7 +776,7 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
this.lastIdDownloaded = id;
this.documentsDownloadedTotal++;
downloadStatics.incrementDocumentsDownloadedTotal();
- if (this.documentsDownloadedTotal % 20000 == 0) {
+ if (this.documentsDownloadedTotal % 20_000 == 0) {
reportProgress(id);
}
TRAVERSAL_LOG.trace(id);
@@ -784,7 +817,11 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
if (nextIndex > 0) {
LOG.info("Connection interrupted with recoverable
failure. Enqueueing partial block with {} elements, estimated size: {}",
nextIndex,
IOUtils.humanReadableByteCountBin(batchSize));
- tryEnqueueCopy(batch, nextIndex);
+ boolean downloadCompleted = tryEnqueueCopy(batch,
nextIndex);
+ if (downloadCompleted) {
+ LOG.info("Download of range with download order {}
completed, intersected with other download.", downloadOrder);
+ return;
+ }
}
throw e;
}