This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new ca63aaf  fixes #722 Replace ListenableFuture with CompletableFuture 
(#975)
ca63aaf is described below

commit ca63aaf83383293d19a2d4aee9fa43346500ffe0
Author: Joseph Koshakow <kosh...@gmail.com>
AuthorDate: Wed Dec 6 18:34:40 2017 -0500

    fixes #722 Replace ListenableFuture with CompletableFuture (#975)
---
 .../fluo/core/async/AsyncConditionalWriter.java    |  55 +++------
 .../apache/fluo/core/impl/SharedBatchWriter.java   |  34 +++---
 .../org/apache/fluo/core/impl/TransactionImpl.java | 132 +++++++--------------
 .../org/apache/fluo/core/oracle/OracleClient.java  |  27 ++---
 4 files changed, 86 insertions(+), 162 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
 
b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
index 75a9fa1..6f35fbc 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
@@ -19,15 +19,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Result;
 import org.apache.accumulo.core.data.ConditionalMutation;
@@ -36,11 +33,10 @@ import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.util.FluoExecutors;
 import org.apache.fluo.core.util.Limit;
 
-public class AsyncConditionalWriter
-    implements AsyncFunction<Collection<ConditionalMutation>, 
Iterator<Result>> {
+public class AsyncConditionalWriter {
 
   private final ConditionalWriter cw;
-  private final ListeningExecutorService les;
+  private final ExecutorService es;
   private final Limit semaphore;
 
 
@@ -50,55 +46,38 @@ public class AsyncConditionalWriter
         FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
     int permits = 
env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
         FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
-    this.les =
-        
MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, 
"asyncCW"));
+    this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw");
     // the conditional writer currently has not memory limits... give it too 
much and it blows out
     // memory.. need to fix this in conditional writer
     // for now this needs to be memory based
     this.semaphore = new Limit(permits);
   }
 
