This is an automated email from the ASF dual-hosted git repository.

thomasm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0861c05c00 OAK-12054: Refactor creation of ThreadPoolExecutors (fix 
those that we in fact only running one thread) (#2679)
0861c05c00 is described below

commit 0861c05c00c0408ffacd2f668b0507f1251680fb
Author: Benjamin Habegger <[email protected]>
AuthorDate: Tue Feb 3 09:55:40 2026 +0100

    OAK-12054: Refactor creation of ThreadPoolExecutors (fix those that we in 
fact only running one thread) (#2679)
---
 oak-commons/pom.xml                                |  9 +--
 .../internal/concurrent/ExecutorHelper.java        | 94 ++++++++++++++++++++++
 .../index/lucene/LuceneIndexProviderService.java   | 30 ++-----
 .../apache/jackrabbit/oak/index/IndexHelper.java   | 28 +------
 .../org/apache/jackrabbit/oak/run/Downloader.java  | 26 +-----
 .../index/elastic/ElasticIndexStatistics.java      | 12 +--
 .../plugins/index/search/ExtractedTextCache.java   | 31 ++-----
 7 files changed, 120 insertions(+), 110 deletions(-)

diff --git a/oak-commons/pom.xml b/oak-commons/pom.xml
index c9eca84914..ecc5409c4d 100644
--- a/oak-commons/pom.xml
+++ b/oak-commons/pom.xml
@@ -109,6 +109,10 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.jackrabbit</groupId>
       <artifactId>jackrabbit-jcr-commons</artifactId>
@@ -142,11 +146,6 @@
       <artifactId>commons-math3</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git 
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/ExecutorHelper.java
 
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/ExecutorHelper.java
new file mode 100644
index 0000000000..94e340d579
--- /dev/null
+++ 
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/ExecutorHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.commons.internal.concurrent;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper class complementing JDK's @see java.util.concurrent.Executors class 
with additional ExecutionService creation
+ */
+public class ExecutorHelper {
+
+    /**
+     * Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as 
work queue.
+     * <br>
+     * NOTE: The maximum number of threads in each executor service, when 
using a LinkedBlockingQueue(), is corePoolSize.
+     * all other tasks are kept in the LinkedBlockingQueue, which is unbounded.
+     * <br>
+     * @see <a 
href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html";>ThreadPoolExecutor
 javadoc</a>
+     * <br>
+     * (Using a bounded queue, such as SynchronousQueue would result in 
RejectedExecutionHandler if the queue would reach its capacity).
+     * We want to keep things simple and don't want to use back pressure or 
other mechanisms. So in summary, corePoolSize threads are used, per service.
+     * @param poolSize The size of the thread pool
+     * @param namePattern The name pattern for the threads containing %d 
replaced by the thread number (calculated by NamedThreadFactory).
+     * @param handler  An optional handler for uncaught exceptions
+     * @return A ThreadPoolExecutor
+     */
+    public static ThreadPoolExecutor linkedQueueExecutor(int poolSize, 
@NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
+        return linkedQueueExecutor(poolSize, poolSize, namePattern, handler);
+    }
+
+    /**
+     * Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as 
work queue.
+     * <br>
+     * Helper method delegating to {@link #linkedQueueExecutor(int, String, 
Thread.UncaughtExceptionHandler)} with null handler.
+     * @param poolSize The size of the thread pool
+     * @param namePattern The name pattern for the threads containing %d 
replaced by the thread number (calculated by NamedThreadFactory).
+     * @return A ThreadPoolExecutor
+     */
+    public static ExecutorService linkedQueueExecutor(int poolSize, @NotNull 
String namePattern) {
+        return linkedQueueExecutor(poolSize, namePattern, null);
+    }
+
+
+    /**
+     * Create a ThreadPoolExecutor with an unbounded LinkedBlockingQueue as 
work queue which will start a single thread
+     * in presence of work and will shut down the thread after 60 seconds of 
inactivity.
+     * <br>
+     * NOTE: ThreadPoolExecutor with an (unbounded) LinkedBlockingQueue does 
not allow setting both corePoolSize and maximumPoolSize,
+     * and thus does not allow starting more than a single thread in 
combination with reduction to 0.
+     * @param namePattern The name pattern for the threads containing %d 
replaced by the thread number (calculated by NamedThreadFactory).
+     * @param handler  An optional handler for uncaught exceptions
+     * @return A ThreadPoolExecutor which will activate a single thread on 
demand and shut it down after 60 seconds of inactivity.
+     */
+    public static ThreadPoolExecutor 
onDemandSingleThreadLinkedQueueExecutor(@NotNull String namePattern, 
Thread.UncaughtExceptionHandler handler) {
+        return linkedQueueExecutor(0, 1, namePattern, handler);
+    }
+
+    private static ThreadPoolExecutor linkedQueueExecutor(int minSize, int 
maxSize, @NotNull String namePattern, Thread.UncaughtExceptionHandler handler) {
+        // Either minSize=0 and maxSize=1 (maxSize > 1 behaves as if maxSize 
were 1)
+        //     or minSize=maxSize         (maxSize <> minSize behaves as if 
minSize were maxSize)
+        var factory = BasicThreadFactory.builder()
+                .daemon(true)
+                .namingPattern(namePattern);
+        if(handler != null) {
+            factory.uncaughtExceptionHandler(handler);
+        }
+        return new ThreadPoolExecutor(minSize, maxSize, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(), factory.build());
+    }
+
+}
diff --git 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
index 025572c84a..09833d8214 100644
--- 
a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
+++ 
b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
@@ -26,11 +26,8 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -38,6 +35,7 @@ import org.apache.commons.io.FilenameUtils;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.document.spi.JournalPropertyService;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
@@ -79,7 +77,6 @@ import org.apache.lucene.analysis.util.TokenFilterFactory;
 import org.apache.lucene.analysis.util.TokenizerFactory;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.util.InfoStream;
