This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-11921
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit c332ea47d0440679af5cfdc4a11f05835a871a0a
Author: rishabhdaim <[email protected]>
AuthorDate: Thu Dec 4 17:06:32 2025 +0530

    OAK-11924 : replaced Guava's AbstractListeningExecutorService with native 
java implementation
---
 .../plugins/blob/AbstractDataStoreCacheTest.java   | 63 ++++++++++++----------
 1 file changed, 35 insertions(+), 28 deletions(-)

diff --git 
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
 
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
index 6df2eb8140..6b4c720b87 100644
--- 
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
+++ 
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
@@ -37,12 +37,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -51,16 +56,10 @@ import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.guava.common.cache.CacheLoader;
-import 
org.apache.jackrabbit.guava.common.util.concurrent.AbstractListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback;
-import org.apache.jackrabbit.guava.common.util.concurrent.Futures;
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
 import org.apache.jackrabbit.oak.commons.FileIOUtils;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.DirectExecutor;
 import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
 import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -223,10 +222,10 @@ public class AbstractDataStoreCacheTest {
     }
 
 
-    static class TestExecutor extends AbstractListeningExecutorService {
+    static class TestExecutor extends AbstractExecutorService {
         private final CountDownLatch afterLatch;
         private final ExecutorService delegate;
-        final List<ListenableFuture<Integer>> futures;
+        final List<CompletableFuture<Integer>> futures;
 
         public TestExecutor(int threads, CountDownLatch beforeLatch, 
CountDownLatch afterLatch,
             CountDownLatch afterExecuteLatch) {
@@ -235,13 +234,19 @@ public class AbstractDataStoreCacheTest {
             this.afterLatch = afterLatch;
         }
 
-        @Override @NotNull public ListenableFuture<?> submit(@NotNull Callable 
task) {
+        @Override @NotNull public Future<Integer> submit(@NotNull Callable 
task) {
             LOG.trace("Before submitting to super....");
-            ListenableFuture<Integer> submit = super.submit(task);
+            CompletableFuture<Integer> submit = 
CompletableFuture.supplyAsync(() -> {
+                try {
+                    return (Integer) task.call();
+                } catch (Exception e) {
+                    throw new CompletionException(e);
+                }
+            }, delegate);
             LOG.trace("After submitting to super....");
 
             futures.add(submit);
-            Futures.addCallback(submit, new 
TestFutureCallback<Integer>(afterLatch), DirectExecutor.INSTANCE);
+            submit.whenComplete(new TestFutureCallback<>(afterLatch));
             LOG.trace("Added callback");
 
             return submit;
@@ -272,31 +277,33 @@ public class AbstractDataStoreCacheTest {
             return delegate.awaitTermination(timeout, unit);
         }
 
-        static class TestFutureCallback<Integer> implements FutureCallback {
+        static class TestFutureCallback<Integer> implements 
BiConsumer<Integer, Throwable> {
             private final CountDownLatch latch;
 
             public TestFutureCallback(CountDownLatch latch) {
                 this.latch = latch;
             }
 
-            @Override public void onSuccess(@Nullable Object result) {
-                try {
-                    LOG.trace("Waiting for latch in callback");
-                    latch.await(100, TimeUnit.MILLISECONDS);
-                    LOG.trace("Acquired latch in onSuccess");
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
+            @Override
+            public void accept(Integer integer, Throwable throwable) {
+                if (throwable == null) {
+                    try {
+                        LOG.trace("Waiting for latch in callback");
+                        latch.await(100, TimeUnit.MILLISECONDS);
+                        LOG.trace("Acquired latch in onSuccess");
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                } else {
+                    try {
+                        LOG.trace("Waiting for latch onFailure in callback");
+                        latch.await(100, TimeUnit.MILLISECONDS);
+                        LOG.trace("Acquired latch in onFailure");
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
                 }
-            }
 
-            @Override public void onFailure(@NotNull Throwable t) {
-                try {
-                    LOG.trace("Waiting for latch onFailure in callback");
-                    latch.await(100, TimeUnit.MILLISECONDS);
-                    LOG.trace("Acquired latch in onFailure");
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
             }
         }
     }

Reply via email to