This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-11921 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit c332ea47d0440679af5cfdc4a11f05835a871a0a Author: rishabhdaim <[email protected]> AuthorDate: Thu Dec 4 17:06:32 2025 +0530 OAK-11924 : replaced Guava's AbstractListeningExecutorService with native java implementation --- .../plugins/blob/AbstractDataStoreCacheTest.java | 63 ++++++++++++---------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java index 6df2eb8140..6b4c720b87 100644 --- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java +++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java @@ -37,12 +37,17 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -51,16 +56,10 @@ import org.apache.jackrabbit.core.data.DataIdentifier; import org.apache.jackrabbit.core.data.DataRecord; import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.guava.common.cache.CacheLoader; -import org.apache.jackrabbit.guava.common.util.concurrent.AbstractListeningExecutorService; -import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback; -import org.apache.jackrabbit.guava.common.util.concurrent.Futures; 
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; import org.apache.jackrabbit.oak.commons.FileIOUtils; -import org.apache.jackrabbit.oak.commons.internal.concurrent.DirectExecutor; import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord; import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -223,10 +222,10 @@ public class AbstractDataStoreCacheTest { } - static class TestExecutor extends AbstractListeningExecutorService { + static class TestExecutor extends AbstractExecutorService { private final CountDownLatch afterLatch; private final ExecutorService delegate; - final List<ListenableFuture<Integer>> futures; + final List<CompletableFuture<Integer>> futures; public TestExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch, CountDownLatch afterExecuteLatch) { @@ -235,13 +234,19 @@ public class AbstractDataStoreCacheTest { this.afterLatch = afterLatch; } - @Override @NotNull public ListenableFuture<?> submit(@NotNull Callable task) { + @Override @NotNull public Future<Integer> submit(@NotNull Callable task) { LOG.trace("Before submitting to super...."); - ListenableFuture<Integer> submit = super.submit(task); + CompletableFuture<Integer> submit = CompletableFuture.supplyAsync(() -> { + try { + return (Integer) task.call(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, delegate); LOG.trace("After submitting to super...."); futures.add(submit); - Futures.addCallback(submit, new TestFutureCallback<Integer>(afterLatch), DirectExecutor.INSTANCE); + submit.whenComplete(new TestFutureCallback<>(afterLatch)); LOG.trace("Added callback"); return submit; @@ -272,31 +277,33 @@ public class AbstractDataStoreCacheTest { return delegate.awaitTermination(timeout, unit); } - static class TestFutureCallback<Integer> implements FutureCallback { + 
static class TestFutureCallback<Integer> implements BiConsumer<Integer, Throwable> { private final CountDownLatch latch; public TestFutureCallback(CountDownLatch latch) { this.latch = latch; } - @Override public void onSuccess(@Nullable Object result) { - try { - LOG.trace("Waiting for latch in callback"); - latch.await(100, TimeUnit.MILLISECONDS); - LOG.trace("Acquired latch in onSuccess"); - } catch (InterruptedException e) { - e.printStackTrace(); + @Override + public void accept(Integer integer, Throwable throwable) { + if (throwable == null) { + try { + LOG.trace("Waiting for latch in callback"); + latch.await(100, TimeUnit.MILLISECONDS); + LOG.trace("Acquired latch in onSuccess"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + try { + LOG.trace("Waiting for latch onFailure in callback"); + latch.await(100, TimeUnit.MILLISECONDS); + LOG.trace("Acquired latch in onFailure"); + } catch (InterruptedException e) { + e.printStackTrace(); + } } - } - @Override public void onFailure(@NotNull Throwable t) { - try { - LOG.trace("Waiting for latch onFailure in callback"); - latch.await(100, TimeUnit.MILLISECONDS); - LOG.trace("Acquired latch in onFailure"); - } catch (InterruptedException e) { - e.printStackTrace(); - } } } }
