joerghoh commented on code in PR #2679: URL: https://github.com/apache/jackrabbit-oak/pull/2679#discussion_r2677632182
########## oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/concurrent/ExecutorHelper.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.commons.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ExecutorHelper { + public static ThreadPoolExecutor linkedQueueExecutor(int poolSize, String namePattern, Thread.UncaughtExceptionHandler handler) { Review Comment: Javadoc missing ########## oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/concurrent/NamedThreadFactory.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.commons.concurrent; + +import org.jspecify.annotations.NonNull; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { Review Comment: What about having it into the ExecutorHelper class directly? ########## oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexStatistics.java: ########## @@ -69,12 +67,7 @@ public class ElasticIndexStatistics implements IndexStatistics { private static final String REFRESH_SECONDS = "oak.elastic.statsRefreshSeconds"; private static final Long REFRESH_SECONDS_DEFAULT = 60L; - private static final ExecutorService REFRESH_EXECUTOR = new ThreadPoolExecutor( - 0, 4, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - BasicThreadFactory.builder().namingPattern("elastic-statistics-cache-refresh-thread-%d").daemon().build() - ); - + private static final ExecutorService REFRESH_EXECUTOR = ExecutorHelper.linkedQueueExecutor(4, "elastic-statistics-cache-refresh-%d"); Review Comment: same as above ########## oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/concurrent/NamedThreadFactory.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.commons.concurrent; + +import org.jspecify.annotations.NonNull; Review Comment: IIRC for the majority of Oak we (still) use the jetbrains annotations, can you switch to that too? (A potential migration to jspecify should be handled separately.) ########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java: ########## @@ -174,25 +172,7 @@ protected void bindIndexInfoProviders(IndexInfoServiceImpl indexInfoService) { } private ThreadPoolExecutor createExecutor() { - ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { - private final AtomicInteger counter = new AtomicInteger(); - private final Thread.UncaughtExceptionHandler handler = - (t, e) -> log.warn("Error occurred in asynchronous processing ", e); - - @Override - public Thread newThread(@NotNull Runnable r) { - Thread thread = new Thread(r, createName()); - thread.setDaemon(true); - thread.setPriority(Thread.MIN_PRIORITY); - thread.setUncaughtExceptionHandler(handler); - return thread; - } - - private String createName() { - return "oak-lucene-" + counter.getAndIncrement(); - } - }); + ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(5, "oak-lucene-%d", (t, e) -> log.warn("Error occurred in asynchronous processing ", e)); Review Comment: This is not identical to the code you're removing, because with the change it will always have 5 threads running, while in the original it can go down to 0 threads if no work is required. ########## oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java: ########## @@ -128,26 +126,8 @@ public Downloader(int concurrency, int connectTimeoutMs, int readTimeoutMs, int this.checksumAlgorithm = null; } this.bufferSize = bufferSize; - - // The maximum number of threads in each executor service, - // when using a LinkedBlockingQueue(), is corePoolSize. - // all other tasks are kept in the LinkedBlockingQueue, which - // is unbounded. - // (Using a bounded queue, such as SynchronousQueue, - // would result in RejectedExecutionHandler). - // We want to keep things simple and don't want - // to use back presssure or other mechanisms. - // So in summary, corePoolSize threads are used, per service. - this.executorService = new ThreadPoolExecutor( - corePoolSize, concurrency, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build() - ); - this.executorServiceForParts = new ThreadPoolExecutor( - corePoolSize, concurrency, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build() - ); + this.executorService = ExecutorHelper.linkedQueueExecutor(corePoolSize, "downloader-%d"); Review Comment: same as above, the number of threads running consistently is different than before. ########## oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/concurrent/ExecutorHelper.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.commons.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ExecutorHelper { + public static ThreadPoolExecutor linkedQueueExecutor(int poolSize, String namePattern, Thread.UncaughtExceptionHandler handler) { + // The maximum number of threads in each executor service, + // when using a LinkedBlockingQueue(), is corePoolSize. + // all other tasks are kept in the LinkedBlockingQueue, which + // is unbounded. + // (Using a bounded queue, such as SynchronousQueue, + // would result in RejectedExecutionHandler). + // We want to keep things simple and don't want + // to use back pressure or other mechanisms. + // So in summary, corePoolSize threads are used, per service. + return new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), new NamedThreadFactory(namePattern, handler)); + } + + public static ExecutorService linkedQueueExecutor(int poolSize, String namePattern) { Review Comment: missing Javadoc ########## oak-search/src/main/java/org/apache/jackrabbit/oak/plugins/index/search/ExtractedTextCache.java: ########## @@ -346,26 +344,7 @@ private synchronized void createExecutor() { return; } log.debug("ExtractedTextCache createExecutor {}", this); - ThreadPoolExecutor executor = new ThreadPoolExecutor(1, EXTRACTION_MAX_THREADS, - 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), new ThreadFactory() { - private final AtomicInteger counter = new AtomicInteger(); - private final Thread.UncaughtExceptionHandler handler = (t, e) -> log.warn("Error occurred in asynchronous processing ", e); - @Override - public Thread newThread(@NotNull Runnable r) { - Thread thread = new Thread(r, createName()); - thread.setDaemon(true); - thread.setPriority(Thread.MIN_PRIORITY); - thread.setUncaughtExceptionHandler(handler); - return thread; - } - - private String createName() { - int index = counter.getAndIncrement(); - return "oak binary text extractor" + (index == 0 ? "" : " " + index); - } - }); - executor.setKeepAliveTime(1, TimeUnit.MINUTES); + ThreadPoolExecutor executor = ExecutorHelper.linkedQueueExecutor(EXTRACTION_MAX_THREADS, "oak-binary-text-extractor-%d", (t, e) -> log.warn("Error occurred in asynchronous processing ", e)); Review Comment: same as above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
