This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-11893
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 45bf5a9cc693d8cef7140f8e3eff124834bca4e2
Author: rishabhdaim <rishabhdaim1...@gmail.com>
AuthorDate: Tue Sep 9 11:15:45 2025 +0530

    OAK-11893 : removed usage of Guava's ListenableFuture
---
 .../oak/plugins/blob/UploadStagingCacheTest.java   | 112 ++++++++++-----------
 .../oak/plugins/blob/datastore/FSBackendIT.java    |  24 ++---
 .../commons/internal/concurrent/FutureUtils.java   |   2 -
 .../jackrabbit/oak/jcr/ConcurrentReadIT.java       |  53 +++++-----
 .../oak/segment/file/TarRevisionsTest.java         |  53 +++++-----
 5 files changed, 114 insertions(+), 130 deletions(-)

diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
index c7b3364e0a..b4ccbf5046 100644
--- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
+++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
@@ -31,9 +31,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -42,9 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import ch.qos.logback.classic.Level;
 
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
 import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.core.data.DataStoreException;
@@ -174,7 +171,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testAdd() throws Exception {
         // add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         //start
         taskLatch.countDown();
@@ -205,7 +202,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         init(2, secondTimeUploader, null);
 
         // Add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         //start
         taskLatch.countDown();
@@ -290,7 +287,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testGetAddDifferent() throws Exception {
         //add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // Create an async retrieve task
         final SettableFuture<File> retFuture = SettableFuture.create();
@@ -326,7 +323,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         closer.register(stagingCache);
 
         // add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // Add another load
         File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
@@ -342,7 +339,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX 
+ 1, f2);
         futures = new ArrayList<>();
         if (future.isPresent()) {
-            futures.add(future.get());
+            futures.add(FutureConverter.toCompletableFuture(future.get()));
         }
         assertFuture(futures, 1);
 
@@ -356,7 +353,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testGetAllIdentifiers() throws Exception {
         // add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // Check getAllIdentifiers
         Iterator<String> idsIter = stagingCache.getAllIdentifiers();
@@ -383,7 +380,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testInvalidate() throws Exception {
         // add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // Check invalidate
         stagingCache.invalidate(ID_PREFIX + 0);
@@ -411,7 +408,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         LOG.info("Starting testConcurrentSameAdd");
 
         // Add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
         Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX 
+ 0, f);
@@ -437,19 +434,18 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         LOG.info("Starting testConcurrentSameAddRequest");
 
         closer.close();
-        ListeningExecutorService executorService =
-            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
 
-        List<ListenableFuture<Integer>> futures = new ArrayList<>();
+        List<CompletableFuture<Integer>> futures = new ArrayList<>();
         CountDownLatch moveLatch = new CountDownLatch(1);
         init(1, new TestStagingUploader(folder.newFolder(), moveLatch), null);
 
         //1st request
-        ListenableFuture<Boolean> resultReq1 = putThread(folder, 
executorService, futures);
+        CompletableFuture<Boolean> resultReq1 = putThread(folder, 
executorService, futures);
         Thread.sleep(100);
 
         //2nd Request
-        ListenableFuture<Boolean> resultReq2 = putThread(folder, 
executorService, futures);
+        CompletableFuture<Boolean> resultReq2 = putThread(folder, 
executorService, futures);
         Thread.sleep(200);
 
         // Allow any thread to start moving
@@ -474,13 +470,13 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testConcurrentDifferentAdd() throws Exception {
         // Add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // Add diff load
         File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
         Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX 
+ 1, f2);
         if (future2.isPresent()) {
-            futures.add(future2.get());
+            futures.add(FutureConverter.toCompletableFuture(future2.get()));
         }
 
         //start
@@ -497,12 +493,11 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
      */
     @Test
     public void testConcurrentGetDelete() throws Exception {
-        ListeningExecutorService executorService =
-            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
         closer.register(new ExecutorCloser(executorService));
 
         // Add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // Get a handle to the file and open stream
         File file = stagingCache.getIfPresent(ID_PREFIX + 0);
@@ -511,7 +506,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         // task to copy the steam to a file simulating read from the stream
         File temp = folder.newFile();
         CountDownLatch copyThreadLatch = new CountDownLatch(1);
-        SettableFuture<File> future1 =
+        CompletableFuture<File> future1 =
             copyStreamThread(executorService, fStream, temp, copyThreadLatch);
 
         //start
@@ -546,20 +541,19 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     }
 
     private void testConcurrentPutDelete(int diff) throws Exception {
-        ListeningExecutorService executorService =
-            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
         closer.register(new ExecutorCloser(executorService));
         //start immediately
         taskLatch.countDown();
 
         // Add immediately
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
 
         // New task to put another file
         File f2 = copyToFile(randomStream(diff, 4 * 1024), folder.newFile());
         CountDownLatch putThreadLatch = new CountDownLatch(1);
         CountDownLatch triggerLatch = new CountDownLatch(1);
-        SettableFuture<Optional<SettableFuture<Integer>>> future1 =
+        CompletableFuture<Optional<SettableFuture<Integer>>> future1 =
             putThread(executorService, diff, f2, stagingCache, putThreadLatch, 
triggerLatch);
         putThreadLatch.countDown();
 
@@ -569,10 +563,10 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
             removeExecutor.schedule(stagingCache.new RemoveJob(), 0, 
TimeUnit.MILLISECONDS);
         triggerLatch.await();
         if (future1.get().isPresent()) {
-            futures.add(future1.get().get());
+            
futures.add(FutureConverter.toCompletableFuture(future1.get().get()));
         }
 
-        CompletableFuture<List<Integer>> listCompletableFuture = 
FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(futures));
+        CompletableFuture<List<Integer>> listCompletableFuture = 
FutureUtils.successfulAsList(futures);
         try {
             listCompletableFuture.get();
             scheduledFuture.get();
@@ -593,7 +587,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testBuild() throws Exception {
         // Add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
         // Close before uploading finished
         closer.close();
 
@@ -631,7 +625,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         // Start staging cache
         init(3);
 
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
         // Not staged as already full
         assertTrue(futures.isEmpty());
 
@@ -662,7 +656,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     @Test
     public void testUpgrade() throws Exception {
         // Add load
-        List<ListenableFuture<Integer>> futures = put(folder);
+        List<CompletableFuture<Integer>> futures = put(folder);
         // Close before uploading finished
         closer.close();
 
@@ -734,30 +728,28 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         assertFalse(pendingUploadFile.exists());
     }
 
-    private static SettableFuture<File> 
copyStreamThread(ListeningExecutorService executor,
+    private static CompletableFuture<File> copyStreamThread(ExecutorService 
executor,
         final InputStream fStream, final File temp, final CountDownLatch 
start) {
-        final SettableFuture<File> future = SettableFuture.create();
-        executor.submit(new Runnable() {
-            @Override public void run() {
-                try {
-                    LOG.info("Waiting for start of copying");
-                    start.await();
-                    LOG.info("Starting copy of [{}]", temp);
-                    FileUtils.copyInputStreamToFile(fStream, temp);
-                    LOG.info("Finished retrieve");
-                    future.set(temp);
-                } catch (Exception e) {
-                    LOG.info("Exception in get", e);
-                }
+        final CompletableFuture<File> future = new CompletableFuture<>();
+        executor.submit(() -> {
+            try {
+                LOG.info("Waiting for start of copying");
+                start.await();
+                LOG.info("Starting copy of [{}]", temp);
+                FileUtils.copyInputStreamToFile(fStream, temp);
+                LOG.info("Finished retrieve");
+                future.complete(temp);
+            } catch (Exception e) {
+                LOG.info("Exception in get", e);
             }
         });
         return future;
     }
 
-    private static SettableFuture<Optional<SettableFuture<Integer>>> putThread(
-        ListeningExecutorService executor, final int seed, final File f, final 
UploadStagingCache cache,
+    private static CompletableFuture<Optional<SettableFuture<Integer>>> 
putThread(
+        ExecutorService executor, final int seed, final File f, final 
UploadStagingCache cache,
         final CountDownLatch start, final CountDownLatch trigger) {
-        final SettableFuture<Optional<SettableFuture<Integer>>> future = 
SettableFuture.create();
+        final CompletableFuture<Optional<SettableFuture<Integer>>> future = 
new CompletableFuture<>();
         executor.submit(new Runnable() {
             @Override public void run() {
                 try {
@@ -767,7 +759,7 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
                     trigger.countDown();
                     Optional<SettableFuture<Integer>> opt = 
cache.put(ID_PREFIX + seed, f);
                     LOG.info("Finished put");
-                    future.set(opt);
+                    future.complete(opt);
                 } catch (Exception e) {
                     LOG.info("Exception in get", e);
                 }
@@ -776,8 +768,8 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
         return future;
     }
 
-    private void waitFinish(List<ListenableFuture<Integer>> futures) {
-        CompletableFuture<List<Integer>> listCompletableFuture = 
FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(futures));
+    private void waitFinish(List<CompletableFuture<Integer>> futures) {
+        CompletableFuture<List<Integer>> listCompletableFuture = 
FutureUtils.successfulAsList(futures);
         try {
             listCompletableFuture.get();
             ScheduledFuture<?> scheduledFuture =
@@ -789,11 +781,10 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
     }
 
 
-    private ListenableFuture<Boolean> putThread(TemporaryFolder folder, 
ListeningExecutorService executorService, List<ListenableFuture<Integer>> 
futures) {
+    private CompletableFuture<Boolean> putThread(TemporaryFolder folder, 
ExecutorService executorService, List<CompletableFuture<Integer>> futures) {
         closer.register(new ExecutorCloser(executorService));
 
-        ListenableFuture<Boolean> result = executorService.submit(new 
Callable<Boolean>() {
-            @Override public Boolean call() {
+        CompletableFuture<Boolean> result = CompletableFuture.supplyAsync(() 
-> {
                 try {
                     LOG.info("Starting put");
                     futures.addAll(put(folder));
@@ -805,24 +796,23 @@ public class UploadStagingCacheTest extends 
AbstractDataStoreCacheTest {
                     LOG.info("Exception in get", e);
                 }
                 return false;
-            }
-        });
+        }, executorService);
 
         return result;
     }
 
-    private List<ListenableFuture<Integer>> put(TemporaryFolder folder)
+    private List<CompletableFuture<Integer>> put(TemporaryFolder folder)
         throws IOException {
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
         Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX 
+ 0, f);
-        List<ListenableFuture<Integer>> futures = new ArrayList<>();
+        List<CompletableFuture<Integer>> futures = new ArrayList<>();
         if (future.isPresent()) {
-            futures.add(future.get());
+            futures.add(FutureConverter.toCompletableFuture(future.get()));
         }
         return futures;
     }
 
-    private void assertFuture(List<ListenableFuture<Integer>> futures, int... 
seeds)
+    private void assertFuture(List<CompletableFuture<Integer>> futures, int... 
seeds)
         throws Exception {
         waitFinish(futures);
 
diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java
index da525a4c7e..c0b977fe0f 100644
--- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java
+++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java
@@ -28,11 +28,9 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataRecord;
@@ -44,7 +42,6 @@ import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.commons.collections.MapUtils;
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils;
-import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -69,7 +66,7 @@ public class FSBackendIT {
     private FSBackend backend;
     private String dataStoreDir;
     private DataStore ds;
-    private ListeningExecutorService executor;
+    private ExecutorService executor;
     private Random rand = new Random(0);
 
     @Before
@@ -80,8 +77,7 @@ public class FSBackendIT {
         props.setProperty("fsBackendPath", dataStoreDir);
         ds = createDataStore();
         backend = (FSBackend) ((CachingFileDataStore) ds).getBackend();
-        this.executor = MoreExecutors.listeningDecorator(Executors
-            .newFixedThreadPool(25, new 
NamedThreadFactory("oak-backend-test-write-thread")));
+        this.executor = Executors.newFixedThreadPool(25, new 
NamedThreadFactory("oak-backend-test-write-thread"));
     }
 
     protected DataStore createDataStore() {
@@ -199,7 +195,7 @@ public class FSBackendIT {
      * Method to assert record while writing and deleting record from FSBackend
      */
     void doTest(DataStore ds, int concurrency, boolean same) throws Exception {
-        List<ListenableFuture<Integer>> futures = new ArrayList<>();
+        List<CompletableFuture<Integer>> futures = new ArrayList<>();
         CountDownLatch latch = new CountDownLatch(concurrency);
 
         int seed = 0;
@@ -217,13 +213,13 @@ public class FSBackendIT {
         assertFuture(futures);
     }
 
-    private List<ListenableFuture<Integer>> put(TemporaryFolder folder, 
List<ListenableFuture<Integer>> futures,
+    private List<CompletableFuture<Integer>> put(TemporaryFolder folder, 
List<CompletableFuture<Integer>> futures,
         int seed, CountDownLatch writeLatch)
         throws IOException {
 
         File f = copyToFile(randomStream(seed, 4 * 1024 * 1024), 
folder.newFile());
 
-        ListenableFuture<Integer> future = executor.submit(() -> {
+        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() 
-> {
             try {
                 writeLatch.await();
                 backend.write(new DataIdentifier("0000ID" + seed), f);
@@ -238,8 +234,8 @@ public class FSBackendIT {
         return futures;
     }
 
-    private void waitFinish(List<ListenableFuture<Integer>> futures) {
-        CompletableFuture<List<Integer>> completableFutures = 
FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(futures));
+    private void waitFinish(List<CompletableFuture<Integer>> futures) {
+        CompletableFuture<List<Integer>> completableFutures = 
FutureUtils.successfulAsList(futures);
         try {
             completableFutures.get();
         } catch (Exception e) {
@@ -247,10 +243,10 @@ public class FSBackendIT {
         }
     }
 
-    private void assertFuture(List<ListenableFuture<Integer>> futures) throws 
Exception {
+    private void assertFuture(List<CompletableFuture<Integer>> futures) throws 
Exception {
         waitFinish(futures);
 
-        for (ListenableFuture future : futures) {
+        for (CompletableFuture future : futures) {
             assertFile((Integer) future.get(), folder);
         }
     }
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java
index dae861c9f7..8c24c9a99a 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.jackrabbit.oak.commons.internal.concurrent;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.jackrabbit.oak.commons.collections.IterableUtils;
 import org.apache.jackrabbit.oak.commons.collections.IteratorUtils;
 import org.apache.jackrabbit.oak.commons.collections.StreamUtils;
 
diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
index 9965612fa4..9c48ea90ec 100644
--- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
+++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
@@ -20,8 +20,10 @@ package org.apache.jackrabbit.oak.jcr;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -31,11 +33,6 @@ import javax.jcr.PropertyIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
-import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
-
-import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter;
 import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils;
 import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
 import org.junit.Test;
@@ -60,26 +57,26 @@ public class ConcurrentReadIT extends 
AbstractRepositoryTest {
             }
             session.save();
 
-            ListeningExecutorService executorService = 
MoreExecutors.listeningDecorator(
-                    Executors.newCachedThreadPool());
+            ExecutorService executorService = Executors.newCachedThreadPool();
 
-            List<ListenableFuture<Void>> futures = new ArrayList<>();
+            List<CompletableFuture<?>> futures = new ArrayList<>();
             for (int k = 0; k < 20; k ++) {
-                futures.add(executorService.submit(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        for (int k = 0; k < 10000; k++) {
+                futures.add(CompletableFuture.supplyAsync(() -> {
+                    for (int i = 0; i < 10000; i++) {
+                        try {
                             session.refresh(false);
                             NodeIterator children = testRoot.getNodes();
                             children.hasNext();
+                        } catch (Exception e) {
+                            throw new CompletionException(e);
                         }
-                        return null;
                     }
-                }));
+                    return null;
+                }, executorService));
             }
 
             // Throws ExecutionException if any of the submitted task failed
-            
FutureUtils.allAsList(FutureConverter.toCompletableFuture(futures)).get();
+            FutureUtils.allAsList(futures).get();
             executorService.shutdown();
             executorService.awaitTermination(1, TimeUnit.DAYS);
         } finally {
@@ -98,26 +95,26 @@ public class ConcurrentReadIT extends 
AbstractRepositoryTest {
             }
             session.save();
 
-            ListeningExecutorService executorService = 
MoreExecutors.listeningDecorator(
-                    Executors.newCachedThreadPool());
+            ExecutorService executorService = Executors.newCachedThreadPool();
 
-            List<ListenableFuture<Void>> futures = new ArrayList<>();
+            List<CompletableFuture<?>> futures = new ArrayList<>();
             for (int k = 0; k < 20; k ++) {
-                futures.add(executorService.submit(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        for (int k = 0; k < 100000; k++) {
-                            session.refresh(false);
-                            PropertyIterator properties = 
testRoot.getProperties();
-                            properties.hasNext();
+                futures.add(CompletableFuture.supplyAsync(() -> {
+                        for (int i = 0; i < 100000; i++) {
+                            try {
+                                session.refresh(false);
+                                PropertyIterator properties = 
testRoot.getProperties();
+                                properties.hasNext();
+                            } catch (Exception e) {
+                                throw new CompletionException(e);
+                            }
                         }
                         return null;
-                    }
                 }));
             }
 
             // Throws ExecutionException if any of the submitted task failed
-            
FutureUtils.allAsList(FutureConverter.toCompletableFuture(futures)).get();
+            FutureUtils.allAsList(futures).get();
             executorService.shutdown();
             executorService.awaitTermination(1, TimeUnit.DAYS);
         } finally {
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java
index 05cbe06926..f54f681ac1 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.jackrabbit.oak.segment.file;
 
-import static 
org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors.listeningDecorator;
-import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -30,15 +28,16 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
 import org.apache.jackrabbit.guava.common.base.Functions;
-import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
-import 
org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
@@ -159,32 +158,34 @@ public class TarRevisionsTest {
     @Test
     public void concurrentSetHeadFromFunction()
     throws InterruptedException, ExecutionException, TimeoutException {
-        ListeningExecutorService executor = 
listeningDecorator(newFixedThreadPool(2));
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
         try {
-            ListenableFuture<Boolean> t1 = executor.submit(new 
Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    return null != revisions.setHead(new Function<RecordId, 
RecordId>() {
+            CompletableFuture<Boolean> t1 = CompletableFuture.supplyAsync(() 
-> {
+                try {
+                    return null != revisions.setHead(new Function<>() {
                         @Nullable
                         @Override
                         public RecordId apply(RecordId headId) {
                             return addChild(reader.readNode(headId), 
"a").getRecordId();
                         }
                     });
+                } catch (Exception e) {
+                    throw new CompletionException(e);
                 }
-            });
-            ListenableFuture<Boolean> t2 = executor.submit(new 
Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    return null != revisions.setHead(new Function<RecordId, 
RecordId>() {
+            }, executor);
+            CompletableFuture<Boolean> t2 = CompletableFuture.supplyAsync(() 
-> {
+                try {
+                    return null != revisions.setHead(new Function<>() {
                         @Nullable
                         @Override
                         public RecordId apply(RecordId headId) {
                             return addChild(reader.readNode(headId), 
"b").getRecordId();
                         }
                     });
+                } catch (Exception e) {
+                    throw new CompletionException(e);
                 }
-            });
+            }, executor);
 
             assertTrue(t1.get(500, MILLISECONDS));
             assertTrue(t2.get(500, MILLISECONDS));
@@ -200,30 +201,32 @@ public class TarRevisionsTest {
     @Test
     public void setFromFunctionBlocks()
     throws ExecutionException, InterruptedException, TimeoutException {
-        ListeningExecutorService executor = 
listeningDecorator(newFixedThreadPool(2));
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
         try {
             final CountDownLatch latch = new CountDownLatch(1);
 
-            ListenableFuture<Boolean> t1 = executor.submit(new 
Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
+            CompletableFuture<Boolean> t1 = CompletableFuture.supplyAsync(() 
-> {
+                try {
                     latch.await();
                     return null != 
revisions.setHead(Functions.<RecordId>identity());
+                } catch (Exception e) {
+                    throw new CompletionException(e);
                 }
-            });
+            }, executor);
 
             try {
                 t1.get(500, MILLISECONDS);
                 fail("SetHead from function should block");
             } catch (TimeoutException expected) {}
 
-            ListenableFuture<Boolean> t2 = executor.submit(new 
Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
+            CompletableFuture<Boolean> t2 = CompletableFuture.supplyAsync(() 
-> {
+                try {
                     latch.countDown();
                     return null != 
revisions.setHead(Functions.<RecordId>identity());
+                } catch (Exception e) {
+                    throw new CompletionException(e);
                 }
-            });
+            }, executor);
 
             assertTrue(t2.get(500, MILLISECONDS));
             assertTrue(t1.get(500, MILLISECONDS));

Reply via email to