This is an automated email from the ASF dual-hosted git repository.
daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1144153681 OAK-12025 : replaced Guava's SameThreadExecutorService (#2637)
1144153681 is described below
commit 1144153681044c50c7abba0b7bf38692392af8d4
Author: Rishabh Kumar <[email protected]>
AuthorDate: Mon Dec 1 19:45:35 2025 +0530
OAK-12025 : replaced Guava's SameThreadExecutorService (#2637)
---
.../plugins/blob/SameThreadExecutorService.java | 157 ---------------------
.../oak/plugins/blob/UploadStagingCache.java | 3 +-
2 files changed, 2 insertions(+), 158 deletions(-)
diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java
deleted file mode 100644
index 66f813574b..0000000000
--- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.blob;
-
-import org.apache.jackrabbit.guava.common.util.concurrent.AbstractListeningExecutorService;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Class copied from the Guava 15, to make the AzureDataStore compatible with
- * the Guava 26 (where the SameThreadExecutorService is not present).
- *
- * TODO: Remove this class once the whole Oak is migrated to use Guava 26.
- */
-class SameThreadExecutorService extends AbstractListeningExecutorService {
- /**
- * Lock used whenever accessing the state variables
- * (runningTasks, shutdown, terminationCondition) of the executor
- */
- private final Lock lock = new ReentrantLock();
-
- /** Signaled after the executor is shutdown and running tasks are done */
- private final Condition termination = lock.newCondition();
-
- /*
- * Conceptually, these two variables describe the executor being in
- * one of three states:
- * - Active: shutdown == false
- * - Shutdown: runningTasks > 0 and shutdown == true
- * - Terminated: runningTasks == 0 and shutdown == true
- */
- private int runningTasks = 0;
- private boolean shutdown = false;
-
- @Override
- public void execute(Runnable command) {
- startTask();
- try {
- command.run();
- } finally {
- endTask();
- }
- }
-
- @Override
- public boolean isShutdown() {
- lock.lock();
- try {
- return shutdown;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void shutdown() {
- lock.lock();
- try {
- shutdown = true;
- } finally {
- lock.unlock();
- }
- }
-
- // See sameThreadExecutor javadoc for unusual behavior of this method.
- @Override
- public List<Runnable> shutdownNow() {
- shutdown();
- return Collections.emptyList();
- }
-
- @Override
- public boolean isTerminated() {
- lock.lock();
- try {
- return shutdown && runningTasks == 0;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- lock.lock();
- try {
- for (;;) {
- if (isTerminated()) {
- return true;
- } else if (nanos <= 0) {
- return false;
- } else {
- nanos = termination.awaitNanos(nanos);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Checks if the executor has been shut down and increments the running
- * task count.
- *
- * @throws RejectedExecutionException if the executor has been previously
- * shutdown
- */
- private void startTask() {
- lock.lock();
- try {
- if (isShutdown()) {
- throw new RejectedExecutionException("Executor already shutdown");
- }
- runningTasks++;
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Decrements the running task count.
- */
- private void endTask() {
- lock.lock();
- try {
- runningTasks--;
- if (isTerminated()) {
- termination.signalAll();
- }
- } finally {
- lock.unlock();
- }
- }
-}
diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
index 36f1a8a6e4..b8f34ebf49 100644
--- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
+++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
@@ -52,6 +52,7 @@ import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorUtils;
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
@@ -404,7 +405,7 @@ public class UploadStagingCache implements Closeable {
result.completeExceptionally(t);
retryQueue.add(id);
}
- }, new SameThreadExecutorService());
+ }, ExecutorUtils.newDirectExecutorService());
LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
} catch (Exception e) {
LOG.error("Error staging file for upload [{}]", upload, e);