This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch OAK-11893 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 45bf5a9cc693d8cef7140f8e3eff124834bca4e2 Author: rishabhdaim <rishabhdaim1...@gmail.com> AuthorDate: Tue Sep 9 11:15:45 2025 +0530 OAK-11893 : removed usage of Guava's ListenableFuture --- .../oak/plugins/blob/UploadStagingCacheTest.java | 112 ++++++++++----------- .../oak/plugins/blob/datastore/FSBackendIT.java | 24 ++--- .../commons/internal/concurrent/FutureUtils.java | 2 - .../jackrabbit/oak/jcr/ConcurrentReadIT.java | 53 +++++----- .../oak/segment/file/TarRevisionsTest.java | 53 +++++----- 5 files changed, 114 insertions(+), 130 deletions(-) diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java index c7b3364e0a..b4ccbf5046 100644 --- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java +++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java @@ -31,9 +31,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -42,9 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger; import ch.qos.logback.classic.Level; -import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; -import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService; -import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors; import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.core.data.DataStoreException; @@ -174,7 +171,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testAdd() throws Exception { // add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); //start taskLatch.countDown(); @@ -205,7 +202,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { init(2, secondTimeUploader, null); // Add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); //start taskLatch.countDown(); @@ -290,7 +287,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testGetAddDifferent() throws Exception { //add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Create an async retrieve task final SettableFuture<File> retFuture = SettableFuture.create(); @@ -326,7 +323,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { closer.register(stagingCache); // add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Add another load File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); @@ -342,7 +339,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 1, f2); futures = new ArrayList<>(); if (future.isPresent()) { - futures.add(future.get()); + futures.add(FutureConverter.toCompletableFuture(future.get())); } assertFuture(futures, 1); @@ -356,7 +353,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testGetAllIdentifiers() throws Exception { // add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Check getAllIdentifiers Iterator<String> idsIter = stagingCache.getAllIdentifiers(); @@ -383,7 +380,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testInvalidate() throws Exception { // add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Check invalidate stagingCache.invalidate(ID_PREFIX + 0); @@ -411,7 +408,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { LOG.info("Starting testConcurrentSameAdd"); // Add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 0, f); @@ -437,19 +434,18 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { LOG.info("Starting testConcurrentSameAddRequest"); closer.close(); - ListeningExecutorService executorService = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + ExecutorService executorService = Executors.newFixedThreadPool(2); - List<ListenableFuture<Integer>> futures = new ArrayList<>(); + List<CompletableFuture<Integer>> futures = new ArrayList<>(); CountDownLatch moveLatch = new CountDownLatch(1); init(1, new TestStagingUploader(folder.newFolder(), moveLatch), null); //1st request - ListenableFuture<Boolean> resultReq1 = putThread(folder, executorService, futures); + CompletableFuture<Boolean> resultReq1 = putThread(folder, executorService, futures); Thread.sleep(100); //2nd Request - ListenableFuture<Boolean> resultReq2 = putThread(folder, executorService, futures); + CompletableFuture<Boolean> resultReq2 = putThread(folder, executorService, futures); Thread.sleep(200); // Allow any thread to start moving @@ -474,13 +470,13 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testConcurrentDifferentAdd() throws Exception { // Add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Add diff load File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); if (future2.isPresent()) { - futures.add(future2.get()); + futures.add(FutureConverter.toCompletableFuture(future2.get())); } //start @@ -497,12 +493,11 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { */ @Test public void testConcurrentGetDelete() throws Exception { - ListeningExecutorService executorService = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + ExecutorService executorService = Executors.newFixedThreadPool(2); closer.register(new ExecutorCloser(executorService)); // Add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Get a handle to the file and open stream File file = stagingCache.getIfPresent(ID_PREFIX + 0); @@ -511,7 +506,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { // task to copy the steam to a file simulating read from the stream File temp = folder.newFile(); CountDownLatch copyThreadLatch = new CountDownLatch(1); - SettableFuture<File> future1 = + CompletableFuture<File> future1 = copyStreamThread(executorService, fStream, temp, copyThreadLatch); //start @@ -546,20 +541,19 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { } private void testConcurrentPutDelete(int diff) throws Exception { - ListeningExecutorService executorService = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + ExecutorService executorService = Executors.newFixedThreadPool(2); closer.register(new ExecutorCloser(executorService)); //start immediately taskLatch.countDown(); // Add immediately - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // New task to put another file File f2 = copyToFile(randomStream(diff, 4 * 1024), folder.newFile()); CountDownLatch putThreadLatch = new CountDownLatch(1); CountDownLatch triggerLatch = new CountDownLatch(1); - SettableFuture<Optional<SettableFuture<Integer>>> future1 = + CompletableFuture<Optional<SettableFuture<Integer>>> future1 = putThread(executorService, diff, f2, stagingCache, putThreadLatch, triggerLatch); putThreadLatch.countDown(); @@ -569,10 +563,10 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS); triggerLatch.await(); if (future1.get().isPresent()) { - futures.add(future1.get().get()); + futures.add(FutureConverter.toCompletableFuture(future1.get().get())); } - CompletableFuture<List<Integer>> listCompletableFuture = FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(futures)); + CompletableFuture<List<Integer>> listCompletableFuture = FutureUtils.successfulAsList(futures); try { listCompletableFuture.get(); scheduledFuture.get(); @@ -593,7 +587,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testBuild() throws Exception { // Add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Close before uploading finished closer.close(); @@ -631,7 +625,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { // Start staging cache init(3); - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Not staged as already full assertTrue(futures.isEmpty()); @@ -662,7 +656,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { @Test public void testUpgrade() throws Exception { // Add load - List<ListenableFuture<Integer>> futures = put(folder); + List<CompletableFuture<Integer>> futures = put(folder); // Close before uploading finished closer.close(); @@ -734,30 +728,28 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { assertFalse(pendingUploadFile.exists()); } - private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor, + private static CompletableFuture<File> copyStreamThread(ExecutorService executor, final InputStream fStream, final File temp, final CountDownLatch start) { - final SettableFuture<File> future = SettableFuture.create(); - executor.submit(new Runnable() { - @Override public void run() { - try { - LOG.info("Waiting for start of copying"); - start.await(); - LOG.info("Starting copy of [{}]", temp); - FileUtils.copyInputStreamToFile(fStream, temp); - LOG.info("Finished retrieve"); - future.set(temp); - } catch (Exception e) { - LOG.info("Exception in get", e); - } + final CompletableFuture<File> future = new CompletableFuture<>(); + executor.submit(() -> { + try { + LOG.info("Waiting for start of copying"); + start.await(); + LOG.info("Starting copy of [{}]", temp); + FileUtils.copyInputStreamToFile(fStream, temp); + LOG.info("Finished retrieve"); + future.complete(temp); + } catch (Exception e) { + LOG.info("Exception in get", e); } }); return future; } - private static SettableFuture<Optional<SettableFuture<Integer>>> putThread( - ListeningExecutorService executor, final int seed, final File f, final UploadStagingCache cache, + private static CompletableFuture<Optional<SettableFuture<Integer>>> putThread( + ExecutorService executor, final int seed, final File f, final UploadStagingCache cache, final CountDownLatch start, final CountDownLatch trigger) { - final SettableFuture<Optional<SettableFuture<Integer>>> future = SettableFuture.create(); + final CompletableFuture<Optional<SettableFuture<Integer>>> future = new CompletableFuture<>(); executor.submit(new Runnable() { @Override public void run() { try { @@ -767,7 +759,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { trigger.countDown(); Optional<SettableFuture<Integer>> opt = cache.put(ID_PREFIX + seed, f); LOG.info("Finished put"); - future.set(opt); + future.complete(opt); } catch (Exception e) { LOG.info("Exception in get", e); } @@ -776,8 +768,8 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { return future; } - private void waitFinish(List<ListenableFuture<Integer>> futures) { - CompletableFuture<List<Integer>> listCompletableFuture = FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(futures)); + private void waitFinish(List<CompletableFuture<Integer>> futures) { + CompletableFuture<List<Integer>> listCompletableFuture = FutureUtils.successfulAsList(futures); try { listCompletableFuture.get(); ScheduledFuture<?> scheduledFuture = @@ -789,11 +781,10 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { } - private ListenableFuture<Boolean> putThread(TemporaryFolder folder, ListeningExecutorService executorService, List<ListenableFuture<Integer>> futures) { + private CompletableFuture<Boolean> putThread(TemporaryFolder folder, ExecutorService executorService, List<CompletableFuture<Integer>> futures) { closer.register(new ExecutorCloser(executorService)); - ListenableFuture<Boolean> result = executorService.submit(new Callable<Boolean>() { - @Override public Boolean call() { + CompletableFuture<Boolean> result = CompletableFuture.supplyAsync(() -> { try { LOG.info("Starting put"); futures.addAll(put(folder)); @@ -805,24 +796,23 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { LOG.info("Exception in get", e); } return false; - } - }); + }, executorService); return result; } - private List<ListenableFuture<Integer>> put(TemporaryFolder folder) + private List<CompletableFuture<Integer>> put(TemporaryFolder folder) throws IOException { File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); - List<ListenableFuture<Integer>> futures = new ArrayList<>(); + List<CompletableFuture<Integer>> futures = new ArrayList<>(); if (future.isPresent()) { - futures.add(future.get()); + futures.add(FutureConverter.toCompletableFuture(future.get())); } return futures; } - private void assertFuture(List<ListenableFuture<Integer>> futures, int... seeds) + private void assertFuture(List<CompletableFuture<Integer>> futures, int... seeds) throws Exception { waitFinish(futures); diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java index da525a4c7e..c0b977fe0f 100644 --- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java +++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/FSBackendIT.java @@ -28,11 +28,9 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; -import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService; -import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.core.data.DataIdentifier; import org.apache.jackrabbit.core.data.DataRecord; @@ -44,7 +42,6 @@ import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.commons.collections.MapUtils; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils; -import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -69,7 +66,7 @@ public class FSBackendIT { private FSBackend backend; private String dataStoreDir; private DataStore ds; - private ListeningExecutorService executor; + private ExecutorService executor; private Random rand = new Random(0); @Before @@ -80,8 +77,7 @@ public class FSBackendIT { props.setProperty("fsBackendPath", dataStoreDir); ds = createDataStore(); backend = (FSBackend) ((CachingFileDataStore) ds).getBackend(); - this.executor = MoreExecutors.listeningDecorator(Executors - .newFixedThreadPool(25, new NamedThreadFactory("oak-backend-test-write-thread"))); + this.executor = Executors.newFixedThreadPool(25, new NamedThreadFactory("oak-backend-test-write-thread")); } protected DataStore createDataStore() { @@ -199,7 +195,7 @@ public class FSBackendIT { * Method to assert record while writing and deleting record from FSBackend */ void doTest(DataStore ds, int concurrency, boolean same) throws Exception { - List<ListenableFuture<Integer>> futures = new ArrayList<>(); + List<CompletableFuture<Integer>> futures = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(concurrency); int seed = 0; @@ -217,13 +213,13 @@ public class FSBackendIT { assertFuture(futures); } - private List<ListenableFuture<Integer>> put(TemporaryFolder folder, List<ListenableFuture<Integer>> futures, + private List<CompletableFuture<Integer>> put(TemporaryFolder folder, List<CompletableFuture<Integer>> futures, int seed, CountDownLatch writeLatch) throws IOException { File f = copyToFile(randomStream(seed, 4 * 1024 * 1024), folder.newFile()); - ListenableFuture<Integer> future = executor.submit(() -> { + CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { writeLatch.await(); backend.write(new DataIdentifier("0000ID" + seed), f); @@ -238,8 +234,8 @@ public class FSBackendIT { return futures; } - private void waitFinish(List<ListenableFuture<Integer>> futures) { - CompletableFuture<List<Integer>> completableFutures = FutureUtils.successfulAsList(FutureConverter.toCompletableFuture(futures)); + private void waitFinish(List<CompletableFuture<Integer>> futures) { + CompletableFuture<List<Integer>> completableFutures = FutureUtils.successfulAsList(futures); try { completableFutures.get(); } catch (Exception e) { @@ -247,10 +243,10 @@ public class FSBackendIT { } } - private void assertFuture(List<ListenableFuture<Integer>> futures) throws Exception { + private void assertFuture(List<CompletableFuture<Integer>> futures) throws Exception { waitFinish(futures); - for (ListenableFuture future : futures) { + for (CompletableFuture future : futures) { assertFile((Integer) future.get(), folder); } } diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java index dae861c9f7..8c24c9a99a 100644 --- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java +++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java @@ -18,8 +18,6 @@ */ package org.apache.jackrabbit.oak.commons.internal.concurrent; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.jackrabbit.oak.commons.collections.IterableUtils; import org.apache.jackrabbit.oak.commons.collections.IteratorUtils; import org.apache.jackrabbit.oak.commons.collections.StreamUtils; diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java index 9965612fa4..9c48ea90ec 100644 --- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java +++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java @@ -20,8 +20,10 @@ package org.apache.jackrabbit.oak.jcr; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -31,11 +33,6 @@ import javax.jcr.PropertyIterator; import javax.jcr.RepositoryException; import javax.jcr.Session; -import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; -import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService; -import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors; - -import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter; import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils; import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; import org.junit.Test; @@ -60,26 +57,26 @@ public class ConcurrentReadIT extends AbstractRepositoryTest { } session.save(); - ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Executors.newCachedThreadPool()); + ExecutorService executorService = Executors.newCachedThreadPool(); - List<ListenableFuture<Void>> futures = new ArrayList<>(); + List<CompletableFuture<?>> futures = new ArrayList<>(); for (int k = 0; k < 20; k ++) { - futures.add(executorService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - for (int k = 0; k < 10000; k++) { + futures.add(CompletableFuture.supplyAsync(() -> { + for (int i = 0; i < 10000; i++) { + try { session.refresh(false); NodeIterator children = testRoot.getNodes(); children.hasNext(); + } catch (Exception e) { + throw new CompletionException(e); } - return null; } - })); + return null; + }, executorService)); } // Throws ExecutionException if any of the submitted task failed - FutureUtils.allAsList(FutureConverter.toCompletableFuture(futures)).get(); + FutureUtils.allAsList(futures).get(); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } finally { @@ -98,26 +95,26 @@ public class ConcurrentReadIT extends AbstractRepositoryTest { } session.save(); - ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Executors.newCachedThreadPool()); + ExecutorService executorService = Executors.newCachedThreadPool(); - List<ListenableFuture<Void>> futures = new ArrayList<>(); + List<CompletableFuture<?>> futures = new ArrayList<>(); for (int k = 0; k < 20; k ++) { - futures.add(executorService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - for (int k = 0; k < 100000; k++) { - session.refresh(false); - PropertyIterator properties = testRoot.getProperties(); - properties.hasNext(); + futures.add(CompletableFuture.supplyAsync(() -> { + for (int i = 0; i < 100000; i++) { + try { + session.refresh(false); + PropertyIterator properties = testRoot.getProperties(); + properties.hasNext(); + } catch (Exception e) { + throw new CompletionException(e); + } } return null; - } })); } // Throws ExecutionException if any of the submitted task failed - FutureUtils.allAsList(FutureConverter.toCompletableFuture(futures)).get(); + FutureUtils.allAsList(futures).get(); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } finally { diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java index 05cbe06926..f54f681ac1 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/TarRevisionsTest.java @@ -19,8 +19,6 @@ package org.apache.jackrabbit.oak.segment.file; -import static org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors.listeningDecorator; -import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -30,15 +28,16 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.function.Function; import org.apache.jackrabbit.guava.common.base.Functions; -import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; -import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService; import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.SegmentNodeBuilder; import org.apache.jackrabbit.oak.segment.SegmentNodeState; @@ -159,32 +158,34 @@ public class TarRevisionsTest { @Test public void concurrentSetHeadFromFunction() throws InterruptedException, ExecutionException, TimeoutException { - ListeningExecutorService executor = listeningDecorator(newFixedThreadPool(2)); + final ExecutorService executor = Executors.newFixedThreadPool(2); try { - ListenableFuture<Boolean> t1 = executor.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return null != revisions.setHead(new Function<RecordId, RecordId>() { + CompletableFuture<Boolean> t1 = CompletableFuture.supplyAsync(() -> { + try { + return null != revisions.setHead(new Function<>() { @Nullable @Override public RecordId apply(RecordId headId) { return addChild(reader.readNode(headId), "a").getRecordId(); } }); + } catch (Exception e) { + throw new CompletionException(e); } - }); - ListenableFuture<Boolean> t2 = executor.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return null != revisions.setHead(new Function<RecordId, RecordId>() { + }, executor); + CompletableFuture<Boolean> t2 = CompletableFuture.supplyAsync(() -> { + try { + return null != revisions.setHead(new Function<>() { @Nullable @Override public RecordId apply(RecordId headId) { return addChild(reader.readNode(headId), "b").getRecordId(); } }); + } catch (Exception e) { + throw new CompletionException(e); } - }); + }, executor); assertTrue(t1.get(500, MILLISECONDS)); assertTrue(t2.get(500, MILLISECONDS)); @@ -200,30 +201,32 @@ public class TarRevisionsTest { @Test public void setFromFunctionBlocks() throws ExecutionException, InterruptedException, TimeoutException { - ListeningExecutorService executor = listeningDecorator(newFixedThreadPool(2)); + final ExecutorService executor = Executors.newFixedThreadPool(2); try { final CountDownLatch latch = new CountDownLatch(1); - ListenableFuture<Boolean> t1 = executor.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { + CompletableFuture<Boolean> t1 = CompletableFuture.supplyAsync(() -> { + try { latch.await(); return null != revisions.setHead(Functions.<RecordId>identity()); + } catch (Exception e) { + throw new CompletionException(e); } - }); + }, executor); try { t1.get(500, MILLISECONDS); fail("SetHead from function should block"); } catch (TimeoutException expected) {} - ListenableFuture<Boolean> t2 = executor.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { + CompletableFuture<Boolean> t2 = CompletableFuture.supplyAsync(() -> { + try { latch.countDown(); return null != revisions.setHead(Functions.<RecordId>identity()); + } catch (Exception e) { + throw new CompletionException(e); } - }); + }, executor); assertTrue(t2.get(500, MILLISECONDS)); assertTrue(t1.get(500, MILLISECONDS));