-import org.jetbrains.annotations.NotNull;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
@@ -266,6 +263,7 @@ public class LuceneIndexProviderService {
     }
 
     public static final String REPOSITORY_HOME = "repository.home";
+    private static final int INDEX_COPIER_POOL_SIZE = 5;
 
     private LuceneIndexProvider indexProvider;
 
@@ -335,8 +333,6 @@ public class LuceneIndexProviderService {
 
     private ExecutorService executorService;
 
-    private int threadPoolSize;
-
     private ExtractedTextCache extractedTextCache;
 
     private boolean hybridIndex;
@@ -373,7 +369,6 @@ public class LuceneIndexProviderService {
         }
 
         whiteboard = new OsgiWhiteboard(bundleContext);
-        threadPoolSize = config.threadPoolSize();
         initializeIndexDir(bundleContext, config);
         initializeExtractedTextCache(bundleContext, config, 
statisticsProvider);
         tracker = createTracker(bundleContext, config);
@@ -572,24 +567,9 @@ public class LuceneIndexProviderService {
     }
 
     private ExecutorService createExecutor() {
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60L, 
TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(), new ThreadFactory() {
-            private final AtomicInteger counter = new AtomicInteger();
-            private final Thread.UncaughtExceptionHandler handler = (t, e) -> 
log.warn("Error occurred in asynchronous processing ", e);
-            @Override
-            public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, createName());
-                thread.setDaemon(true);
-                thread.setPriority(Thread.MIN_PRIORITY);
-                thread.setUncaughtExceptionHandler(handler);
-                return thread;
-            }
-
-            private String createName() {
-                return "oak-lucene-" + counter.getAndIncrement();
-            }
-        });
-        executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+        ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(
+                INDEX_COPIER_POOL_SIZE, "oak-lucene-%d", 
+                (t, e) -> log.warn("Error occurred in asynchronous processing 
", e));
         executor.allowCoreThreadTimeOut(true);
         return executor;
     }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
index 601597d6cc..0fc3d21990 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
@@ -25,13 +25,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
 import org.apache.jackrabbit.oak.commons.pio.Closer;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoServiceImpl;
@@ -174,26 +171,9 @@ public class IndexHelper implements Closeable {
     }
 
     private ThreadPoolExecutor createExecutor() {
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L, 
TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-            private final AtomicInteger counter = new AtomicInteger();
-            private final Thread.UncaughtExceptionHandler handler =
-                    (t, e) -> log.warn("Error occurred in asynchronous 
processing ", e);
-
-            @Override
-            public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, createName());
-                thread.setDaemon(true);
-                thread.setPriority(Thread.MIN_PRIORITY);
-                thread.setUncaughtExceptionHandler(handler);
-                return thread;
-            }
-
-            private String createName() {
-                return "oak-lucene-" + counter.getAndIncrement();
-            }
-        });
-        executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+        ThreadPoolExecutor executor = 
ExecutorHelper.onDemandSingleThreadLinkedQueueExecutor(
+                "oak-lucene-%d", 
+                (t, e) -> log.warn("Error occurred in asynchronous processing 
", e));
         executor.allowCoreThreadTimeOut(true);
         return executor;
     }
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
index fc2836c028..7fa23652c7 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -18,8 +18,8 @@
  */
 package org.apache.jackrabbit.oak.run;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +45,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -128,26 +126,8 @@ public class Downloader implements Closeable {
             this.checksumAlgorithm = null;
         }
         this.bufferSize = bufferSize;
-
-        // The maximum number of threads in each executor service,
-        // when using a LinkedBlockingQueue(), is corePoolSize.
-        // all other tasks are kept in the LinkedBlockingQueue, which
-        // is unbounded.
-        // (Using a bounded queue, such as SynchronousQueue,
-        // would result in RejectedExecutionHandler).
-        // We want to keep things simple and don't want
-        // to use back presssure or other mechanisms.
-        // So in summary, corePoolSize threads are used, per service.
-        this.executorService = new ThreadPoolExecutor(
-                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
-                
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
-        );
-        this.executorServiceForParts = new ThreadPoolExecutor(
-                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
-                
BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
-        );
+        this.executorService = 
ExecutorHelper.linkedQueueExecutor(corePoolSize, "downloader-%d");
+        this.executorServiceForParts = 
ExecutorHelper.linkedQueueExecutor(corePoolSize, "partDownloader-%d");
         this.responses = new ArrayList<>();
     }
 
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
index f963eb504b..240b22de89 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java
@@ -23,14 +23,12 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import co.elastic.clients.elasticsearch._types.query_dsl.Query;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.jackrabbit.guava.common.base.Ticker;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
 import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
 import org.apache.jackrabbit.oak.plugins.index.elastic.util.ElasticIndexUtils;
 import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics;
