This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16420737b6 OAK-11924 : replaced Guava's AbstractListeningExecutorService with native java implementation (#2646)
16420737b6 is described below

commit 16420737b6d71352bc2fd95241702b2856e1a129
Author: Rishabh Kumar <[email protected]>
AuthorDate: Thu Dec 4 21:21:20 2025 +0530

    OAK-11924 : replaced Guava's AbstractListeningExecutorService with native java implementation (#2646)
---
 .../plugins/blob/AbstractDataStoreCacheTest.java   | 63 ++++++++++++----------
 .../jackrabbit/oak/plugins/blob/FileCacheTest.java |  7 ++-
 2 files changed, 38 insertions(+), 32 deletions(-)

diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
index 6df2eb8140..6b4c720b87 100644
--- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
+++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
@@ -37,12 +37,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -51,16 +56,10 @@ import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.guava.common.cache.CacheLoader;
-import org.apache.jackrabbit.guava.common.util.concurrent.AbstractListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback;
-import org.apache.jackrabbit.guava.common.util.concurrent.Futures;
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
 import org.apache.jackrabbit.oak.commons.FileIOUtils;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.DirectExecutor;
 import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -223,10 +222,10 @@ public class AbstractDataStoreCacheTest {
     }
 
 
-    static class TestExecutor extends AbstractListeningExecutorService {
+    static class TestExecutor extends AbstractExecutorService {
         private final CountDownLatch afterLatch;
         private final ExecutorService delegate;
-        final List<ListenableFuture<Integer>> futures;
+        final List<CompletableFuture<Integer>> futures;
 
         public TestExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch,
             CountDownLatch afterExecuteLatch) {
@@ -235,13 +234,19 @@ public class AbstractDataStoreCacheTest {
             this.afterLatch = afterLatch;
         }
 
-        @Override @NotNull public ListenableFuture<?> submit(@NotNull Callable task) {
+        @Override @NotNull public Future<Integer> submit(@NotNull Callable task) {
             LOG.trace("Before submitting to super....");
-            ListenableFuture<Integer> submit = super.submit(task);
+            CompletableFuture<Integer> submit = CompletableFuture.supplyAsync(() -> {
+                try {
+                    return (Integer) task.call();
+                } catch (Exception e) {
+                    throw new CompletionException(e);
+                }
+            }, delegate);
             LOG.trace("After submitting to super....");
 
             futures.add(submit);
-            Futures.addCallback(submit, new TestFutureCallback<Integer>(afterLatch), DirectExecutor.INSTANCE);
+            submit.whenComplete(new TestFutureCallback<>(afterLatch));
             LOG.trace("Added callback");
 
             return submit;
@@ -272,31 +277,33 @@ public class AbstractDataStoreCacheTest {
             return delegate.awaitTermination(timeout, unit);
         }
 
-        static class TestFutureCallback<Integer> implements FutureCallback {
+        static class TestFutureCallback<Integer> implements BiConsumer<Integer, Throwable> {
             private final CountDownLatch latch;
 
             public TestFutureCallback(CountDownLatch latch) {
                 this.latch = latch;
             }
 
-            @Override public void onSuccess(@Nullable Object result) {
-                try {
-                    LOG.trace("Waiting for latch in callback");
-                    latch.await(100, TimeUnit.MILLISECONDS);
-                    LOG.trace("Acquired latch in onSuccess");
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
+            @Override
+            public void accept(Integer integer, Throwable throwable) {
+                if (throwable == null) {
+                    try {
+                        LOG.trace("Waiting for latch in callback");
+                        latch.await(100, TimeUnit.MILLISECONDS);
+                        LOG.trace("Acquired latch in onSuccess");
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                } else {
+                    try {
+                        LOG.trace("Waiting for latch onFailure in callback");
+                        latch.await(100, TimeUnit.MILLISECONDS);
+                        LOG.trace("Acquired latch in onFailure");
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
                 }
-            }
 
-            @Override public void onFailure(@NotNull Throwable t) {
-                try {
-                    LOG.trace("Waiting for latch onFailure in callback");
-                    latch.await(100, TimeUnit.MILLISECONDS);
-                    LOG.trace("Acquired latch in onFailure");
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
             }
         }
     }
diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
index 2042c5bc42..6972f5be2e 100644
--- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
+++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
@@ -31,7 +31,6 @@ import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
 import org.apache.jackrabbit.oak.commons.pio.Closer;
 import org.junit.After;
 import org.junit.Before;
@@ -81,7 +80,7 @@ public class FileCacheTest extends AbstractDataStoreCacheTest {
         beforeLatch.countDown();
         afterLatch.countDown();
         cache = FileCache.build(4 * 1024/* KB */, root, loader, executor);
-        FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(executor.futures)).get();
+        FutureUtils.successfulAsList(executor.futures).get();
 
         closer.register(cache);
 
@@ -438,7 +437,7 @@ public class FileCacheTest extends AbstractDataStoreCacheTest {
         cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
         closer.register(cache);
         afterExecuteLatch.await();
-        FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(executor.futures)).get();
+        FutureUtils.successfulAsList(executor.futures).get();
         LOG.info("Cache rebuilt");
 
         assertCacheIfPresent(0, cache, f);
@@ -473,7 +472,7 @@ public class FileCacheTest extends AbstractDataStoreCacheTest {
         cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
         closer.register(cache);
         afterExecuteLatch.await();
-        FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(executor.futures)).get();
+        FutureUtils.successfulAsList(executor.futures).get();
         LOG.info("Cache rebuilt");
 
         assertCacheIfPresent(0, cache, f);

Reply via email to