This is an automated email from the ASF dual-hosted git repository.
thomasm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0861c05c00 OAK-12054: Refactor creation of ThreadPoolExecutors (fix
those that were in fact only running one thread) (#2679)
0861c05c00 is described below
commit 0861c05c00c0408ffacd2f668b0507f1251680fb
Author: Benjamin Habegger <[email protected]>
AuthorDate: Tue Feb 3 09:55:40 2026 +0100
OAK-12054: Refactor creation of ThreadPoolExecutors (fix those that were in
fact only running one thread) (#2679)
---
oak-commons/pom.xml | 9 +--
.../internal/concurrent/ExecutorHelper.java | 94 ++++++++++++++++++++++
.../index/lucene/LuceneIndexProviderService.java | 30 ++-----
.../apache/jackrabbit/oak/index/IndexHelper.java | 28 +------
.../org/apache/jackrabbit/oak/run/Downloader.java | 26 +-----
.../index/elastic/ElasticIndexStatistics.java | 12 +--
.../plugins/index/search/ExtractedTextCache.java | 31 ++-----
7 files changed, 120 insertions(+), 110 deletions(-)
diff --git a/oak-commons/pom.xml b/oak-commons/pom.xml
index c9eca84914..ecc5409c4d 100644
--- a/oak-commons/pom.xml
+++ b/oak-commons/pom.xml
@@ -109,6 +109,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-jcr-commons</artifactId>
@@ -142,11 +146,6 @@
<artifactId>commons-math3</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/ExecutorHelper.java
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/ExecutorHelper.java
new file mode 100644
index 0000000000..94e340d579
--- /dev/null
+++
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/ExecutorHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.commons.internal.concurrent;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper class complementing the JDK's {@link java.util.concurrent.Executors} class with additional ExecutorService creation methods.
+ */
+public class ExecutorHelper {
+
+ /**
+ * Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as work queue, using daemon threads.
+ * <br>
+ * NOTE: The maximum number of threads in each executor service, when using a LinkedBlockingQueue(), is corePoolSize.
+ * All other tasks are kept in the LinkedBlockingQueue, which is unbounded.
+ * <br>
+ * See the <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html">ThreadPoolExecutor javadoc</a>.
+ * <br>
+ * (Using a bounded queue, such as SynchronousQueue, would result in the RejectedExecutionHandler being invoked once the queue reaches its capacity).
+ * We want to keep things simple and don't want to use back pressure or other mechanisms. So in summary, corePoolSize threads are used, per service.
+ * @param poolSize The size of the thread pool (used as both core and maximum pool size)
+ * @param namePattern The name pattern for the threads containing %d replaced by the thread number (calculated by the BasicThreadFactory).
+ * @param handler An optional handler for uncaught exceptions; may be null, in which case no handler is set
+ * @return A ThreadPoolExecutor
+ */
+ public static ThreadPoolExecutor linkedQueueExecutor(int poolSize,
@NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
+ return linkedQueueExecutor(poolSize, poolSize, namePattern, handler);
+ }
+
+ /**
+ * Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as work queue.
+ * <br>
+ * Helper method delegating to {@link #linkedQueueExecutor(int, String, Thread.UncaughtExceptionHandler)} with a null handler.
+ * @param poolSize The size of the thread pool
+ * @param namePattern The name pattern for the threads containing %d replaced by the thread number (calculated by the BasicThreadFactory).
+ * @return An ExecutorService backed by a ThreadPoolExecutor
+ */
+ public static ExecutorService linkedQueueExecutor(int poolSize, @NotNull
String namePattern) {
+ return linkedQueueExecutor(poolSize, namePattern, null);
+ }
+
+
+ /**
+ * Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as work queue which will start a single thread
+ * in presence of work and will shut down the thread after 60 seconds of inactivity.
+ * <br>
+ * NOTE: With an (unbounded) LinkedBlockingQueue, a ThreadPoolExecutor never grows beyond corePoolSize, so maximumPoolSize has no effect;
+ * it is therefore not possible to run more than a single thread while also letting the pool shrink to 0 via corePoolSize=0.
+ * @param namePattern The name pattern for the threads containing %d replaced by the thread number (calculated by the BasicThreadFactory).
+ * @param handler An optional handler for uncaught exceptions; may be null, in which case no handler is set
+ * @return A ThreadPoolExecutor which will activate a single thread on demand and shut it down after 60 seconds of inactivity.
+ */
+ public static ThreadPoolExecutor
onDemandSingleThreadLinkedQueueExecutor(@NotNull String namePattern,
Thread.UncaughtExceptionHandler handler) {
+ return linkedQueueExecutor(0, 1, namePattern, handler);
+ }
+
+ private static ThreadPoolExecutor linkedQueueExecutor(int minSize, int
maxSize, @NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
+ // Either minSize=0 and maxSize=1 (maxSize > 1 behaves as if maxSize
were 1)
+ // or minSize=maxSize (maxSize <> minSize behaves as if
minSize were maxSize)
+ var factory = BasicThreadFactory.builder()
+ .daemon(true)
+ .namingPattern(namePattern);
+ if(handler != null) {
+ factory.uncaughtExceptionHandler(handler);
+ }
+ return new ThreadPoolExecutor(minSize, maxSize, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), factory.build());
+ }
+
+}
diff --git
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
index 025572c84a..09833d8214 100644
---
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
+++
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
@@ -26,11 +26,8 @@ import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
@@ -38,6 +35,7 @@ import org.apache.commons.io.FilenameUtils;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.document.spi.JournalPropertyService;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
@@ -79,7 +77,6 @@ import org.apache.lucene.analysis.util.TokenFilterFactory;
import org.apache.lucene.analysis.util.TokenizerFactory;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.InfoStream;
-import org.jetbrains.annotations.NotNull;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
@@ -266,6 +263,7 @@ public class LuceneIndexProviderService {
}
public static final String REPOSITORY_HOME = "repository.home";
+ private static final int INDEX_COPIER_POOL_SIZE = 5;
private LuceneIndexProvider indexProvider;
@@ -335,8 +333,6 @@ public class LuceneIndexProviderService {
private ExecutorService executorService;
- private int threadPoolSize;
-
private ExtractedTextCache extractedTextCache;
private boolean hybridIndex;
@@ -373,7 +369,6 @@ public class LuceneIndexProviderService {
}
whiteboard = new OsgiWhiteboard(bundleContext);
- threadPoolSize = config.threadPoolSize();
initializeIndexDir(bundleContext, config);
initializeExtractedTextCache(bundleContext, config,
statisticsProvider);
tracker = createTracker(bundleContext, config);
@@ -572,24 +567,9 @@ public class LuceneIndexProviderService {
}
private ExecutorService createExecutor() {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), new ThreadFactory() {
- private final AtomicInteger counter = new AtomicInteger();
- private final Thread.UncaughtExceptionHandler handler = (t, e) ->
log.warn("Error occurred in asynchronous processing ", e);
- @Override
- public Thread newThread(@NotNull Runnable r) {
- Thread thread = new Thread(r, createName());
- thread.setDaemon(true);
- thread.setPriority(Thread.MIN_PRIORITY);
- thread.setUncaughtExceptionHandler(handler);
- return thread;
- }
-
- private String createName() {
- return "oak-lucene-" + counter.getAndIncrement();
- }
- });
- executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+ ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(
+ INDEX_COPIER_POOL_SIZE, "oak-lucene-%d",
+ (t, e) -> log.warn("Error occurred in asynchronous processing
", e));
executor.allowCoreThreadTimeOut(true);
return executor;
}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
index 601597d6cc..0fc3d21990 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
@@ -25,13 +25,10 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
import org.apache.jackrabbit.oak.commons.pio.Closer;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoServiceImpl;
@@ -174,26 +171,9 @@ public class IndexHelper implements Closeable {
}
private ThreadPoolExecutor createExecutor() {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- private final AtomicInteger counter = new AtomicInteger();
- private final Thread.UncaughtExceptionHandler handler =
- (t, e) -> log.warn("Error occurred in asynchronous
processing ", e);
-
- @Override
- public Thread newThread(@NotNull Runnable r) {
- Thread thread = new Thread(r, createName());
- thread.setDaemon(true);
- thread.setPriority(Thread.MIN_PRIORITY);
- thread.setUncaughtExceptionHandler(handler);
- return thread;
- }
-
- private String createName() {
- return "oak-lucene-" + counter.getAndIncrement();
- }
- });
- executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+ ThreadPoolExecutor executor =
ExecutorHelper.onDemandSingleThreadLinkedQueueExecutor(
+ "oak-lucene-%d",
+ (t, e) -> log.warn("Error occurred in asynchronous processing
", e));
executor.allowCoreThreadTimeOut(true);
return executor;
}
diff --git
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
index fc2836c028..7fa23652c7 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -18,8 +18,8 @@
*/
package org.apache.jackrabbit.oak.run;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +45,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -128,26 +126,8 @@ public class Downloader implements Closeable {
this.checksumAlgorithm = null;
}
this.bufferSize = bufferSize;
-
- // The maximum number of threads in each executor service,
- // when using a LinkedBlockingQueue(), is corePoolSize.
- // all other tasks are kept in the LinkedBlockingQueue, which
- // is unbounded.
- // (Using a bounded queue, such as SynchronousQueue,
- // would result in RejectedExecutionHandler).
- // We want to keep things simple and don't want
- // to use back presssure or other mechanisms.
- // So in summary, corePoolSize threads are used, per service.
- this.executorService = new ThreadPoolExecutor(
- corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
-
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
- );
- this.executorServiceForParts = new ThreadPoolExecutor(
- corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
-
BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
- );
+ this.executorService =
ExecutorHelper.linkedQueueExecutor(corePoolSize, "downloader-%d");
+ this.executorServiceForParts =
ExecutorHelper.linkedQueueExecutor(corePoolSize, "partDownloader-%d");
this.responses = new ArrayList<>();
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
index f963eb504b..240b22de89 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
@@ -23,14 +23,12 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.jackrabbit.guava.common.base.Ticker;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics;
@@ -69,12 +67,10 @@ public class ElasticIndexStatistics implements
IndexStatistics {
private static final String REFRESH_SECONDS =
"oak.elastic.statsRefreshSeconds";
private static final Long REFRESH_SECONDS_DEFAULT = 60L;
- private static final ExecutorService REFRESH_EXECUTOR = new
ThreadPoolExecutor(
- 0, 4, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
-
BasicThreadFactory.builder().namingPattern("elastic-statistics-cache-refresh-thread-%d").daemon().build()
- );
+ private static final int REFRESH_POOL_SIZE = 4;
+ private static final ExecutorService REFRESH_EXECUTOR =
ExecutorHelper.linkedQueueExecutor(
+ REFRESH_POOL_SIZE, "elastic-statistics-cache-refresh-%d");
private final ElasticConnection elasticConnection;
private final ElasticIndexDefinition indexDefinition;
private final LoadingCache<StatsRequestDescriptor, Integer> countCache;
diff --git
a/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
b/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
index be75b64f51..57911f6a2f 100644
---
a/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
+++
b/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
@@ -32,12 +32,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.guava.common.cache.Cache;
import org.apache.jackrabbit.guava.common.cache.CacheBuilder;
@@ -45,6 +42,7 @@ import org.apache.jackrabbit.guava.common.cache.Weigher;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
import
org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText.ExtractionResult;
import
org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
@@ -64,8 +62,8 @@ public class ExtractedTextCache {
Boolean.getBoolean("oak.extracted.cacheOnlySuccess");
private static final int EXTRACTION_TIMEOUT_SECONDS =
Integer.getInteger("oak.extraction.timeoutSeconds", 60);
- private static final int EXTRACTION_MAX_THREADS =
- Integer.getInteger("oak.extraction.maxThreads", 10);
+ private static final int EXTRACTION_POOL_SIZE =
+ Integer.getInteger("oak.extraction.poolSize", 1);
private static final boolean EXTRACT_IN_CALLER_THREAD =
Boolean.getBoolean("oak.extraction.inCallerThread");
private static final boolean EXTRACT_FORGET_TIMEOUT =
@@ -346,26 +344,9 @@ public class ExtractedTextCache {
return;
}
log.debug("ExtractedTextCache createExecutor {}", this);
- ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
EXTRACTION_MAX_THREADS,
- 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), new ThreadFactory() {
- private final AtomicInteger counter = new AtomicInteger();
- private final Thread.UncaughtExceptionHandler handler = (t, e) ->
log.warn("Error occurred in asynchronous processing ", e);
- @Override
- public Thread newThread(@NotNull Runnable r) {
- Thread thread = new Thread(r, createName());
- thread.setDaemon(true);
- thread.setPriority(Thread.MIN_PRIORITY);
- thread.setUncaughtExceptionHandler(handler);
- return thread;
- }
-
- private String createName() {
- int index = counter.getAndIncrement();
- return "oak binary text extractor" + (index == 0 ? "" : " " +
index);
- }
- });
- executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+ ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(
+ EXTRACTION_POOL_SIZE, "oak-binary-text-extractor-%d",
+ (t, e) -> log.warn("Error occurred in asynchronous processing
", e));
executor.allowCoreThreadTimeOut(true);
executorService = executor;
}