This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-12025 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit d994f776c0f4520ccbb08f8710ec80c749868a19 Author: rishabhdaim <[email protected]> AuthorDate: Mon Dec 1 13:46:58 2025 +0530 OAK-12025 : replaced Guava's SameThreadExecutorService --- .../plugins/blob/SameThreadExecutorService.java | 157 --------------------- .../oak/plugins/blob/UploadStagingCache.java | 3 +- 2 files changed, 2 insertions(+), 158 deletions(-) diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java deleted file mode 100644 index 66f813574b..0000000000 --- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.jackrabbit.oak.plugins.blob; - -import org.apache.jackrabbit.guava.common.util.concurrent.AbstractListeningExecutorService; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Class copied from the Guava 15, to make the AzureDataStore compatible with - * the Guava 26 (where the SameThreadExecutorService is not present). - * - * TODO: Remove this class once the whole Oak is migrated to use Guava 26. - */ -class SameThreadExecutorService extends AbstractListeningExecutorService { - /** - * Lock used whenever accessing the state variables - * (runningTasks, shutdown, terminationCondition) of the executor - */ - private final Lock lock = new ReentrantLock(); - - /** Signaled after the executor is shutdown and running tasks are done */ - private final Condition termination = lock.newCondition(); - - /* - * Conceptually, these two variables describe the executor being in - * one of three states: - * - Active: shutdown == false - * - Shutdown: runningTasks > 0 and shutdown == true - * - Terminated: runningTasks == 0 and shutdown == true - */ - private int runningTasks = 0; - private boolean shutdown = false; - - @Override - public void execute(Runnable command) { - startTask(); - try { - command.run(); - } finally { - endTask(); - } - } - - @Override - public boolean isShutdown() { - lock.lock(); - try { - return shutdown; - } finally { - lock.unlock(); - } - } - - @Override - public void shutdown() { - lock.lock(); - try { - shutdown = true; - } finally { - lock.unlock(); - } - } - - // See sameThreadExecutor javadoc for unusual behavior of this method. 
- @Override - public List<Runnable> shutdownNow() { - shutdown(); - return Collections.emptyList(); - } - - @Override - public boolean isTerminated() { - lock.lock(); - try { - return shutdown && runningTasks == 0; - } finally { - lock.unlock(); - } - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - long nanos = unit.toNanos(timeout); - lock.lock(); - try { - for (;;) { - if (isTerminated()) { - return true; - } else if (nanos <= 0) { - return false; - } else { - nanos = termination.awaitNanos(nanos); - } - } - } finally { - lock.unlock(); - } - } - - /** - * Checks if the executor has been shut down and increments the running - * task count. - * - * @throws RejectedExecutionException if the executor has been previously - * shutdown - */ - private void startTask() { - lock.lock(); - try { - if (isShutdown()) { - throw new RejectedExecutionException("Executor already shutdown"); - } - runningTasks++; - } finally { - lock.unlock(); - } - } - - /** - * Decrements the running task count. 
- */ - private void endTask() { - lock.lock(); - try { - runningTasks--; - if (isTerminated()) { - termination.signalAll(); - } - } finally { - lock.unlock(); - } - } -} diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java index 36f1a8a6e4..b8f34ebf49 100644 --- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java +++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java @@ -52,6 +52,7 @@ import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors; import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.oak.commons.StringUtils; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorUtils; import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean; import org.apache.jackrabbit.oak.stats.CounterStats; import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; @@ -404,7 +405,7 @@ public class UploadStagingCache implements Closeable { result.completeExceptionally(t); retryQueue.add(id); } - }, new SameThreadExecutorService()); + }, ExecutorUtils.newDirectExecutorService()); LOG.debug("File [{}] scheduled for upload [{}]", upload, result); } catch (Exception e) { LOG.error("Error staging file for upload [{}]", upload, e);
