This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push: new 521582edfc OAK-11903 : removed usage of Guava's SettableFuture from Prod and its related test files (#2501) 521582edfc is described below commit 521582edfc8d5bcdf02d1c1189a8196a2563db3e Author: Rishabh Kumar <rishabhdaim1...@gmail.com> AuthorDate: Thu Sep 11 14:19:57 2025 +0530 OAK-11903 : removed usage of Guava's SettableFuture from Prod and its related test files (#2501) --- .../oak/plugins/blob/UploadStagingCache.java | 21 ++- .../oak/plugins/blob/UploadStagingCacheTest.java | 38 +++--- .../internal/concurrent/FutureConverter.java | 41 +++--- .../internal/concurrent/FutureConverterTest.java | 142 +++++++++++++++++++++ .../oak/run/osgi/OakOSGiRepositoryFactory.java | 10 +- .../oak/plugins/document/BatchCommit.java | 8 +- 6 files changed, 202 insertions(+), 58 deletions(-) diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java index cb644e2527..e2d0599f51 100644 --- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java +++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; @@ -47,7 +48,6 @@ import org.apache.jackrabbit.guava.common.util.concurrent.Futures; import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService; import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors; -import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture; import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.core.data.util.NamedThreadFactory; import org.apache.jackrabbit.oak.commons.StringUtils; @@ -205,7 +205,7 @@ public class UploadStagingCache implements Closeable { statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval); } return new UploadStagingCache() { - @Override public Optional<SettableFuture<Integer>> put(String id, File input) { + @Override public Optional<CompletableFuture<Integer>> put(String id, File input) { return Optional.empty(); } @@ -251,7 +251,7 @@ public class UploadStagingCache implements Closeable { int count = 0; for (File toBeSyncedFile : files) { - Optional<SettableFuture<Integer>> scheduled = + Optional<CompletableFuture<Integer>> scheduled = putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true); if (scheduled.isPresent()) { count++; @@ -273,7 +273,7 @@ public class UploadStagingCache implements Closeable { * 1 if upload was successful, * 0 if an existing id is already pending for upload */ - public Optional<SettableFuture<Integer>> put(String id, File input) { + public Optional<CompletableFuture<Integer>> put(String id, File input) { return putOptionalDisregardingSize(id, input, false); } @@ -286,7 +286,7 @@ public class UploadStagingCache implements Closeable { * @param ignoreSize * @return */ - private Optional<SettableFuture<Integer>> putOptionalDisregardingSize(String id, File input, + private Optional<CompletableFuture<Integer>> putOptionalDisregardingSize(String id, File input, boolean ignoreSize) { cacheStats.markRequest(); @@ -320,8 +320,7 @@ public class UploadStagingCache implements Closeable { // if file is still pending upload, count it as present if (map.containsKey(id) || attic.containsKey(id)) { - SettableFuture<Integer> result = SettableFuture.create(); - result.set(0); + CompletableFuture<Integer> result = CompletableFuture.completedFuture(0); return Optional.of(result); } } @@ -355,8 +354,8 @@ public class UploadStagingCache implements Closeable { * @param upload the file to be staged * @return a SettableFuture instance */ - private SettableFuture<Integer> stage(final String id, final File upload) { - final SettableFuture<Integer> result = SettableFuture.create(); + private CompletableFuture<Integer> stage(final String id, final File upload) { + final CompletableFuture<Integer> result = new CompletableFuture<>(); try { // create an async job @@ -395,12 +394,12 @@ public class UploadStagingCache implements Closeable { } catch (IOException e) { LOG.warn("Error in cleaning up [{}] from staging", upload); } - result.set(r); + result.complete(r); } @Override public void onFailure(Throwable t) { LOG.error("Error adding [{}] with file [{}] to backend", id, upload, t); - result.setException(t); + result.completeExceptionally(t); retryQueue.add(id); } }, new SameThreadExecutorService()); diff --git a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java index b4ccbf5046..9e89fc32f1 100644 --- a/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java +++ b/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java @@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger; import ch.qos.logback.classic.Level; -import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.oak.commons.FileIOUtils; @@ -50,7 +49,6 @@ import org.apache.jackrabbit.oak.commons.collections.IteratorUtils; import org.apache.jackrabbit.oak.commons.collections.ListUtils; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils; -import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter; import org.apache.jackrabbit.oak.commons.junit.LogCustomizer; import org.apache.jackrabbit.oak.commons.pio.Closer; import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; @@ -140,7 +138,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { closer.register(stagingCache); File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); - Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); + Optional<CompletableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); assertFalse(future.isPresent()); assertNull(stagingCache.getIfPresent(ID_PREFIX + 0)); @@ -156,7 +154,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { // add load File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); - Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); + Optional<CompletableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); assertTrue(future.isPresent()); assertNotNull(stagingCache.getIfPresent(ID_PREFIX + 0)); @@ -273,7 +271,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { public void testPutMoveFileError() throws Exception { File empty = new File(folder.getRoot(), String.valueOf(System.currentTimeMillis())); assertFalse(empty.exists()); - Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, empty); + Optional<CompletableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, empty); // assert no file assertFalse(future.isPresent()); assertEquals(1, stagingCache.getStats().getMissCount()); @@ -290,10 +288,10 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { List<CompletableFuture<Integer>> futures = put(folder); // Create an async retrieve task - final SettableFuture<File> retFuture = SettableFuture.create(); + final CompletableFuture<File> retFuture = new CompletableFuture<>(); Thread t = new Thread(new Runnable() { @Override public void run() { - retFuture.set(stagingCache.getIfPresent(ID_PREFIX + 1)); + retFuture.complete(stagingCache.getIfPresent(ID_PREFIX + 1)); } }); @@ -327,7 +325,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { // Add another load File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); - Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); + Optional<CompletableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); assertFalse(future2.isPresent()); //start @@ -336,10 +334,10 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { assertFuture(futures, 0); // Try 2nd upload again - Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 1, f2); + Optional<CompletableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 1, f2); futures = new ArrayList<>(); if (future.isPresent()) { - futures.add(FutureConverter.toCompletableFuture(future.get())); + futures.add(future.get()); } assertFuture(futures, 1); @@ -411,7 +409,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { List<CompletableFuture<Integer>> futures = put(folder); File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); - Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 0, f); + Optional<CompletableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 0, f); assertTrue(future2.isPresent()); assertEquals(future2.get().get().intValue(), 0); @@ -474,9 +472,9 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { // Add diff load File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); - Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); + Optional<CompletableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); if (future2.isPresent()) { - futures.add(FutureConverter.toCompletableFuture(future2.get())); + futures.add(future2.get()); } //start @@ -553,7 +551,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { File f2 = copyToFile(randomStream(diff, 4 * 1024), folder.newFile()); CountDownLatch putThreadLatch = new CountDownLatch(1); CountDownLatch triggerLatch = new CountDownLatch(1); - CompletableFuture<Optional<SettableFuture<Integer>>> future1 = + CompletableFuture<Optional<CompletableFuture<Integer>>> future1 = putThread(executorService, diff, f2, stagingCache, putThreadLatch, triggerLatch); putThreadLatch.countDown(); @@ -563,7 +561,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS); triggerLatch.await(); if (future1.get().isPresent()) { - futures.add(FutureConverter.toCompletableFuture(future1.get().get())); + futures.add(future1.get().get()); } CompletableFuture<List<Integer>> listCompletableFuture = FutureUtils.successfulAsList(futures); @@ -746,10 +744,10 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { return future; } - private static CompletableFuture<Optional<SettableFuture<Integer>>> putThread( + private static CompletableFuture<Optional<CompletableFuture<Integer>>> putThread( ExecutorService executor, final int seed, final File f, final UploadStagingCache cache, final CountDownLatch start, final CountDownLatch trigger) { - final CompletableFuture<Optional<SettableFuture<Integer>>> future = new CompletableFuture<>(); + final CompletableFuture<Optional<CompletableFuture<Integer>>> future = new CompletableFuture<>(); executor.submit(new Runnable() { @Override public void run() { try { @@ -757,7 +755,7 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { start.await(); LOG.info("Starting put"); trigger.countDown(); - Optional<SettableFuture<Integer>> opt = cache.put(ID_PREFIX + seed, f); + Optional<CompletableFuture<Integer>> opt = cache.put(ID_PREFIX + seed, f); LOG.info("Finished put"); future.complete(opt); } catch (Exception e) { @@ -804,10 +802,10 @@ public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { private List<CompletableFuture<Integer>> put(TemporaryFolder folder) throws IOException { File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); - Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); + Optional<CompletableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); List<CompletableFuture<Integer>> futures = new ArrayList<>(); if (future.isPresent()) { - futures.add(FutureConverter.toCompletableFuture(future.get())); + futures.add(future.get()); } return futures; } diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java index 1859d3b930..9f9bab4ac7 100644 --- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java +++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java @@ -92,23 +92,13 @@ public class FutureConverter { } }; - listenableFuture.addListener(() -> { - try { - if (listenableFuture.isCancelled()) { - // If source future was cancelled, cancel this CompletableFuture too - completable.cancel(false); - } else { - // Complete normally with the result - completable.complete(listenableFuture.get()); - } - } catch (InterruptedException ex) { - // fix for sonar : https://sonarcloud.io/organizations/apache/rules?open=java%3AS2142&rule_key=java%3AS2142 - Thread.currentThread().interrupt(); - completable.completeExceptionally(ex); - } catch (Exception ex) { - completable.completeExceptionally(ex.getCause() != null ? ex.getCause() : ex); - } - }, DIRECT_EXECUTOR); + // Check if the ListenableFuture is already done to avoid unnecessary async overhead + if (listenableFuture.isDone()) { + handleConversion(listenableFuture, completable); + } else { + // Future is not done yet, add listener for completion + listenableFuture.addListener(() -> handleConversion(listenableFuture, completable), DIRECT_EXECUTOR); + } return completable; } @@ -169,4 +159,21 @@ public class FutureConverter { } }; } + + // helper methods + + private static <T> void handleConversion(final ListenableFuture<T> listenableFuture, final CompletableFuture<T> completable) { + try { + if (listenableFuture.isCancelled()) { + completable.cancel(false); + } else { + completable.complete(listenableFuture.get()); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + completable.completeExceptionally(ex); + } catch (Exception ex) { + completable.completeExceptionally(ex.getCause() != null ? ex.getCause() : ex); + } + } } diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverterTest.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverterTest.java index 6310705850..f638b95b7d 100644 --- a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverterTest.java +++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverterTest.java @@ -27,8 +27,10 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Unit cases for {@link FutureConverter} @@ -105,6 +107,76 @@ public class FutureConverterTest { Assert.assertTrue(listenable.isCancelled()); } + @Test + public void toCompletableFutureTestGetRestoresInterruptStatus() throws Exception { + SettableFuture<String> listenable = SettableFuture.create(); + CompletableFuture<String> completable = FutureConverter.toCompletableFuture(listenable); + + final AtomicBoolean interruptedInCatch = new AtomicBoolean(false); + final AtomicBoolean caughtInterruptedException = new AtomicBoolean(false); + final CountDownLatch threadStarted = new CountDownLatch(1); + + Thread testThread = new Thread(() -> { + threadStarted.countDown(); + try { + completable.get(); // This will block, we interrupt + Assert.fail("Expected InterruptedException"); + } catch (InterruptedException e) { + // Expected interrupt + caughtInterruptedException.set(true); + interruptedInCatch.set(Thread.currentThread().isInterrupted()); + } catch (Exception e) { + Assert.fail("Unexpected exception: " + e); + } + }); + + testThread.start(); + // Wait for thread to start and then interrupt + threadStarted.await(); + Thread.sleep(50); // Small delay to ensure get() is called + testThread.interrupt(); + + testThread.join(); + + Assert.assertTrue("Should have caught InterruptedException", caughtInterruptedException.get()); + Assert.assertTrue("Thread should be interrupted when catching InterruptedException", interruptedInCatch.get()); + } + + @Test + public void toCompletableFutureTestGetTimeoutRestoresInterruptStatus() throws Exception { + SettableFuture<String> listenable = SettableFuture.create(); + CompletableFuture<String> completable = FutureConverter.toCompletableFuture(listenable); + + final AtomicBoolean interruptedInCatch = new AtomicBoolean(false); + final AtomicBoolean caughtInterruptedException = new AtomicBoolean(false); + final CountDownLatch threadStarted = new CountDownLatch(1); + + Thread testThread = new Thread(() -> { + threadStarted.countDown(); + try { + completable.get(10, TimeUnit.SECONDS); // Will block and get interrupted + Assert.fail("Expected InterruptedException"); + } catch (InterruptedException e) { + // Expected interrupt + caughtInterruptedException.set(true); + interruptedInCatch.set(Thread.currentThread().isInterrupted()); + } catch (Exception e) { + Assert.fail("Unexpected exception: " + e); + } + }); + + testThread.start(); + // Wait for thread to start and then interrupt + threadStarted.await(); + Thread.sleep(50); // Small delay to ensure get() is called + testThread.interrupt(); + + testThread.join(); + + Assert.assertTrue("Should have caught InterruptedException", caughtInterruptedException.get()); + Assert.assertTrue("Thread should be interrupted when catching InterruptedException", interruptedInCatch.get()); + } + @Test public void testConvertListSuccessful() throws Exception { SettableFuture<String> f1 = SettableFuture.create(); @@ -203,4 +275,74 @@ public class FutureConverterTest { Assert.assertTrue(completable.isCancelled()); } + @Test + public void toListenableFutureTestGetRestoresInterruptStatus() throws Exception { + CompletableFuture<String> completable = new CompletableFuture<>(); + ListenableFuture<String> listenable = FutureConverter.toListenableFuture(completable); + + final AtomicBoolean interruptedInCatch = new AtomicBoolean(false); + final AtomicBoolean caughtInterruptedException = new AtomicBoolean(false); + final CountDownLatch threadStarted = new CountDownLatch(1); + + Thread testThread = new Thread(() -> { + threadStarted.countDown(); + try { + listenable.get(); // This will block, we interrupt + Assert.fail("Expected InterruptedException"); + } catch (InterruptedException e) { + // Expected interrupt + caughtInterruptedException.set(true); + interruptedInCatch.set(Thread.currentThread().isInterrupted()); + } catch (Exception e) { + Assert.fail("Unexpected exception: " + e); + } + }); + + testThread.start(); + // Wait for thread to start and then interrupt + threadStarted.await(); + Thread.sleep(50); // Small delay to ensure get() is called + testThread.interrupt(); + + testThread.join(); + + Assert.assertTrue("Should have caught InterruptedException", caughtInterruptedException.get()); + Assert.assertTrue("Thread should be interrupted when catching InterruptedException", interruptedInCatch.get()); + } + + @Test + public void toListenableFutureTestGetTimeoutRestoresInterruptStatus() throws Exception { + CompletableFuture<String> completable = new CompletableFuture<>(); + ListenableFuture<String> listenable = FutureConverter.toListenableFuture(completable); + + final AtomicBoolean interruptedInCatch = new AtomicBoolean(false); + final AtomicBoolean caughtInterruptedException = new AtomicBoolean(false); + final CountDownLatch threadStarted = new CountDownLatch(1); + + Thread testThread = new Thread(() -> { + threadStarted.countDown(); + try { + listenable.get(10, TimeUnit.SECONDS); // Will block and get interrupted + Assert.fail("Expected InterruptedException"); + } catch (InterruptedException e) { + // Expected interrupt + caughtInterruptedException.set(true); + interruptedInCatch.set(Thread.currentThread().isInterrupted()); + } catch (Exception e) { + Assert.fail("Unexpected exception: " + e); + } + }); + + testThread.start(); + // Wait for thread to start and then interrupt + threadStarted.await(); + Thread.sleep(50); // Small delay to ensure get() is called + testThread.interrupt(); + + testThread.join(); + + Assert.assertTrue("Should have caught InterruptedException", caughtInterruptedException.get()); + Assert.assertTrue("Thread should be interrupted when catching InterruptedException", interruptedInCatch.get()); + } + } \ No newline at end of file diff --git a/oak-pojosr/src/main/java/org/apache/jackrabbit/oak/run/osgi/OakOSGiRepositoryFactory.java b/oak-pojosr/src/main/java/org/apache/jackrabbit/oak/run/osgi/OakOSGiRepositoryFactory.java index 1ac4cd133d..1c2e62b2fe 100644 --- a/oak-pojosr/src/main/java/org/apache/jackrabbit/oak/run/osgi/OakOSGiRepositoryFactory.java +++ b/oak-pojosr/src/main/java/org/apache/jackrabbit/oak/run/osgi/OakOSGiRepositoryFactory.java @@ -31,6 +31,7 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -45,7 +46,6 @@ import javax.management.AttributeList; import javax.management.MBeanServer; import javax.management.ObjectName; -import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture; import org.apache.commons.io.FilenameUtils; import org.apache.felix.connect.launch.BundleDescriptor; import org.apache.felix.connect.launch.ClasspathScanner; @@ -177,7 +177,7 @@ public class OakOSGiRepositoryFactory implements RepositoryFactory { //Future which would be used to notify when repository is ready // to be used - SettableFuture<Repository> repoFuture = SettableFuture.create(); + CompletableFuture<Repository> repoFuture = new CompletableFuture<>(); new RunnableJobTracker(registry.getBundleContext()); @@ -387,14 +387,14 @@ public class OakOSGiRepositoryFactory implements RepositoryFactory { } private static class RepositoryTracker extends ServiceTracker<Repository, Repository> { - private final SettableFuture<Repository> repoFuture; + private final CompletableFuture<Repository> repoFuture; private final PojoServiceRegistry registry; private final BundleActivator activator; private RepositoryProxy proxy; private final int timeoutInSecs; public RepositoryTracker(PojoServiceRegistry registry, BundleActivator activator, - SettableFuture<Repository> repoFuture, int timeoutInSecs) { + CompletableFuture<Repository> repoFuture, int timeoutInSecs) { super(registry.getBundleContext(), Repository.class.getName(), null); this.repoFuture = repoFuture; this.registry = registry; @@ -410,7 +410,7 @@ public class OakOSGiRepositoryFactory implements RepositoryFactory { //As its possible that future is accessed before the service //get registered with tracker. We also capture the initial reference //and use that for the first access case - repoFuture.set(createProxy(service)); + repoFuture.complete(createProxy(service)); } return service; } diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java index c79053899f..c803fdc286 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/BatchCommit.java @@ -28,8 +28,6 @@ import org.apache.jackrabbit.oak.commons.conditions.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture; - import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; @@ -150,11 +148,11 @@ final class BatchCommit { void executeIndividually() { DocumentStore store = queue.getStore(); for (UpdateOp op : ops) { - SettableFuture<NodeDocument> result = SettableFuture.create(); + CompletableFuture<NodeDocument> result = new CompletableFuture<>(); try { - result.set(store.findAndUpdate(NODES, op)); + result.complete(store.findAndUpdate(NODES, op)); } catch (Throwable t) { - result.setException(t); + result.completeExceptionally(t); } results.add(result); }