This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7342e58d2e OAK-11918 : removed Guava's ListeningExecutorService (#2638)
7342e58d2e is described below

commit 7342e58d2e764000d5f43ff157e884b2af7ab9dd
Author: Rishabh Kumar <[email protected]>
AuthorDate: Tue Dec 2 13:45:32 2025 +0530

    OAK-11918 : removed Guava's ListeningExecutorService (#2638)
---
 .../blob/AbstractSharedCachingDataStore.java       |  6 ++--
 .../oak/plugins/blob/CompositeDataStoreCache.java  |  6 ++--
 .../oak/plugins/blob/UploadStagingCache.java       | 35 +++++++++-------------
 .../oak/plugins/blob/CachingDataStoreTest.java     |  2 +-
 .../blob/ConsolidatedDataStoreStatsTest.java       |  2 +-
 .../oak/commons/jmx/ManagementOperationTest.java   |  9 +++---
 6 files changed, 24 insertions(+), 36 deletions(-)

diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
index 0c83d5b368..569f45cd71 100644
--- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
+++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
@@ -58,8 +58,6 @@ import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-
 /**
  * Cache files locally and stage files locally for async uploads.
  * Configuration:
@@ -137,7 +135,7 @@ public abstract class AbstractSharedCachingDataStore 
extends AbstractDataStore
      */
     protected AbstractSharedBackend backend;
 
-    protected ListeningExecutorService listeningExecutor;
+    protected ExecutorService executorService;
 
     protected ScheduledExecutorService schedulerExecutor;
 
@@ -174,7 +172,7 @@ public abstract class AbstractSharedCachingDataStore 
extends AbstractDataStore
                 @Override public void adopt(File f, File moved) throws 
IOException {
                     FileUtils.moveFile(f, moved);
                 }
-            }, statisticsProvider, listeningExecutor, schedulerExecutor, 
executor, stagingPurgeInterval,
+            }, statisticsProvider, executorService, schedulerExecutor, 
executor, stagingPurgeInterval,
                 stagingRetryInterval);
     }
 
diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
index 318940dc60..d1605d326c 100644
--- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
+++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
@@ -35,8 +35,6 @@ import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-
 /**
  */
 public class CompositeDataStoreCache extends AbstractCache<String, File> 
implements Closeable {
@@ -62,7 +60,7 @@ public class CompositeDataStoreCache extends 
AbstractCache<String, File> impleme
 
     public CompositeDataStoreCache(String path, File home, long size, int 
uploadSplitPercentage,
         int uploadThreads, CacheLoader<String, InputStream> loader, final 
StagingUploader uploader,
-        StatisticsProvider statsProvider, ListeningExecutorService 
listeningExecutor,
+        StatisticsProvider statsProvider, ExecutorService executorService,
         ScheduledExecutorService scheduledExecutor /* purge scheduled executor 
*/,
         ExecutorService executor /* File cache executor */,
         int purgeInterval /* async purge interval secs */,
@@ -82,7 +80,7 @@ public class CompositeDataStoreCache extends 
AbstractCache<String, File> impleme
                 uploadSplitPercentage, uploadThreads);
         this.stagingCache = UploadStagingCache
             .build(directory, home, uploadThreads, uploadSize, uploader, null, 
statsProvider,
-                listeningExecutor, scheduledExecutor, purgeInterval, 
stagingRetryInterval);
+                executorService, scheduledExecutor, purgeInterval, 
stagingRetryInterval);
         this.downloadCache = FileCache.build(fileCacheSize, directory, loader, 
executor);
         stagingCache.setDownloadCache(downloadCache);
     }
diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
index b8f34ebf49..4210c59071 100644
--- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
+++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
@@ -32,8 +32,10 @@ import java.util.List;
 import java.util.Optional;
 import java.util.StringJoiner;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -44,15 +46,9 @@ import java.util.stream.Stream;
 
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.jackrabbit.guava.common.cache.Weigher;
-import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback;
-import org.apache.jackrabbit.guava.common.util.concurrent.Futures;
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorUtils;
 import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.stats.CounterStats;
 import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