-  private class IterTask implements Callable<Iterator<Result>> {
-
-    private Iterator<Result> input;
-    private int permitsAcquired;
-
-    public IterTask(Iterator<Result> iter, int permitsAcquired) {
-      this.input = iter;
-      this.permitsAcquired = permitsAcquired;
+  public CompletableFuture<Iterator<Result>> 
apply(Collection<ConditionalMutation> input) {
+    if (input.size() == 0) {
+      return 
CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator());
     }
 
-    @Override
-    public Iterator<Result> call() throws Exception {
+    semaphore.acquire(input.size());
+    Iterator<Result> iter = cw.write(input.iterator());
+    return CompletableFuture.supplyAsync(() -> {
       try {
         Builder<Result> imlb = ImmutableList.builder();
-        while (input.hasNext()) {
-          Result result = input.next();
+        while (iter.hasNext()) {
+          Result result = iter.next();
           imlb.add(result);
         }
         return imlb.build().iterator();
       } finally {
-        semaphore.release(permitsAcquired);
+        semaphore.release(input.size());
       }
-    }
-
-  }
-
-  @Override
-  public ListenableFuture<Iterator<Result>> 
apply(Collection<ConditionalMutation> input) {
-    if (input.size() == 0) {
-      return 
Futures.immediateFuture(Collections.<Result>emptyList().iterator());
-    }
-
-    semaphore.acquire(input.size());
-    Iterator<Result> iter = cw.write(input.iterator());
-    return les.submit(new IterTask(iter, input.size()));
+    }, es);
   }
 
   public void close() {
-    les.shutdownNow();
+    es.shutdownNow();
     try {
-      les.awaitTermination(5, TimeUnit.SECONDS);
+      es.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
index 2b53cbf..d87e9a7 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
@@ -21,12 +21,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.data.Mutation;
@@ -42,13 +41,15 @@ public class SharedBatchWriter {
 
   private AtomicLong asyncBatchesAdded = new AtomicLong(0);
   private long asyncBatchesProcessed = 0;
+  // added to avoid findbugs false positive
+  private static final Supplier<Void> NULLS = () -> null;
 
   private static class MutationBatch {
 
     private Collection<Mutation> mutations;
     private CountDownLatch cdl;
     private boolean isAsync = false;
-    private ListenableFutureTask<Void> lf;
+    private CompletableFuture<Void> cf;
 
     public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
       this.mutations = mutations;
@@ -58,9 +59,9 @@ public class SharedBatchWriter {
       }
     }
 
-    public MutationBatch(Collection<Mutation> mutations, 
ListenableFutureTask<Void> lf) {
+    public MutationBatch(Collection<Mutation> mutations, 
CompletableFuture<Void> cf) {
       this.mutations = mutations;
-      this.lf = lf;
+      this.cf = cf;
       this.cdl = null;
       this.isAsync = false;
     }
@@ -70,8 +71,8 @@ public class SharedBatchWriter {
         cdl.countDown();
       }
 
-      if (lf != null) {
-        lf.run();
+      if (cf != null) {
+        cf.complete(NULLS.get());
       }
     }
   }
@@ -170,27 +171,22 @@ public class SharedBatchWriter {
     }
   }
 
-  private static final Runnable DO_NOTHING = new Runnable() {
-    @Override
-    public void run() {}
-  };
-
-  ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
+  CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
     if (ml.size() == 0) {
-      return Futures.immediateFuture(null);
+      return CompletableFuture.completedFuture(NULLS.get());
     }
 
-    ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, 
null);
+    CompletableFuture<Void> cf = new CompletableFuture<>();
     try {
-      MutationBatch mb = new MutationBatch(ml, lf);
+      MutationBatch mb = new MutationBatch(ml, cf);
       mutQueue.put(mb);
-      return lf;
+      return cf;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
+  CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
     return writeMutationsAsyncFuture(Collections.singleton(m));
   }
 
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 0abdafe..94e4107 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -26,15 +26,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
@@ -818,33 +816,12 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     return startTs;
   }
 
-  // async experiment
-
-  private abstract static class CommitCallback<V> implements FutureCallback<V> 
{
-
-    private CommitData cd;
-
-    CommitCallback(CommitData cd) {
-      this.cd = cd;
-    }
-
-    @Override
-    public void onSuccess(V result) {
-      try {
-        onSuccess(cd, result);
-      } catch (Exception e) {
-        cd.commitObserver.failed(e);
-      }
-    }
-
-    protected abstract void onSuccess(CommitData cd, V result) throws 
Exception;
-
-
-    @Override
-    public void onFailure(Throwable t) {
-      cd.commitObserver.failed(t);
-    }
-
+  /**
+   * Functional interface to provide next step of asynchronous commit on 
successful completion of
+   * the previous one
+   */
+  private static interface OnSuccessInterface<V> {
+    public void onSuccess(V result) throws Exception;
   }
 
   private abstract static class SynchronousCommitTask implements Runnable {
@@ -895,6 +872,24 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     return size;
   }
 
+  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
+      OnSuccessInterface<V> onSuccessInterface) {
+    cfuture.handleAsync((result, exception) -> {
+      if (exception != null) {
+        cd.commitObserver.failed(exception);
+        return null;
+      } else {
+        try {
+          onSuccessInterface.onSuccess(result);
+          return null;
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+          return null;
+        }
+      }
+    }, env.getSharedResources().getAsyncCommitExecutor());
+  }
+
   @Override
   public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
 
@@ -972,13 +967,8 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     final ConditionalMutation pcm =
         prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, 
isTriggerRow(cd.prow));
 
-    ListenableFuture<Iterator<Result>> future = 
cd.acw.apply(Collections.singletonList(pcm));
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> result) throws 
Exception {
-        postLockPrimary(cd, pcm, Iterators.getOnlyElement(result));
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    CompletableFuture<Iterator<Result>> cfuture = 
cd.acw.apply(Collections.singletonList(pcm));
+    addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, 
Iterators.getOnlyElement(result)));
   }
 
   private void postLockPrimary(final CommitData cd, final ConditionalMutation 
pcm, Result result)
@@ -1059,13 +1049,8 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
 
     cd.acceptedRows = new HashSet<>();
 
-    ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations);
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> results) throws 
Exception {
-        postLockOther(cd, results);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
+    addCallback(cfuture, cd, results -> postLockOther(cd, results));
   }
 
   private void postLockOther(final CommitData cd, Iterator<Result> results) 
