bhabegger commented on code in PR #2679:
URL: https://github.com/apache/jackrabbit-oak/pull/2679#discussion_r2681137910


##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/concurrent/NamedThreadFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.commons.concurrent;
+
+import org.jspecify.annotations.NonNull;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NamedThreadFactory implements ThreadFactory {

Review Comment:
   I thought about it and came to the conclusion that the class could be useful 
by itself. But either way is fine by me.



##########
oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java:
##########
@@ -346,26 +344,7 @@ private synchronized void createExecutor() {
             return;
         }
         log.debug("ExtractedTextCache createExecutor {}", this);
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 
EXTRACTION_MAX_THREADS,
-                60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(), new ThreadFactory() {
-            private final AtomicInteger counter = new AtomicInteger();
-            private final Thread.UncaughtExceptionHandler handler = (t, e) -> 
log.warn("Error occurred in asynchronous processing ", e);
-            @Override
-            public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, createName());
-                thread.setDaemon(true);
-                thread.setPriority(Thread.MIN_PRIORITY);
-                thread.setUncaughtExceptionHandler(handler);
-                return thread;
-            }
-
-            private String createName() {
-                int index = counter.getAndIncrement();
-                return "oak binary text extractor" + (index == 0 ? "" : " " + 
index);
-            }
-        });
-        executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+        ThreadPoolExecutor executor = 
ExecutorHelper.linkedQueueExecutor(EXTRACTION_MAX_THREADS, 
"oak-binary-text-extractor-%d", (t, e) -> log.warn("Error occurred in 
asynchronous processing ", e));

Review Comment:
   same :)



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java:
##########
@@ -174,25 +172,7 @@ protected void bindIndexInfoProviders(IndexInfoServiceImpl 
indexInfoService) {
     }
 
     private ThreadPoolExecutor createExecutor() {
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L, 
TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-            private final AtomicInteger counter = new AtomicInteger();
-            private final Thread.UncaughtExceptionHandler handler =
-                    (t, e) -> log.warn("Error occurred in asynchronous 
processing ", e);
-
-            @Override
-            public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, createName());
-                thread.setDaemon(true);
-                thread.setPriority(Thread.MIN_PRIORITY);
-                thread.setUncaughtExceptionHandler(handler);
-                return thread;
-            }
-
-            private String createName() {
-                return "oak-lucene-" + counter.getAndIncrement();
-            }
-        });
+        ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(5, 
"oak-lucene-%d", (t, e) -> log.warn("Error occurred in asynchronous processing 
", e));

Review Comment:
   In fact, this is part of the issue. In the original code the number of 
threads is either 0 or 1. It will never go above 1, because maximumPoolSize is 
ignored when the queue is unbounded (as per 
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html):
   
   > 2. Unbounded queues. Using an unbounded queue (for example a 
[LinkedBlockingQueue](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/LinkedBlockingQueue.html)
 without a predefined capacity) will cause new tasks to wait in the queue when 
all corePoolSize threads are busy. Thus, no more than corePoolSize threads will 
ever be created. (And _**the value of the maximumPoolSize therefore doesn't 
have any effect.**_) This may be appropriate when each task is completely 
independent of others, so tasks cannot affect each others execution; for 
example, in a web page server. While this style of queuing can be useful in 
smoothing out transient bursts of requests, it admits the possibility of 
unbounded work queue growth when commands continue to arrive on average faster 
than they can be processed.
   
   (In fact, in this particular case, with corePoolSize 0, the pool does go 
above corePoolSize, but only to 1 thread.)
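
   The behaviour described above can be checked with a small standalone program. This is only a sketch: the class and method names (`UnboundedQueuePoolDemo`, `largestPoolSize`) are made up for illustration and are not part of the PR. It submits tasks that block on a latch to a `ThreadPoolExecutor` backed by an unbounded `LinkedBlockingQueue` and reports the largest pool size the executor ever reached.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
public class UnboundedQueuePoolDemo {

    /** Submits {@code tasks} blocking tasks and returns the largest pool size reached. */
    static int largestPoolSize(int corePoolSize, int maximumPoolSize, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // unbounded queue, as in the original code
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every task blocks until the latch opens, so all workers stay busy.
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return executor.getLargestPoolSize();
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Same shape as the original IndexHelper executor: core 0, max 5.
        System.out.println("core=0, max=5: " + largestPoolSize(0, 5, 20));
        // Core and max both 5: the first five submissions each start a thread.
        System.out.println("core=5, max=5: " + largestPoolSize(5, 5, 20));
    }
}
```

   With 20 blocked tasks, the first configuration should report 1 and the second 5, which matches the Javadoc passage quoted above.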
   



##########
oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java:
##########
@@ -128,26 +126,8 @@ public Downloader(int concurrency, int connectTimeoutMs, 
int readTimeoutMs, int
             this.checksumAlgorithm = null;
         }
         this.bufferSize = bufferSize;
-
-        // The maximum number of threads in each executor service,
-        // when using a LinkedBlockingQueue(), is corePoolSize.
-        // all other tasks are kept in the LinkedBlockingQueue, which
-        // is unbounded.
-        // (Using a bounded queue, such as SynchronousQueue,
-        // would result in RejectedExecutionHandler).
-        // We want to keep things simple and don't want
-        // to use back presssure or other mechanisms.
-        // So in summary, corePoolSize threads are used, per service.
-        this.executorService = new ThreadPoolExecutor(
-                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
-                
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
-        );
-        this.executorServiceForParts = new ThreadPoolExecutor(
-                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
-                
BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
-        );
+        this.executorService = 
ExecutorHelper.linkedQueueExecutor(corePoolSize, "downloader-%d");

Review Comment:
   same :)



##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java:
##########
@@ -69,12 +67,7 @@ public class ElasticIndexStatistics implements 
IndexStatistics {
     private static final String REFRESH_SECONDS = 
"oak.elastic.statsRefreshSeconds";
     private static final Long REFRESH_SECONDS_DEFAULT = 60L;
 
-    private static final ExecutorService REFRESH_EXECUTOR = new 
ThreadPoolExecutor(
-            0, 4, 60L, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(),
-            
BasicThreadFactory.builder().namingPattern("elastic-statistics-cache-refresh-thread-%d").daemon().build()
-    );
-
+    private static final ExecutorService REFRESH_EXECUTOR = 
ExecutorHelper.linkedQueueExecutor(4, "elastic-statistics-cache-refresh-%d");

Review Comment:
   same :)



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java:
##########
@@ -174,25 +172,7 @@ protected void bindIndexInfoProviders(IndexInfoServiceImpl 
indexInfoService) {
     }
 
     private ThreadPoolExecutor createExecutor() {
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L, 
TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-            private final AtomicInteger counter = new AtomicInteger();
-            private final Thread.UncaughtExceptionHandler handler =
-                    (t, e) -> log.warn("Error occurred in asynchronous 
processing ", e);
-
-            @Override
-            public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, createName());
-                thread.setDaemon(true);
-                thread.setPriority(Thread.MIN_PRIORITY);
-                thread.setUncaughtExceptionHandler(handler);
-                return thread;
-            }
-
-            private String createName() {
-                return "oak-lucene-" + counter.getAndIncrement();
-            }
-        });
+        ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(5, 
"oak-lucene-%d", (t, e) -> log.warn("Error occurred in asynchronous processing 
", e));

Review Comment:
   So the question is: do we want 1 thread (which keeps the code behaving as it 
currently does) or 5 threads (which seems to have been the intent under 
load)?
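
   If 5 threads is what we want, one common way to get there with an unbounded queue is to set corePoolSize equal to the intended maximum and let idle core threads time out. The sketch below only illustrates that option; `ElasticPoolSketch` and `newPool` are hypothetical names, and this is not necessarily how `ExecutorHelper.linkedQueueExecutor` is implemented.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the PR's ExecutorHelper.
public class ElasticPoolSketch {

    /** Pool that grows up to {@code threads} under load and shrinks back to 0 when idle. */
    static ThreadPoolExecutor newPool(int threads) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads,            // core == max, so the unbounded queue does not cap growth
                60L, TimeUnit.SECONDS,       // keep-alive must be > 0 for core thread timeout
                new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(true); // idle core threads are reclaimed after keep-alive
        return executor;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(5);
        System.out.println("core=" + pool.getCorePoolSize()
                + ", coreTimeout=" + pool.allowsCoreThreadTimeOut());
        pool.shutdown();
    }
}
```

   Keeping 1 thread instead would simply mean passing 1 as the pool size, which preserves the current runtime behaviour.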



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]