This is an automated email from the ASF dual-hosted git repository.
daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7342e58d2e OAK-11918 : removed Guava's ListeningExecutorService (#2638)
7342e58d2e is described below
commit 7342e58d2e764000d5f43ff157e884b2af7ab9dd
Author: Rishabh Kumar <[email protected]>
AuthorDate: Tue Dec 2 13:45:32 2025 +0530
OAK-11918 : removed Guava's ListeningExecutorService (#2638)
---
.../blob/AbstractSharedCachingDataStore.java | 6 ++--
.../oak/plugins/blob/CompositeDataStoreCache.java | 6 ++--
.../oak/plugins/blob/UploadStagingCache.java | 35 +++++++++-------------
.../oak/plugins/blob/CachingDataStoreTest.java | 2 +-
.../blob/ConsolidatedDataStoreStatsTest.java | 2 +-
.../oak/commons/jmx/ManagementOperationTest.java | 9 +++---
6 files changed, 24 insertions(+), 36 deletions(-)
diff --git
a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
index 0c83d5b368..569f45cd71 100644
---
a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
+++
b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
@@ -58,8 +58,6 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-
/**
* Cache files locally and stage files locally for async uploads.
* Configuration:
@@ -137,7 +135,7 @@ public abstract class AbstractSharedCachingDataStore
extends AbstractDataStore
*/
protected AbstractSharedBackend backend;
- protected ListeningExecutorService listeningExecutor;
+ protected ExecutorService executorService;
protected ScheduledExecutorService schedulerExecutor;
@@ -174,7 +172,7 @@ public abstract class AbstractSharedCachingDataStore
extends AbstractDataStore
@Override public void adopt(File f, File moved) throws
IOException {
FileUtils.moveFile(f, moved);
}
- }, statisticsProvider, listeningExecutor, schedulerExecutor,
executor, stagingPurgeInterval,
+ }, statisticsProvider, executorService, schedulerExecutor,
executor, stagingPurgeInterval,
stagingRetryInterval);
}
diff --git
a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
index 318940dc60..d1605d326c 100644
---
a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
+++
b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
@@ -35,8 +35,6 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-
/**
*/
public class CompositeDataStoreCache extends AbstractCache<String, File>
implements Closeable {
@@ -62,7 +60,7 @@ public class CompositeDataStoreCache extends
AbstractCache<String, File> impleme
public CompositeDataStoreCache(String path, File home, long size, int
uploadSplitPercentage,
int uploadThreads, CacheLoader<String, InputStream> loader, final
StagingUploader uploader,
- StatisticsProvider statsProvider, ListeningExecutorService
listeningExecutor,
+ StatisticsProvider statsProvider, ExecutorService executorService,
ScheduledExecutorService scheduledExecutor /* purge scheduled executor
*/,
ExecutorService executor /* File cache executor */,
int purgeInterval /* async purge interval secs */,
@@ -82,7 +80,7 @@ public class CompositeDataStoreCache extends
AbstractCache<String, File> impleme
uploadSplitPercentage, uploadThreads);
this.stagingCache = UploadStagingCache
.build(directory, home, uploadThreads, uploadSize, uploader, null,
statsProvider,
- listeningExecutor, scheduledExecutor, purgeInterval,
stagingRetryInterval);
+ executorService, scheduledExecutor, purgeInterval,
stagingRetryInterval);
this.downloadCache = FileCache.build(fileCacheSize, directory, loader,
executor);
stagingCache.setDownloadCache(downloadCache);
}
diff --git
a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
index b8f34ebf49..4210c59071 100644
---
a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
+++
b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
@@ -32,8 +32,10 @@ import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -44,15 +46,9 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.jackrabbit.guava.common.cache.Weigher;
-import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback;
-import org.apache.jackrabbit.guava.common.util.concurrent.Futures;
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
-import
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorUtils;
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
@@ -105,7 +101,7 @@ public class UploadStagingCache implements Closeable {
/**
* Executor for async uploads
*/
- private ListeningExecutorService executor;
+ private ExecutorService executor;
/**
* Scheduled executor for build and remove
@@ -155,7 +151,7 @@ public class UploadStagingCache implements Closeable {
private UploadStagingCache(File dir, File home, int uploadThreads, long
size /* bytes */,
StagingUploader uploader, @Nullable FileCache cache,
StatisticsProvider statisticsProvider,
- @Nullable ListeningExecutorService executor,
+ @Nullable ExecutorService executor,
@Nullable ScheduledExecutorService scheduledExecutor,
int purgeInterval /* secs */, int retryInterval /* secs */) {
@@ -163,9 +159,8 @@ public class UploadStagingCache implements Closeable {
this.size = size;
this.executor = executor;
if (executor == null) {
- this.executor = MoreExecutors.listeningDecorator(Executors
- .newFixedThreadPool(uploadThreads,
-
BasicThreadFactory.builder().namingPattern("oak-ds-async-upload-thread-%d").build()));
+ this.executor = Executors.newFixedThreadPool(uploadThreads,
+
BasicThreadFactory.builder().namingPattern("oak-ds-async-upload-thread-%d").build());
}
this.scheduledExecutor = scheduledExecutor;
@@ -200,7 +195,7 @@ public class UploadStagingCache implements Closeable {
public static UploadStagingCache build(File dir, File home, int
uploadThreads, long size
/* bytes */, StagingUploader uploader, @Nullable FileCache cache,
- StatisticsProvider statisticsProvider, @Nullable
ListeningExecutorService executor,
+ StatisticsProvider statisticsProvider, @Nullable ExecutorService
executor,
@Nullable ScheduledExecutorService scheduledExecutor, int
purgeInterval /* secs */,
int retryInterval /* secs */) {
if (size > 0) {
@@ -362,7 +357,7 @@ public class UploadStagingCache implements Closeable {
try {
// create an async job
- ListenableFuture<Integer> future = executor.submit(() -> {
+ CompletableFuture<Integer> future =
CompletableFuture.supplyAsync(() -> {
try (TimerStats.Context uploadContext =
cacheStats.startUpLoaderTimer()) {
uploader.write(id, upload);
@@ -371,13 +366,13 @@ public class UploadStagingCache implements Closeable {
return 1;
} catch (Exception e) {
LOG.error("Error adding file to backend", e);
- throw e;
+ throw new CompletionException(e);
}
- });
+ }, executor);
// Add a callback to the returned Future object for handling
success and error
- Futures.addCallback(future, new FutureCallback<>() {
- @Override public void onSuccess(@Nullable Integer r) {
+ future.whenComplete( (r, t) -> {
+ if (t == null) {
LOG.info("Successfully added [{}], [{}]", id, upload);
try {
@@ -398,14 +393,12 @@ public class UploadStagingCache implements Closeable {
LOG.warn("Error in cleaning up [{}] from staging",
upload);
}
result.complete(r);
- }
-
- @Override public void onFailure(Throwable t) {
+ } else {
LOG.error("Error adding [{}] with file [{}] to backend",
id, upload, t);
result.completeExceptionally(t);
retryQueue.add(id);
}
- }, ExecutorUtils.newDirectExecutorService());
+ });
LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
} catch (Exception e) {
LOG.error("Error staging file for upload [{}]", upload, e);
diff --git
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
index c8dd011a0a..eb046262ab 100644
---
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
+++
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
@@ -130,7 +130,7 @@ public class CachingDataStoreTest extends
AbstractDataStoreCacheTest {
dataStore.setStatisticsProvider(statsProvider);
dataStore.setCacheSize(cacheSize);
dataStore.setStagingSplitPercentage(uploadSplit);
- dataStore.listeningExecutor = listeningExecutor;
+ dataStore.executorService = listeningExecutor;
dataStore.schedulerExecutor = scheduledExecutor;
dataStore.executor = ExecutorUtils.newDirectExecutorService();
dsPath = new File(root.getAbsolutePath(), "ds").getAbsolutePath();
diff --git
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
index 29b94867e4..45e1e71262 100644
---
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
+++
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
@@ -137,7 +137,7 @@ public class ConsolidatedDataStoreStatsTest extends
AbstractDataStoreCacheTest {
}
};
dataStore.setStatisticsProvider(statsProvider);
- dataStore.listeningExecutor = executor;
+ dataStore.executorService = executor;
dataStore.schedulerExecutor = scheduledExecutor;
dataStore.init(root.getAbsolutePath());
diff --git
a/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
b/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
index 58000fb1d3..b62cc0b876 100644
---
a/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
+++
b/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
@@ -21,9 +21,7 @@ package org.apache.jackrabbit.oak.commons.jmx;
import static java.lang.Thread.currentThread;
import static java.lang.Thread.sleep;
-import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static
org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors.listeningDecorator;
import static
org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode.FAILED;
import static
org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode.RUNNING;
import static
org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode.SUCCEEDED;
@@ -34,12 +32,13 @@ import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import javax.management.openmbean.CompositeData;
-import
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorUtils;
import org.apache.jackrabbit.oak.commons.jmx.ManagementOperation.Status;
import org.junit.After;
@@ -47,11 +46,11 @@ import org.junit.Before;
import org.junit.Test;
public class ManagementOperationTest {
- private ListeningExecutorService executor;
+ private ExecutorService executor;
@Before
public void setup() {
- executor = listeningDecorator(newCachedThreadPool());
+ executor = Executors.newCachedThreadPool();
}
@After