This is an automated email from the ASF dual-hosted git repository.
daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16420737b6 OAK-11924 : replaced Guava's
AbstractListeningExecutorService with native java implementation (#2646)
16420737b6 is described below
commit 16420737b6d71352bc2fd95241702b2856e1a129
Author: Rishabh Kumar <[email protected]>
AuthorDate: Thu Dec 4 21:21:20 2025 +0530
OAK-11924 : replaced Guava's AbstractListeningExecutorService with native
java implementation (#2646)
---
.../plugins/blob/AbstractDataStoreCacheTest.java | 63 ++++++++++++----------
.../jackrabbit/oak/plugins/blob/FileCacheTest.java | 7 ++-
2 files changed, 38 insertions(+), 32 deletions(-)
diff --git
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
index 6df2eb8140..6b4c720b87 100644
---
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
+++
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
@@ -37,12 +37,17 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -51,16 +56,10 @@ import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.guava.common.cache.CacheLoader;
-import org.apache.jackrabbit.guava.common.util.concurrent.AbstractListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback;
-import org.apache.jackrabbit.guava.common.util.concurrent.Futures;
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
import org.apache.jackrabbit.oak.commons.FileIOUtils;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.DirectExecutor;
import org.apache.jackrabbit.oak.spi.blob.AbstractDataRecord;
import org.apache.jackrabbit.oak.spi.blob.AbstractSharedBackend;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -223,10 +222,10 @@ public class AbstractDataStoreCacheTest {
}
- static class TestExecutor extends AbstractListeningExecutorService {
+ static class TestExecutor extends AbstractExecutorService {
private final CountDownLatch afterLatch;
private final ExecutorService delegate;
- final List<ListenableFuture<Integer>> futures;
+ final List<CompletableFuture<Integer>> futures;
public TestExecutor(int threads, CountDownLatch beforeLatch,
CountDownLatch afterLatch,
CountDownLatch afterExecuteLatch) {
@@ -235,13 +234,19 @@ public class AbstractDataStoreCacheTest {
this.afterLatch = afterLatch;
}
- @Override @NotNull public ListenableFuture<?> submit(@NotNull Callable task) {
+ @Override @NotNull public Future<Integer> submit(@NotNull Callable task) {
LOG.trace("Before submitting to super....");
- ListenableFuture<Integer> submit = super.submit(task);
+ CompletableFuture<Integer> submit = CompletableFuture.supplyAsync(() -> {
+ try {
+ return (Integer) task.call();
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, delegate);
LOG.trace("After submitting to super....");
futures.add(submit);
- Futures.addCallback(submit, new TestFutureCallback<Integer>(afterLatch), DirectExecutor.INSTANCE);
+ submit.whenComplete(new TestFutureCallback<>(afterLatch));
LOG.trace("Added callback");
return submit;
@@ -272,31 +277,33 @@ public class AbstractDataStoreCacheTest {
return delegate.awaitTermination(timeout, unit);
}
- static class TestFutureCallback<Integer> implements FutureCallback {
+ static class TestFutureCallback<Integer> implements BiConsumer<Integer, Throwable> {
private final CountDownLatch latch;
public TestFutureCallback(CountDownLatch latch) {
this.latch = latch;
}
- @Override public void onSuccess(@Nullable Object result) {
- try {
- LOG.trace("Waiting for latch in callback");
- latch.await(100, TimeUnit.MILLISECONDS);
- LOG.trace("Acquired latch in onSuccess");
- } catch (InterruptedException e) {
- e.printStackTrace();
+ @Override
+ public void accept(Integer integer, Throwable throwable) {
+ if (throwable == null) {
+ try {
+ LOG.trace("Waiting for latch in callback");
+ latch.await(100, TimeUnit.MILLISECONDS);
+ LOG.trace("Acquired latch in onSuccess");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ try {
+ LOG.trace("Waiting for latch onFailure in callback");
+ latch.await(100, TimeUnit.MILLISECONDS);
+ LOG.trace("Acquired latch in onFailure");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
- }
- @Override public void onFailure(@NotNull Throwable t) {
- try {
- LOG.trace("Waiting for latch onFailure in callback");
- latch.await(100, TimeUnit.MILLISECONDS);
- LOG.trace("Acquired latch in onFailure");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
}
}
}
diff --git
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
index 2042c5bc42..6972f5be2e 100644
---
a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
+++
b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
@@ -31,7 +31,6 @@ import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
import org.apache.jackrabbit.oak.commons.pio.Closer;
import org.junit.After;
import org.junit.Before;
@@ -81,7 +80,7 @@ public class FileCacheTest extends AbstractDataStoreCacheTest {
beforeLatch.countDown();
afterLatch.countDown();
cache = FileCache.build(4 * 1024/* KB */, root, loader, executor);
- FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(executor.futures)).get();
+ FutureUtils.successfulAsList(executor.futures).get();
closer.register(cache);
@@ -438,7 +437,7 @@ public class FileCacheTest extends AbstractDataStoreCacheTest {
cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
closer.register(cache);
afterExecuteLatch.await();
- FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(executor.futures)).get();
+ FutureUtils.successfulAsList(executor.futures).get();
LOG.info("Cache rebuilt");
assertCacheIfPresent(0, cache, f);
@@ -473,7 +472,7 @@ public class FileCacheTest extends AbstractDataStoreCacheTest {
cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
closer.register(cache);
afterExecuteLatch.await();
- FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(executor.futures)).get();
+ FutureUtils.successfulAsList(executor.futures).get();
LOG.info("Cache rebuilt");
assertCacheIfPresent(0, cache, f);