@@ -69,12 +67,10 @@ public class ElasticIndexStatistics implements 
IndexStatistics {
     private static final String REFRESH_SECONDS = 
"oak.elastic.statsRefreshSeconds";
     private static final Long REFRESH_SECONDS_DEFAULT = 60L;
 
-    private static final ExecutorService REFRESH_EXECUTOR = new 
ThreadPoolExecutor(
-            0, 4, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(),
-            
BasicThreadFactory.builder().namingPattern("elastic-statistics-cache-refresh-thread-%d").daemon().build()
-    );
+    private static final int REFRESH_POOL_SIZE = 4;
 
+    private static final ExecutorService REFRESH_EXECUTOR = 
ExecutorHelper.linkedQueueExecutor(
+            REFRESH_POOL_SIZE, "elastic-statistics-cache-refresh-%d");
     private final ElasticConnection elasticConnection;
     private final ElasticIndexDefinition indexDefinition;
     private final LoadingCache<StatsRequestDescriptor, Integer> countCache;
diff --git 
a/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
 
b/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
index be75b64f51..57911f6a2f 100644
--- 
a/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
+++ 
b/oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java
@@ -32,12 +32,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.jackrabbit.guava.common.cache.Cache;
 import org.apache.jackrabbit.guava.common.cache.CacheBuilder;
@@ -45,6 +42,7 @@ import org.apache.jackrabbit.guava.common.cache.Weigher;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorHelper;
 import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
 import 
org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText.ExtractionResult;
 import 
org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
@@ -64,8 +62,8 @@ public class ExtractedTextCache {
             Boolean.getBoolean("oak.extracted.cacheOnlySuccess");
     private static final int EXTRACTION_TIMEOUT_SECONDS =
             Integer.getInteger("oak.extraction.timeoutSeconds", 60);
-    private static final int EXTRACTION_MAX_THREADS =
-            Integer.getInteger("oak.extraction.maxThreads", 10);
+    private static final int EXTRACTION_POOL_SIZE =
+            Integer.getInteger("oak.extraction.poolSize", 1);
     private static final boolean EXTRACT_IN_CALLER_THREAD =
             Boolean.getBoolean("oak.extraction.inCallerThread");
     private static final boolean EXTRACT_FORGET_TIMEOUT =
@@ -346,26 +344,9 @@ public class ExtractedTextCache {
             return;
         }
         log.debug("ExtractedTextCache createExecutor {}", this);
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 
EXTRACTION_MAX_THREADS,
-                60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(), new ThreadFactory() {
-            private final AtomicInteger counter = new AtomicInteger();
-            private final Thread.UncaughtExceptionHandler handler = (t, e) -> 
log.warn("Error occurred in asynchronous processing ", e);
-            @Override
-            public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, createName());
-                thread.setDaemon(true);
-                thread.setPriority(Thread.MIN_PRIORITY);
-                thread.setUncaughtExceptionHandler(handler);
-                return thread;
-            }
-
-            private String createName() {
-                int index = counter.getAndIncrement();
-                return "oak binary text extractor" + (index == 0 ? "" : " " + 
index);
-            }
-        });
-        executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+        ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(
+                EXTRACTION_POOL_SIZE, "oak-binary-text-extractor-%d", 
+                (t, e) -> log.warn("Error occurred in asynchronous processing 
", e));
         executor.allowCoreThreadTimeOut(true);
         executorService = executor;
     }

Reply via email to