throws Exception {
@@ -1092,13 +1077,8 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     } else if (stopAfterPreCommit) {
       cd.commitObserver.committed();
     } else {
-      ListenableFuture<Stamp> future = 
env.getSharedResources().getOracleClient().getStampAsync();
-      Futures.addCallback(future, new CommitCallback<Stamp>(cd) {
-        @Override
-        protected void onSuccess(CommitData cd, Stamp stamp) throws Exception {
-          beginSecondCommitPhase(cd, stamp);
-        }
-      }, env.getSharedResources().getAsyncCommitExecutor());
+      CompletableFuture<Stamp> cfuture = 
env.getSharedResources().getOracleClient().getStampAsync();
+      addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
     }
   }
 
@@ -1124,14 +1104,9 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
       mutations.add(m);
     }
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        rollbackPrimaryLock(cd);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
   }
 
   private void rollbackPrimaryLock(CommitData cd) throws Exception {
@@ -1143,14 +1118,10 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
         DelLockValue.encodeRollback(startTs, true, true));
     m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd,
+        result -> 
cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
   }
 
   private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws 
Exception {
@@ -1213,14 +1184,9 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
       }
     }
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        commmitPrimary(cd, commitTs);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
   }
 
   private void commmitPrimary(CommitData cd, final long commitTs) {
@@ -1237,15 +1203,10 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), 
isDelete(cd.pval),
         isReadLock(cd.pval), startTs, commitTs, observedColumns, 
delLockMutation);
 
-    ListenableFuture<Iterator<Result>> future =
+    CompletableFuture<Iterator<Result>> cfuture =
         cd.acw.apply(Collections.singletonList(delLockMutation));
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> result) throws 
Exception {
-        handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
-            Iterators.getOnlyElement(result));
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, 
commitTs, delLockMutation,
+        Iterators.getOnlyElement(result)));
   }
 
   private void handleUnkownStatsAfterPrimary(CommitData cd, final long 
commitTs,
@@ -1321,16 +1282,9 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
       mutations.add(m);
     }
 
-
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        finishCommit(cd, commitTs);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
-
+    addCallback(cfuture, cd, result -> finishCommit(cd, commitTs));
   }
 
   @VisibleForTesting
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java 
b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 012c4a8..9c11239 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -19,6 +19,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,8 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -70,15 +69,10 @@ public class OracleClient implements AutoCloseable {
 
   private Participant currentLeader;
 
-  private static final class TimeRequest implements Callable<Stamp> {
+  private static final class TimeRequest {
     CountDownLatch cdl = new CountDownLatch(1);
     AtomicReference<Stamp> stampRef = new AtomicReference<>();
-    ListenableFutureTask<Stamp> lf = null;
-
-    @Override
-    public Stamp call() throws Exception {
-      return stampRef.get();
-    }
+    CompletableFuture<Stamp> cf = null;
   }
 
   private class TimestampRetriever extends LeaderSelectorListenerAdapter
@@ -211,11 +205,12 @@ public class OracleClient implements AutoCloseable {
 
           for (int i = 0; i < request.size(); i++) {
             TimeRequest tr = request.get(i);
-            tr.stampRef.set(new Stamp(txStampsStart + i, gcStamp));
-            if (tr.lf == null) {
+            Stamp stampRes = new Stamp(txStampsStart + i, gcStamp);
+            tr.stampRef.set(stampRes);
+            if (tr.cf == null) {
               tr.cdl.countDown();
             } else {
-              tr.lf.run();
+              tr.cf.complete(stampRes);
             }
           }
         } catch (InterruptedException e) {
@@ -386,18 +381,18 @@ public class OracleClient implements AutoCloseable {
     return tr.stampRef.get();
   }
 
-  public ListenableFuture<Stamp> getStampAsync() {
+  public CompletableFuture<Stamp> getStampAsync() {
     checkClosed();
 
     TimeRequest tr = new TimeRequest();
-    ListenableFutureTask<Stamp> lf = ListenableFutureTask.create(tr);
-    tr.lf = lf;
+    CompletableFuture<Stamp> cf = new CompletableFuture<>();
+    tr.cf = cf;
     try {
       queue.put(tr);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    return lf;
+    return cf;
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
commits@fluo.apache.org.

Reply via email to