This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push: new ca63aaf fixes #722 Replace ListenableFuture with CompletableFuture (#975) ca63aaf is described below commit ca63aaf83383293d19a2d4aee9fa43346500ffe0 Author: Joseph Koshakow <kosh...@gmail.com> AuthorDate: Wed Dec 6 18:34:40 2017 -0500 fixes #722 Replace ListenableFuture with CompletableFuture (#975) --- .../fluo/core/async/AsyncConditionalWriter.java | 55 +++------ .../apache/fluo/core/impl/SharedBatchWriter.java | 34 +++--- .../org/apache/fluo/core/impl/TransactionImpl.java | 132 +++++++-------------- .../org/apache/fluo/core/oracle/OracleClient.java | 27 ++--- 4 files changed, 86 insertions(+), 162 deletions(-) diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java index 75a9fa1..6f35fbc 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java +++ b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java @@ -19,15 +19,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Result; import org.apache.accumulo.core.data.ConditionalMutation; @@ -36,11 +33,10 @@ import org.apache.fluo.core.impl.FluoConfigurationImpl; import org.apache.fluo.core.util.FluoExecutors; import org.apache.fluo.core.util.Limit; -public class AsyncConditionalWriter - implements AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> { +public class AsyncConditionalWriter { private final ConditionalWriter cw; - private final ListeningExecutorService les; + private final ExecutorService es; private final Limit semaphore; @@ -50,55 +46,38 @@ public class AsyncConditionalWriter FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT); int permits = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT, FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT); - this.les = - MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, "asyncCW")); + this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw"); // the conditional writer currently has not memory limits... give it too much and it blows out // memory.. need to fix this in conditional writer // for now this needs to be memory based this.semaphore = new Limit(permits); } - private class IterTask implements Callable<Iterator<Result>> { - - private Iterator<Result> input; - private int permitsAcquired; - - public IterTask(Iterator<Result> iter, int permitsAcquired) { - this.input = iter; - this.permitsAcquired = permitsAcquired; + public CompletableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) { + if (input.size() == 0) { + return CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator()); } - @Override - public Iterator<Result> call() throws Exception { + semaphore.acquire(input.size()); + Iterator<Result> iter = cw.write(input.iterator()); + return CompletableFuture.supplyAsync(() -> { try { Builder<Result> imlb = ImmutableList.builder(); - while (input.hasNext()) { - Result result = input.next(); + while (iter.hasNext()) { + Result result = iter.next(); imlb.add(result); } return imlb.build().iterator(); } finally { - semaphore.release(permitsAcquired); + semaphore.release(input.size()); } - } - - } - - @Override - public ListenableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) { - if (input.size() == 0) { - return Futures.immediateFuture(Collections.<Result>emptyList().iterator()); - } - - semaphore.acquire(input.size()); - Iterator<Result> iter = cw.write(input.iterator()); - return les.submit(new IterTask(iter, input.size())); + }, es); } public void close() { - les.shutdownNow(); + es.shutdownNow(); try { - les.awaitTermination(5, TimeUnit.SECONDS); + es.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java index 2b53cbf..d87e9a7 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java @@ -21,12 +21,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.data.Mutation; @@ -42,13 +41,15 @@ public class SharedBatchWriter { private AtomicLong asyncBatchesAdded = new AtomicLong(0); private long asyncBatchesProcessed = 0; + // added to avoid findbugs false positive + private static final Supplier<Void> NULLS = () -> null; private static class MutationBatch { private Collection<Mutation> mutations; private CountDownLatch cdl; private boolean isAsync = false; - private ListenableFutureTask<Void> lf; + private CompletableFuture<Void> cf; public MutationBatch(Collection<Mutation> mutations, boolean isAsync) { this.mutations = mutations; @@ -58,9 +59,9 @@ public class SharedBatchWriter { } } - public MutationBatch(Collection<Mutation> mutations, ListenableFutureTask<Void> lf) { + public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) { this.mutations = mutations; - this.lf = lf; + this.cf = cf; this.cdl = null; this.isAsync = false; } @@ -70,8 +71,8 @@ public class SharedBatchWriter { cdl.countDown(); } - if (lf != null) { - lf.run(); + if (cf != null) { + cf.complete(NULLS.get()); } } } @@ -170,27 +171,22 @@ public class SharedBatchWriter { } } - private static final Runnable DO_NOTHING = new Runnable() { - @Override - public void run() {} - }; - - ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) { + CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) { if (ml.size() == 0) { - return Futures.immediateFuture(null); + return CompletableFuture.completedFuture(NULLS.get()); } - ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, null); + CompletableFuture<Void> cf = new CompletableFuture<>(); try { - MutationBatch mb = new MutationBatch(ml, lf); + MutationBatch mb = new MutationBatch(ml, cf); mutQueue.put(mb); - return lf; + return cf; } catch (Exception e) { throw new RuntimeException(e); } } - ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) { + CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) { return writeMutationsAsyncFuture(Collections.singleton(m)); } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index 0abdafe..94e4107 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -26,15 +26,13 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; @@ -818,33 +816,12 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra return startTs; } - // async experiment - - private abstract static class CommitCallback<V> implements FutureCallback<V> { - - private CommitData cd; - - CommitCallback(CommitData cd) { - this.cd = cd; - } - - @Override - public void onSuccess(V result) { - try { - onSuccess(cd, result); - } catch (Exception e) { - cd.commitObserver.failed(e); - } - } - - protected abstract void onSuccess(CommitData cd, V result) throws Exception; - - - @Override - public void onFailure(Throwable t) { - cd.commitObserver.failed(t); - } - + /** + * Funcitonal interface to provide next step of asynchronous commit on successful completion of + * the previous one + */ + private static interface OnSuccessInterface<V> { + public void onSuccess(V result) throws Exception; } private abstract static class SynchronousCommitTask implements Runnable { @@ -895,6 +872,24 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra return size; } + private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd, + OnSuccessInterface<V> onSuccessInterface) { + cfuture.handleAsync((result, exception) -> { + if (exception != null) { + cd.commitObserver.failed(exception); + return null; + } else { + try { + onSuccessInterface.onSuccess(result); + return null; + } catch (Exception e) { + cd.commitObserver.failed(e); + return null; + } + } + }, env.getSharedResources().getAsyncCommitExecutor()); + } + @Override public synchronized void commitAsync(AsyncCommitObserver commitCallback) { @@ -972,13 +967,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra final ConditionalMutation pcm = prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow)); - ListenableFuture<Iterator<Result>> future = cd.acw.apply(Collections.singletonList(pcm)); - Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) { - @Override - protected void onSuccess(CommitData cd, Iterator<Result> result) throws Exception { - postLockPrimary(cd, pcm, Iterators.getOnlyElement(result)); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + CompletableFuture<Iterator<Result>> cfuture = cd.acw.apply(Collections.singletonList(pcm)); + addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, Iterators.getOnlyElement(result))); } private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result) @@ -1059,13 +1049,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra cd.acceptedRows = new HashSet<>(); - ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations); - Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) { - @Override - protected void onSuccess(CommitData cd, Iterator<Result> results) throws Exception { - postLockOther(cd, results); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations); + addCallback(cfuture, cd, results -> postLockOther(cd, results)); } private void postLockOther(final CommitData cd, Iterator<Result> results) throws Exception { @@ -1092,13 +1077,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra } else if (stopAfterPreCommit) { cd.commitObserver.committed(); } else { - ListenableFuture<Stamp> future = env.getSharedResources().getOracleClient().getStampAsync(); - Futures.addCallback(future, new CommitCallback<Stamp>(cd) { - @Override - protected void onSuccess(CommitData cd, Stamp stamp) throws Exception { - beginSecondCommitPhase(cd, stamp); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + CompletableFuture<Stamp> cfuture = env.getSharedResources().getOracleClient().getStampAsync(); + addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp)); } } @@ -1124,14 +1104,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra mutations.add(m); } - ListenableFuture<Void> future = + CompletableFuture<Void> cfuture = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations); - Futures.addCallback(future, new CommitCallback<Void>(cd) { - @Override - protected void onSuccess(CommitData cd, Void v) throws Exception { - rollbackPrimaryLock(cd); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd)); } private void rollbackPrimaryLock(CommitData cd) throws Exception { @@ -1143,14 +1118,10 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra DelLockValue.encodeRollback(startTs, true, true)); m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY); - ListenableFuture<Void> future = + CompletableFuture<Void> cfuture = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m); - Futures.addCallback(future, new CommitCallback<Void>(cd) { - @Override - protected void onSuccess(CommitData cd, Void v) throws Exception { - cd.commitObserver.commitFailed(cd.getShortCollisionMessage()); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + addCallback(cfuture, cd, + result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage())); } private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception { @@ -1213,14 +1184,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra } } - ListenableFuture<Void> future = + CompletableFuture<Void> cfuture = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values()); - Futures.addCallback(future, new CommitCallback<Void>(cd) { - @Override - protected void onSuccess(CommitData cd, Void v) throws Exception { - commmitPrimary(cd, commitTs); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs)); } private void commmitPrimary(CommitData cd, final long commitTs) { @@ -1237,15 +1203,10 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation); - ListenableFuture<Iterator<Result>> future = + CompletableFuture<Iterator<Result>> cfuture = cd.acw.apply(Collections.singletonList(delLockMutation)); - Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) { - @Override - protected void onSuccess(CommitData cd, Iterator<Result> result) throws Exception { - handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation, - Iterators.getOnlyElement(result)); - } - }, env.getSharedResources().getAsyncCommitExecutor()); + addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation, + Iterators.getOnlyElement(result))); } private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs, @@ -1321,16 +1282,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra mutations.add(m); } - - ListenableFuture<Void> future = + CompletableFuture<Void> cfuture = env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations); - Futures.addCallback(future, new CommitCallback<Void>(cd) { - @Override - protected void onSuccess(CommitData cd, Void v) throws Exception { - finishCommit(cd, commitTs); - } - }, env.getSharedResources().getAsyncCommitExecutor()); - + addCallback(cfuture, cd, result -> finishCommit(cd, commitTs)); } @VisibleForTesting diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java index 012c4a8..9c11239 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java +++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,8 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import com.codahale.metrics.Timer.Context; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -70,15 +69,10 @@ public class OracleClient implements AutoCloseable { private Participant currentLeader; - private static final class TimeRequest implements Callable<Stamp> { + private static final class TimeRequest { CountDownLatch cdl = new CountDownLatch(1); AtomicReference<Stamp> stampRef = new AtomicReference<>(); - ListenableFutureTask<Stamp> lf = null; - - @Override - public Stamp call() throws Exception { - return stampRef.get(); - } + CompletableFuture<Stamp> cf = null; } private class TimestampRetriever extends LeaderSelectorListenerAdapter @@ -211,11 +205,12 @@ public class OracleClient implements AutoCloseable { for (int i = 0; i < request.size(); i++) { TimeRequest tr = request.get(i); - tr.stampRef.set(new Stamp(txStampsStart + i, gcStamp)); - if (tr.lf == null) { + Stamp stampRes = new Stamp(txStampsStart + i, gcStamp); + tr.stampRef.set(stampRes); + if (tr.cf == null) { tr.cdl.countDown(); } else { - tr.lf.run(); + tr.cf.complete(stampRes); } } } catch (InterruptedException e) { @@ -386,18 +381,18 @@ public class OracleClient implements AutoCloseable { return tr.stampRef.get(); } - public ListenableFuture<Stamp> getStampAsync() { + public CompletableFuture<Stamp> getStampAsync() { checkClosed(); TimeRequest tr = new TimeRequest(); - ListenableFutureTask<Stamp> lf = ListenableFutureTask.create(tr); - tr.lf = lf; + CompletableFuture<Stamp> cf = new CompletableFuture<>(); + tr.cf = cf; try { queue.put(tr); } catch (InterruptedException e) { throw new RuntimeException(e); } - return lf; + return cf; } /** -- To stop receiving notification emails like this one, please contact ['"commits@fluo.apache.org" <commits@fluo.apache.org>'].