keith-turner commented on a change in pull request #1001: [WIP] - Issue 978
URL: https://github.com/apache/fluo/pull/1001#discussion_r162492873
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
 ##########
 @@ -872,447 +846,694 @@ public int getSize() {
     return size;
   }
 
-  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
-      OnSuccessInterface<V> onSuccessInterface) {
-    cfuture.handleAsync((result, exception) -> {
-      if (exception != null) {
-        cd.commitObserver.failed(exception);
+  // TODO exception handling!!!! How?????
+  abstract class CommitStep {
+    private CommitStep nextStep;
+
+    // the boolean indicates if the operation was successful.
+    abstract CompletableFuture<Boolean> getMainOp(CommitData cd);
+
+    // create and run this op in the event that the main op was a failure
+    abstract CompletableFuture<Void> getFailureOp(CommitData cd);
+
+    // set the next step to run if this step is successful
+    CommitStep andThen(CommitStep next) {
+      this.nextStep = next;
+      return next;
+    }
+
+
+    CompletableFuture<Void> compose(CommitData cd) {
+      try {
+        return getMainOp(cd).thenComposeAsync(successful -> {
+          if (successful) {
+            if (nextStep != null) {
+              return nextStep.compose(cd);
+            } else {
+              return CompletableFuture.completedFuture(null);
+            }
+          } else {
+            return getFailureOp(cd);
+          }
+        }, env.getSharedResources().getAsyncCommitExecutor());
+      } catch (Exception e) {
+        cd.commitObserver.failed(e);
         return null;
-      } else {
+      }
+    }
+
+  }
+
+  abstract class ConditionalStep extends CommitStep {
+
+    CommitData cd;
+
+    public abstract Collection<ConditionalMutation> createMutations(CommitData cd);
+
+    public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception;
+
+    public abstract boolean processResults(CommitData cd, Iterator<Result> results)
+        throws Exception;
+
+    public AsyncConditionalWriter getACW(CommitData cd) {
+      return cd.acw;
+    }
+
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO not sure threading is correct
+      Executor ace = env.getSharedResources().getAsyncCommitExecutor();
+      return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
+        // ugh icky that this is an iterator, forces copy to inspect.. could refactor async CW to
+        // return collection
+        ArrayList<Result> resultsList = new ArrayList<>();
+        Iterators.addAll(resultsList, results);
+        boolean containsUknown = false;
+        for (Result result : resultsList) {
+          try {
+            containsUknown |= result.getStatus() == Status.UNKNOWN;
+          } catch (Exception e) {
+            cd.commitObserver.failed(e);
+          }
+        }
+        if (containsUknown) {
+          // process unknown in sync executor
+          Executor se = env.getSharedResources().getSyncCommitExecutor();
+          return CompletableFuture.supplyAsync(() -> {
+            try {
+              return handleUnknown(cd, resultsList.iterator());
+            } catch (Exception e) {
+              cd.commitObserver.failed(e);
 
 Review comment:
   I have been trying to wrap my head around exception handling with
   completable futures. I did some local testing with something like the
   following.
   
   ```java
     CompletableFuture<Foo> cf;
     cf.thenCompose(f1).thenApply(f2).thenApply(f3).exceptionally(f4);
   ```
   
   I found that if `f1`, `f2`, or `f3` throws an exception, `f4` is always
   called. For example, if `f1` throws, then `f2` and `f3` are skipped and
   `f4` is called. I also found that Java has a `CompletionException` class.
   Given this, I think every operation should catch checked exceptions and
   rethrow them wrapped in a `CompletionException`, and `exceptionally` should
   be called once, at the end of the top-level completable future. Below is a
   commit where I was experimenting with these ideas; I made a few changes to
   illustrate what I am thinking.
   
   
https://github.com/keith-turner/fluo/commit/41ed54d05b1e8124b3e9232bc58ea988742cdb3c
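
For reference, the behavior described above can be reproduced with a small standalone sketch along these lines. It is not the code from the linked commit: `ExceptionChainDemo`, `riskyOp`, and `run` are hypothetical names made up for illustration, and `riskyOp` only stands in for an operation that throws a checked exception. The lambdas play the roles of `f1` through `f4`, and the `calls` list records which of them actually ran.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; none of these names come from Fluo.
class ExceptionChainDemo {

  // Stand-in for any operation that throws a checked exception.
  static String riskyOp(String in) throws IOException {
    throw new IOException("simulated failure for " + in);
  }

  // Builds cf.thenCompose(f1).thenApply(f2).thenApply(f3).exceptionally(f4)
  // and appends the name of each stage that runs to 'calls'.
  static String run(List<String> calls) {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("start");
    return cf.thenCompose(s -> {
      calls.add("f1");
      try {
        return CompletableFuture.completedFuture(riskyOp(s));
      } catch (IOException e) {
        // A Function lambda cannot throw a checked exception, so wrap it.
        throw new CompletionException(e);
      }
    }).thenApply(s -> {
      calls.add("f2"); // skipped when f1 fails
      return s + "-f2";
    }).thenApply(s -> {
      calls.add("f3"); // skipped when f1 fails
      return s + "-f3";
    }).exceptionally(t -> {
      calls.add("f4");
      // Unwrap the CompletionException to get at the original cause.
      Throwable cause = (t instanceof CompletionException && t.getCause() != null)
          ? t.getCause() : t;
      return "recovered: " + cause.getMessage();
    }).join();
  }

  public static void main(String[] args) {
    List<String> calls = new ArrayList<>();
    System.out.println(run(calls) + " " + calls);
  }
}
```

Because the single `exceptionally` sits at the end of the chain, it sees a failure from any earlier stage, which is why putting it on the top-level future seems sufficient.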
