This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push: new 1887accb39 OAK-11890 : removed usage of Guava's Futures.allAsList (#2493) 1887accb39 is described below commit 1887accb390736a4aeba1276a4c0ec3ba930105f Author: Rishabh Kumar <rishabhdaim1...@gmail.com> AuthorDate: Tue Sep 9 11:13:07 2025 +0530 OAK-11890 : removed usage of Guava's Futures.allAsList (#2493) --- .../internal/concurrent/FutureConverter.java | 2 +- .../commons/internal/concurrent/FutureUtils.java | 25 +++ .../internal/concurrent/FutureUtilsTest.java | 201 +++++++++++++++++---- .../jackrabbit/oak/jcr/AtomicCounterClusterIT.java | 5 +- .../apache/jackrabbit/oak/jcr/AtomicCounterIT.java | 5 +- .../jackrabbit/oak/jcr/ConcurrentReadIT.java | 11 +- 6 files changed, 202 insertions(+), 47 deletions(-) diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java index da60a035a0..e3bcc7fc45 100644 --- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java +++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureConverter.java @@ -37,7 +37,7 @@ public class FutureConverter { private static final Executor DIRECT_EXECUTOR = Runnable::run; - public static <T> List<CompletableFuture<T>> toCompletableFuture(final List<ListenableFuture<T>> listenableFutures) { + public static <T> List<CompletableFuture<T>> toCompletableFuture(final List<? extends ListenableFuture<T>> listenableFutures) { return listenableFutures.stream() .map(FutureConverter::toCompletableFuture) .collect(Collectors.toList()); diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java index 22de4cea20..dae861c9f7 100644 --- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java +++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtils.java @@ -18,6 +18,11 @@ */ package org.apache.jackrabbit.oak.commons.internal.concurrent; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.jackrabbit.oak.commons.collections.IterableUtils; +import org.apache.jackrabbit.oak.commons.collections.IteratorUtils; +import org.apache.jackrabbit.oak.commons.collections.StreamUtils; + import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -57,4 +62,24 @@ public class FutureUtils { ); } + /** + * Returns a {@link CompletableFuture} that completes when all of the given input futures complete. + * If all input futures complete successfully, the returned future completes with a list of all results, + * maintaining the input order. + * <p> + * If any input future completes exceptionally or is cancelled, the returned future also completes exceptionally. + * This method is analogous to Guava's {@code Futures.allAsList(tasks)}. + * + * @param tasks the list of CompletableFutures to combine + * @param <T> the result type of the futures + * @return a CompletableFuture that completes with a list of all input results, in order + */ + public static <T> CompletableFuture<List<T>> allAsList(final Iterable<? extends CompletableFuture<? extends T>> tasks) { + + return CompletableFuture.allOf(IteratorUtils.toArray(tasks.iterator(), CompletableFuture.class)) + .thenApply(v -> StreamUtils.toStream(tasks) + .map(CompletableFuture::join) // .join throws if any input failed + .collect(Collectors.toList())); + } + } diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtilsTest.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtilsTest.java index 702d3d2fed..21f489fc2a 100644 --- a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtilsTest.java +++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/internal/concurrent/FutureUtilsTest.java @@ -18,12 +18,16 @@ */ package org.apache.jackrabbit.oak.commons.internal.concurrent; +import org.apache.jackrabbit.guava.common.util.concurrent.Futures; +import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; +import org.apache.jackrabbit.guava.common.util.concurrent.SettableFuture; import org.junit.Assert; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Unit cases for {@link FutureUtils} @@ -31,58 +35,181 @@ import java.util.concurrent.CompletableFuture; public class FutureUtilsTest { @Test - public void testAllSuccessful() throws Exception { - CompletableFuture<String> f1 = CompletableFuture.completedFuture("a"); - CompletableFuture<String> f2 = CompletableFuture.completedFuture("b"); + public void successfulAsListAllSuccessful() throws Exception { + CompletableFuture<String> cf1 = CompletableFuture.completedFuture("a"); + CompletableFuture<String> cf2 = CompletableFuture.completedFuture("b"); + List<CompletableFuture<String>> jdkFutures = Arrays.asList(cf1, cf2); - List<CompletableFuture<String>> futures = Arrays.asList(f1, f2); - CompletableFuture<List<String>> combined = FutureUtils.successfulAsList(futures); + SettableFuture<String> lf1 = SettableFuture.create(); + SettableFuture<String> lf2 = SettableFuture.create(); + lf1.set("a"); + lf2.set("b"); + List<ListenableFuture<String>> guavaFutures = Arrays.asList(lf1, lf2); - List<String> results = combined.get(); - Assert.assertEquals(2, results.size()); - Assert.assertEquals("a", results.get(0)); - Assert.assertEquals("b", results.get(1)); + // Native JDK method + List<String> jdkResults = FutureUtils.successfulAsList(jdkFutures).get(); + + // Guava method (blocking) + List<String> guavaResults = Futures.successfulAsList(guavaFutures).get(); + + Assert.assertEquals(guavaResults, jdkResults); + } + + @Test + public void successfulAsListPartialFailure() throws Exception { + CompletableFuture<String> cf1 = CompletableFuture.completedFuture("a"); + CompletableFuture<String> cf2 = new CompletableFuture<>(); + cf2.completeExceptionally(new RuntimeException("fail")); + List<CompletableFuture<String>> jdkFutures = Arrays.asList(cf1, cf2); + + SettableFuture<String> lf1 = SettableFuture.create(); + SettableFuture<String> lf2 = SettableFuture.create(); + lf1.set("a"); + lf2.setException(new RuntimeException("fail")); + List<ListenableFuture<String>> guavaFutures = Arrays.asList(lf1, lf2); + + // Native JDK method + List<String> jdkResults = FutureUtils.successfulAsList(jdkFutures).get(); + + // Guava method (blocking) + List<String> guavaResults = Futures.successfulAsList(guavaFutures).get(); + + Assert.assertEquals(guavaResults, jdkResults); + } + + @Test + public void successfulAsListAllFailures() throws Exception { + CompletableFuture<String> cf1 = new CompletableFuture<>(); + cf1.completeExceptionally(new RuntimeException("fail A")); + CompletableFuture<String> cf2 = new CompletableFuture<>(); + cf2.completeExceptionally(new RuntimeException("fail B")); + List<CompletableFuture<String>> jdkFutures = Arrays.asList(cf1, cf2); + + SettableFuture<String> lf1 = SettableFuture.create(); + SettableFuture<String> lf2 = SettableFuture.create(); + lf1.setException(new RuntimeException("fail A")); + lf2.setException(new RuntimeException("fail B")); + List<ListenableFuture<String>> guavaFutures = Arrays.asList(lf1, lf2); + + // Native JDK method + List<String> jdkResults = FutureUtils.successfulAsList(jdkFutures).get(); + + // Guava method (blocking) + List<String> guavaResults = Futures.successfulAsList(guavaFutures).get(); + + Assert.assertEquals(guavaResults, jdkResults); } @Test - public void testPartialFailure() throws Exception { - CompletableFuture<String> f1 = CompletableFuture.completedFuture("a"); - CompletableFuture<String> f2 = new CompletableFuture<>(); - f2.completeExceptionally(new RuntimeException("fail")); - - List<CompletableFuture<String>> futures = Arrays.asList(f1, f2); - CompletableFuture<List<String>> combined = FutureUtils.successfulAsList(futures); - - List<String> results = combined.get(); - Assert.assertEquals(2, results.size()); - Assert.assertEquals("a", results.get(0)); - Assert.assertNull(results.get(1)); // Failure replaced by null + public void successfulAsListEmptyList() throws Exception { + List<CompletableFuture<String>> jdkFutures = List.of(); + List<ListenableFuture<String>> guavaFutures = List.of(); + + List<String> jdkResult = FutureUtils.successfulAsList(jdkFutures).get(); + List<String> guavaResult = Futures.successfulAsList(guavaFutures).get(); + + Assert.assertEquals(guavaResult, jdkResult); + Assert.assertTrue(jdkResult.isEmpty()); } @Test - public void testAllFailures() throws Exception { - CompletableFuture<String> f1 = new CompletableFuture<>(); - f1.completeExceptionally(new RuntimeException("fail A")); + public void allAsListAllSuccessful() throws Exception { + CompletableFuture<String> cf1 = CompletableFuture.completedFuture("foo"); + CompletableFuture<String> cf2 = CompletableFuture.completedFuture("bar"); + List<CompletableFuture<String>> jdkFutures = Arrays.asList(cf1, cf2); - CompletableFuture<String> f2 = new CompletableFuture<>(); - f2.completeExceptionally(new RuntimeException("fail B")); + SettableFuture<String> lf1 = SettableFuture.create(); + SettableFuture<String> lf2 = SettableFuture.create(); + lf1.set("foo"); + lf2.set("bar"); + List<ListenableFuture<String>> guavaFutures = Arrays.asList(lf1, lf2); - List<CompletableFuture<String>> futures = Arrays.asList(f1, f2); - CompletableFuture<List<String>> combined = FutureUtils.successfulAsList(futures); + // Native JDK method + List<String> jdkResults = FutureUtils.allAsList(jdkFutures).get(); - List<String> results = combined.get(); - Assert.assertEquals(2, results.size()); - Assert.assertNull(results.get(0)); - Assert.assertNull(results.get(1)); + // Guava method (blocking) + List<String> guavaResults = Futures.allAsList(guavaFutures).get(); + + Assert.assertEquals(guavaResults, jdkResults); } @Test - public void testEmptyList() throws Exception { - List<CompletableFuture<String>> futures = List.of(); - CompletableFuture<List<String>> combined = FutureUtils.successfulAsList(futures); + public void allAsListPartialFailure() throws Exception { + CompletableFuture<String> cf1 = CompletableFuture.completedFuture("ok"); + CompletableFuture<String> cf2 = new CompletableFuture<>(); + cf2.completeExceptionally(new IllegalStateException("fail")); + List<CompletableFuture<String>> jdkFutures = Arrays.asList(cf1, cf2); + + SettableFuture<String> lf1 = SettableFuture.create(); + SettableFuture<String> lf2 = SettableFuture.create(); + lf1.set("ok"); + lf2.setException(new IllegalStateException("fail")); + List<ListenableFuture<String>> guavaFutures = Arrays.asList(lf1, lf2); + + boolean jdkFailed; + try { + FutureUtils.allAsList(jdkFutures).get(); + jdkFailed = false; + } catch (ExecutionException e) { + jdkFailed = true; + } + + boolean guavaFailed; + try { + Futures.allAsList(guavaFutures).get(); + guavaFailed = false; + } catch (ExecutionException e) { + guavaFailed = true; + } + + Assert.assertTrue(jdkFailed); + Assert.assertTrue(guavaFailed); + } + + @Test + public void allAsListAllFailures() throws Exception { + CompletableFuture<String> cf1 = new CompletableFuture<>(); + cf1.completeExceptionally(new RuntimeException("f1 failed")); + CompletableFuture<String> cf2 = new CompletableFuture<>(); + cf2.completeExceptionally(new RuntimeException("f2 failed")); + List<CompletableFuture<String>> jdkFutures = Arrays.asList(cf1, cf2); + + SettableFuture<String> lf1 = SettableFuture.create(); + SettableFuture<String> lf2 = SettableFuture.create(); + lf1.setException(new RuntimeException("f1 failed")); + lf2.setException(new RuntimeException("f2 failed")); + List<ListenableFuture<String>> guavaFutures = Arrays.asList(lf1, lf2); + + boolean jdkFailed; + try { + FutureUtils.allAsList(jdkFutures).get(); + jdkFailed = false; + } catch (ExecutionException e) { + jdkFailed = true; + } + + boolean guavaFailed; + try { + Futures.allAsList(guavaFutures).get(); + guavaFailed = false; + } catch (ExecutionException e) { + guavaFailed = true; + } + + Assert.assertTrue(jdkFailed); + Assert.assertTrue(guavaFailed); + } + + @Test + public void allAsListEmptyList() throws Exception { + List<CompletableFuture<String>> jdkFutures = List.of(); + List<ListenableFuture<String>> guavaFutures = List.of(); + + List<String> jdkResult = FutureUtils.allAsList(jdkFutures).get(); + List<String> guavaResult = Futures.allAsList(guavaFutures).get(); - List<String> results = combined.get(); - Assert.assertTrue(results.isEmpty()); + Assert.assertTrue(jdkResult.isEmpty()); + Assert.assertTrue(guavaResult.isEmpty()); } } \ No newline at end of file diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java index bc61380856..c9db2ece61 100644 --- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java +++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.java @@ -49,6 +49,8 @@ import org.apache.jackrabbit.oak.commons.FixturesHelper; import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture; import org.apache.jackrabbit.oak.commons.PerfLogger; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter; +import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils; import org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.BeforeClass; @@ -56,7 +58,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.jackrabbit.guava.common.util.concurrent.Futures; import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFutureTask; public class AtomicCounterClusterIT extends DocumentClusterIT { @@ -173,7 +174,7 @@ public class AtomicCounterClusterIT extends DocumentClusterIT { } } LOG_PERF.end(start, -1, "Firing threads completed", ""); - Futures.allAsList(tasks).get(); + FutureUtils.allAsList(FutureConverter.toCompletableFuture(tasks)).get(); LOG_PERF.end(start, -1, "Futures completed", ""); waitForTaskCompletion(); diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java index d849479304..9f7f0d98b3 100644 --- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java +++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/AtomicCounterIT.java @@ -35,11 +35,12 @@ import javax.jcr.Node; import javax.jcr.RepositoryException; import javax.jcr.Session; -import org.apache.jackrabbit.guava.common.util.concurrent.Futures; import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFutureTask; import org.apache.jackrabbit.oak.NodeStoreFixtures; import org.apache.jackrabbit.oak.commons.FixturesHelper; import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture; +import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter; +import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils; import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; import org.jetbrains.annotations.NotNull; import org.junit.BeforeClass; @@ -83,7 +84,7 @@ public class AtomicCounterIT extends AbstractRepositoryTest { for (int t = 0; t < 100; t++) { tasks.add(updateCounter(counterPath, rnd.nextInt(10) + 1, expected)); } - Futures.allAsList(tasks).get(); + FutureUtils.allAsList(FutureConverter.toCompletableFuture(tasks)).get(); session.refresh(false); assertEquals(expected.get(), diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java index bfb16978e9..9965612fa4 100644 --- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java +++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java @@ -31,11 +31,12 @@ import javax.jcr.PropertyIterator; import javax.jcr.RepositoryException; import javax.jcr.Session; -import org.apache.jackrabbit.guava.common.util.concurrent.Futures; import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture; import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService; import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors; +import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureConverter; +import org.apache.jackrabbit.oak.commons.internal.concurrent.FutureUtils; import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; import org.junit.Test; @@ -62,7 +63,7 @@ public class ConcurrentReadIT extends AbstractRepositoryTest { ListeningExecutorService executorService = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool()); - List<ListenableFuture<?>> futures = new ArrayList<>(); + List<ListenableFuture<Void>> futures = new ArrayList<>(); for (int k = 0; k < 20; k ++) { futures.add(executorService.submit(new Callable<Void>() { @Override @@ -78,7 +79,7 @@ public class ConcurrentReadIT extends AbstractRepositoryTest { } // Throws ExecutionException if any of the submitted task failed - Futures.allAsList(futures).get(); + FutureUtils.allAsList(FutureConverter.toCompletableFuture(futures)).get(); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } finally { @@ -100,7 +101,7 @@ public class ConcurrentReadIT extends AbstractRepositoryTest { ListeningExecutorService executorService = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool()); - List<ListenableFuture<?>> futures = new ArrayList<>(); + List<ListenableFuture<Void>> futures = new ArrayList<>(); for (int k = 0; k < 20; k ++) { futures.add(executorService.submit(new Callable<Void>() { @Override @@ -116,7 +117,7 @@ public class ConcurrentReadIT extends AbstractRepositoryTest { } // Throws ExecutionException if any of the submitted task failed - Futures.allAsList(futures).get(); + FutureUtils.allAsList(FutureConverter.toCompletableFuture(futures)).get(); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } finally {