This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git

commit 73ca708c27c4b55991b5c930a0678fa236c2983b
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Dec 15 18:25:16 2017 -0500

    Experimenting with high level design for #978 (#1001)
---
 .../org/apache/fluo/core/impl/TransactionImpl.java | 327 ++++++++++++++++++++-
 1 file changed, 320 insertions(+), 7 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java 
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 94e4107..2d9e2cc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -27,10 +27,12 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -890,6 +892,313 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
     }, env.getSharedResources().getAsyncCommitExecutor());
   }
 
+  // TODO exception handling!!!!  How?????
+  abstract class CommitStep {
+    private CommitStep nextStep;
+
+    // the boolean indicates if the operation was successful.
+    abstract CompletableFuture<Boolean> getMainOp(CommitData cd);
+
+    // create and run this op in the event that the main op was a failure
+    abstract CompletableFuture<Void> getFailureOp(CommitData cd);
+
+    // set the next step to run if this step is successful
+    CommitStep andThen(CommitStep next) {
+      this.nextStep = next;
+      return next;
+    }
+
+
+    CompletableFuture<Void> compose(CommitData cd) {
+      return getMainOp(cd).thenComposeAsync(successful -> {
+        if (successful) {
+          if (nextStep != null) {
+            return nextStep.compose(cd);
+          } else {
+            return CompletableFuture.completedFuture(null);
+          }
+        } else {
+          return getFailureOp(cd);
+        }
+      }, env.getSharedResources().getAsyncCommitExecutor());
+    }
+
+  }
+
+  abstract class ConditionalStep extends CommitStep {
+
+    CommitData cd;
+
+    public abstract Collection<ConditionalMutation> createMutations(CommitData 
cd);
+
+    public abstract Iterator<Result> handleUnknown(CommitData cd, 
Iterator<Result> results);
+
+    public abstract boolean processResults(CommitData cd, Iterator<Result> 
results);
+
+    public AsyncConditionalWriter getACW(CommitData cd) {
+      return cd.acw;
+    }
+
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO not sure threading is correct
+      // TODO handle unknown
+      Executor ace = env.getSharedResources().getAsyncCommitExecutor();
+      return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
+        // ugh icky that this is an iterator, forces copy to inspect.. could 
refactor async CW to
+        // return collection
+        ArrayList<Result> resultsList = new ArrayList<>();
+        Iterators.addAll(resultsList, results);
+        boolean containsUknown = false;
+        for (Result result : resultsList) {
+          containsUknown |= result.getStatus() == Status.UNKNOWN;
+        }
+
+        if (containsUknown) {
+          // process unknown in sync executor
+          Executor se = env.getSharedResources().getSyncCommitExecutor();
+          return CompletableFuture.supplyAsync(() -> handleUnknown(cd, 
resultsList.iterator()), se);
+        } else {
+          return CompletableFuture.completedFuture(resultsList.iterator());
+        }
+      }).thenApplyAsync(results -> processResults(cd, results), ace);
+    }
+
+
+  }
+
+  class LockPrimaryStep extends ConditionalStep {
+
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
+      return Collections
+          .singleton(prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, 
isTriggerRow(cd.prow)));
+    }
+
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> 
results) {
+
+      Result result = Iterators.getOnlyElement(results);
+      Status mutationStatus = result.getStatus();
+      // TODO convert this code to async
+      while (mutationStatus == Status.UNKNOWN) {
+        TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, 
startTs);
+
+        switch (txInfo.status) {
+          case LOCKED:
+            return Collections.singleton(new Result(Status.ACCEPTED, 
result.getMutation(), result.getTabletServer())).iterator();
+          case ROLLED_BACK:
+            return Collections.singleton(new Result(Status.REJECTED, 
result.getMutation(), result.getTabletServer())).iterator();
+          case UNKNOWN:
+            // TODO async
+            Result newResult = cd.cw.write(result.getMutation());
+            mutationStatus = newResult.getStatus();
+            if(mutationStatus != Status.UNKNOWN) {
+              return Collections.singleton(newResult).iterator();
+            }
+            // TODO handle case were data other tx has lock
+            break;
+          case COMMITTED:
+          default:
+            throw new IllegalStateException(
+                "unexpected tx state " + txInfo.status + " " + cd.prow + " " + 
cd.pcol);
+
+        }
+      }
+
+      //TODO
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) {
+      Result result = Iterators.getOnlyElement(results);
+      return result.getStatus() == Status.ACCEPTED;
+    }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      //TODO can this be simplified by pushing some code to the superclass?
+      return CompletableFuture.supplyAsync(() -> {
+        ConditionalMutation pcm = 
Iterables.getOnlyElement(createMutations(cd));
+
+        cd.addPrimaryToRejected();
+        getStats().setRejected(cd.getRejected());
+        // TODO do async
+        checkForOrphanedLocks(cd);
+        if (checkForAckCollision(pcm)) {
+          cd.commitObserver.alreadyAcknowledged();
+        } else {
+          cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+        }
+
+        return null;
+      }, env.getSharedResources().getSyncCommitExecutor());
+    }
+
+  }
+
+  class LockOtherStep extends ConditionalStep {
+
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
+
+      ArrayList<ConditionalMutation> mutations = new ArrayList<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+        ConditionalFlutation cm = null;
+
+        for (Entry<Column, Bytes> colUpdates : 
rowUpdates.getValue().entrySet()) {
+          if (cm == null) {
+            cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), 
colUpdates.getValue(), cd.prow,
+                cd.pcol, false);
+          } else {
+            prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, 
cd.pcol, false);
+          }
+        }
+
+        mutations.add(cm);
+      }
+
+      cd.acceptedRows = new HashSet<>();
+
+      return mutations;
+    }
+
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> 
results) {
+      // TODO this step does not currently handle unknown
+      return results;
+    }
+
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) {
+
+      while (results.hasNext()) {
+        Result result = results.next();
+        // TODO handle unknown?
+        Bytes row = Bytes.of(result.getMutation().getRow());
+        if (result.getStatus() == Status.ACCEPTED) {
+          cd.acceptedRows.add(row);
+        } else {
+          cd.addToRejected(row, updates.get(row).keySet());
+        }
+      }
+
+      return cd.getRejected().size() == 0;
+    }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      return CompletableFuture.supplyAsync(() -> {
+        getStats().setRejected(cd.getRejected());
+        checkForOrphanedLocks(cd);
+        return null;
+      }, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> 
rollbackLocks(cd));
+
+
+    }
+
+  }
+
+  abstract class BatchWriterStep extends CommitStep {
+    public abstract Collection<Mutation> createMutations(CommitData cd);
+
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      return env.getSharedResources().getBatchWriter()
+          .writeMutationsAsyncFuture(createMutations(cd)).thenApply(v -> true);
+    }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      throw new IllegalStateException("Failure not expected");
+    }
+  }
+
+
+
+  private CompletableFuture<Void> rollbackLocks(CommitData cd) {
+    // TODO
+    return null;
+  }
+
+  class GetCommitStampStep extends CommitStep {
+
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO Auto-generated method stub
+      // TODO set commitTs on commit data
+      return null;
+    }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+  }
+
+  class WriteNotificationsStep extends BatchWriterStep {
+
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      HashMap<Bytes, Mutation> mutations = new HashMap<>();
+      // TODO copy code from writeNotificationsAsync()
+      return mutations.values();
+    }
+
+  }
+
+  class CommitPrimaryStep extends ConditionalStep {
+
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> 
results) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+  }
+
+  class DeleteLocksStep extends BatchWriterStep {
+
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+  }
+
+  class FinishCommitStep extends BatchWriterStep {
+
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+  }
+
   @Override
   public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
 
@@ -963,12 +1272,16 @@ public class TransactionImpl extends 
AbstractTransactionBase implements AsyncTra
 
     cd.commitObserver = commitCallback;
 
-    // try to lock primary column
-    final ConditionalMutation pcm =
-        prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, 
isTriggerRow(cd.prow));
+    CommitStep firstStep = new LockPrimaryStep();
+
+    firstStep.andThen(new LockOtherStep())
+        .andThen(new GetCommitStampStep())
+        .andThen(new WriteNotificationsStep())
+        .andThen(new CommitPrimaryStep())
+        .andThen(new DeleteLocksStep())
+        .andThen(new FinishCommitStep());
 
-    CompletableFuture<Iterator<Result>> cfuture = 
cd.acw.apply(Collections.singletonList(pcm));
-    addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, 
Iterators.getOnlyElement(result)));
+    firstStep.compose(cd);
   }
 
   private void postLockPrimary(final CommitData cd, final ConditionalMutation 
pcm, Result result)

-- 
To stop receiving notification emails like this one, please contact
ktur...@apache.org.

Reply via email to