@@ -105,7 +101,7 @@ public class UploadStagingCache implements Closeable {
     /**
      * Executor for async uploads
      */
-    private ListeningExecutorService executor;
+    private ExecutorService executor;
 
     /**
      * Scheduled executor for build and remove
@@ -155,7 +151,7 @@ public class UploadStagingCache implements Closeable {
 
     private UploadStagingCache(File dir, File home, int uploadThreads, long 
size /* bytes */,
         StagingUploader uploader, @Nullable FileCache cache, 
StatisticsProvider statisticsProvider,
-        @Nullable ListeningExecutorService executor,
+        @Nullable ExecutorService executor,
         @Nullable ScheduledExecutorService scheduledExecutor,
         int purgeInterval /* secs */, int retryInterval /* secs */) {
 
@@ -163,9 +159,8 @@ public class UploadStagingCache implements Closeable {
         this.size = size;
         this.executor = executor;
         if (executor == null) {
-            this.executor = MoreExecutors.listeningDecorator(Executors
-                .newFixedThreadPool(uploadThreads,
-                        
BasicThreadFactory.builder().namingPattern("oak-ds-async-upload-thread-%d").build()));
+            this.executor = Executors.newFixedThreadPool(uploadThreads,
+                    
BasicThreadFactory.builder().namingPattern("oak-ds-async-upload-thread-%d").build());
         }
 
         this.scheduledExecutor = scheduledExecutor;
@@ -200,7 +195,7 @@ public class UploadStagingCache implements Closeable {
 
     public static UploadStagingCache build(File dir, File home, int 
uploadThreads, long size
         /* bytes */, StagingUploader uploader, @Nullable FileCache cache,
-        StatisticsProvider statisticsProvider, @Nullable 
ListeningExecutorService executor,
+        StatisticsProvider statisticsProvider, @Nullable ExecutorService 
executor,
         @Nullable ScheduledExecutorService scheduledExecutor, int 
purgeInterval /* secs */,
         int retryInterval /* secs */) {
         if (size > 0) {
@@ -362,7 +357,7 @@ public class UploadStagingCache implements Closeable {
 
         try {
             // create an async job
-            ListenableFuture<Integer> future = executor.submit(() -> {
+            CompletableFuture<Integer> future = 
CompletableFuture.supplyAsync(() -> {
                 try (TimerStats.Context uploadContext = 
cacheStats.startUpLoaderTimer()) {
 
                     uploader.write(id, upload);
@@ -371,13 +366,13 @@ public class UploadStagingCache implements Closeable {
                     return 1;
                 } catch (Exception e) {
                     LOG.error("Error adding file to backend", e);
-                    throw e;
+                    throw new CompletionException(e);
                 }
-            });
+            }, executor);
 
             // Add a callback to the returned Future object for handling 
success and error
-            Futures.addCallback(future, new FutureCallback<>() {
-                @Override public void onSuccess(@Nullable Integer r) {
+            future.whenComplete( (r, t) -> {
+                if (t == null) {
                     LOG.info("Successfully added [{}], [{}]", id, upload);
 
                     try {
@@ -398,14 +393,12 @@ public class UploadStagingCache implements Closeable {
                         LOG.warn("Error in cleaning up [{}] from staging", 
upload);
                     }
                     result.complete(r);
-                }
-
-                @Override public void onFailure(Throwable t) {
+                } else {
                     LOG.error("Error adding [{}] with file [{}] to backend", 
id, upload, t);
                     result.completeExceptionally(t);
                     retryQueue.add(id);
                 }
-            }, ExecutorUtils.newDirectExecutorService());
+            });
             LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
         } catch (Exception e) {
             LOG.error("Error staging file for upload [{}]", upload, e);
diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
index c8dd011a0a..eb046262ab 100644
--- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
+++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
@@ -130,7 +130,7 @@ public class CachingDataStoreTest extends 
AbstractDataStoreCacheTest {
         dataStore.setStatisticsProvider(statsProvider);
         dataStore.setCacheSize(cacheSize);
         dataStore.setStagingSplitPercentage(uploadSplit);
-        dataStore.listeningExecutor = listeningExecutor;
+        dataStore.executorService = listeningExecutor;
         dataStore.schedulerExecutor = scheduledExecutor;
         dataStore.executor = ExecutorUtils.newDirectExecutorService();
         dsPath = new File(root.getAbsolutePath(), "ds").getAbsolutePath();
diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
index 29b94867e4..45e1e71262 100644
--- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
+++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/ConsolidatedDataStoreStatsTest.java
@@ -137,7 +137,7 @@ public class ConsolidatedDataStoreStatsTest extends 
AbstractDataStoreCacheTest {
             }
         };
         dataStore.setStatisticsProvider(statsProvider);
-        dataStore.listeningExecutor = executor;
+        dataStore.executorService = executor;
         dataStore.schedulerExecutor = scheduledExecutor;
         dataStore.init(root.getAbsolutePath());
 
diff --git a/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java b/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
index 58000fb1d3..b62cc0b876 100644
--- a/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
+++ b/oak-core-spi/src/test/java/org/apache/jackrabbit/oak/commons/jmx/ManagementOperationTest.java
@@ -21,9 +21,7 @@ package org.apache.jackrabbit.oak.commons.jmx;
 
 import static java.lang.Thread.currentThread;
 import static java.lang.Thread.sleep;
-import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors.listeningDecorator;
 import static 
org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode.FAILED;
 import static 
org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode.RUNNING;
 import static 
org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode.SUCCEEDED;
@@ -34,12 +32,13 @@ import static org.junit.Assert.fail;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeoutException;
 
 import javax.management.openmbean.CompositeData;
 
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
 import org.apache.jackrabbit.oak.commons.internal.concurrent.ExecutorUtils;
 import org.apache.jackrabbit.oak.commons.jmx.ManagementOperation.Status;
 import org.junit.After;
@@ -47,11 +46,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class ManagementOperationTest {
-    private ListeningExecutorService executor;
+    private ExecutorService executor;
 
     @Before
     public void setup() {
-        executor = listeningDecorator(newCachedThreadPool());
+        executor = Executors.newCachedThreadPool();
     }
 
     @After

Reply via email to