This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/fluo.git
commit 73ca708c27c4b55991b5c930a0678fa236c2983b
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Dec 15 18:25:16 2017 -0500

    Experimenting with high level design for #978 (#1001)
---
 .../org/apache/fluo/core/impl/TransactionImpl.java | 327 ++++++++++++++++++++-
 1 file changed, 320 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 94e4107..2d9e2cc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -27,10 +27,12 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -890,6 +892,313 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
     }, env.getSharedResources().getAsyncCommitExecutor());
   }
 
+  // TODO exception handling!!!! How?????
+ abstract class CommitStep { + private CommitStep nextStep; + + // the boolean indicates if the operation was successful. + abstract CompletableFuture<Boolean> getMainOp(CommitData cd); + + // create and run this op in the event that the main op was a failure + abstract CompletableFuture<Void> getFailureOp(CommitData cd); + + // set the next step to run if this step is successful + CommitStep andThen(CommitStep next) { + this.nextStep = next; + return next; + } + + + CompletableFuture<Void> compose(CommitData cd) { + return getMainOp(cd).thenComposeAsync(successful -> { + if (successful) { + if (nextStep != null) { + return nextStep.compose(cd); + } else { + return CompletableFuture.completedFuture(null); + } + } else { + return getFailureOp(cd); + } + }, env.getSharedResources().getAsyncCommitExecutor()); + } + + } + + abstract class ConditionalStep extends CommitStep { + + CommitData cd; + + public abstract Collection<ConditionalMutation> createMutations(CommitData cd); + + public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results); + + public abstract boolean processResults(CommitData cd, Iterator<Result> results); + + public AsyncConditionalWriter getACW(CommitData cd) { + return cd.acw; + } + + @Override + CompletableFuture<Boolean> getMainOp(CommitData cd) { + // TODO not sure threading is correct + // TODO handle unknown + Executor ace = env.getSharedResources().getAsyncCommitExecutor(); + return getACW(cd).apply(createMutations(cd)).thenCompose(results -> { + // ugh icky that this is an iterator, forces copy to inspect.. 
could refactor async CW to + // return collection + ArrayList<Result> resultsList = new ArrayList<>(); + Iterators.addAll(resultsList, results); + boolean containsUknown = false; + for (Result result : resultsList) { + containsUknown |= result.getStatus() == Status.UNKNOWN; + } + + if (containsUknown) { + // process unknown in sync executor + Executor se = env.getSharedResources().getSyncCommitExecutor(); + return CompletableFuture.supplyAsync(() -> handleUnknown(cd, resultsList.iterator()), se); + } else { + return CompletableFuture.completedFuture(resultsList.iterator()); + } + }).thenApplyAsync(results -> processResults(cd, results), ace); + } + + + } + + class LockPrimaryStep extends ConditionalStep { + + @Override + public Collection<ConditionalMutation> createMutations(CommitData cd) { + return Collections + .singleton(prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow))); + } + + @Override + public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) { + + Result result = Iterators.getOnlyElement(results); + Status mutationStatus = result.getStatus(); + // TODO convert this code to async + while (mutationStatus == Status.UNKNOWN) { + TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs); + + switch (txInfo.status) { + case LOCKED: + return Collections.singleton(new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer())).iterator(); + case ROLLED_BACK: + return Collections.singleton(new Result(Status.REJECTED, result.getMutation(), result.getTabletServer())).iterator(); + case UNKNOWN: + // TODO async + Result newResult = cd.cw.write(result.getMutation()); + mutationStatus = newResult.getStatus(); + if(mutationStatus != Status.UNKNOWN) { + return Collections.singleton(newResult).iterator(); + } + // TODO handle case were data other tx has lock + break; + case COMMITTED: + default: + throw new IllegalStateException( + "unexpected tx state " + txInfo.status + " " + cd.prow + " " + 
cd.pcol); + + } + } + + //TODO + throw new IllegalStateException(); + } + + @Override + public boolean processResults(CommitData cd, Iterator<Result> results) { + Result result = Iterators.getOnlyElement(results); + return result.getStatus() == Status.ACCEPTED; + } + + @Override + CompletableFuture<Void> getFailureOp(CommitData cd) { + //TODO can this be simplified by pushing some code to the superclass? + return CompletableFuture.supplyAsync(() -> { + ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd)); + + cd.addPrimaryToRejected(); + getStats().setRejected(cd.getRejected()); + // TODO do async + checkForOrphanedLocks(cd); + if (checkForAckCollision(pcm)) { + cd.commitObserver.alreadyAcknowledged(); + } else { + cd.commitObserver.commitFailed(cd.getShortCollisionMessage()); + } + + return null; + }, env.getSharedResources().getSyncCommitExecutor()); + } + + } + + class LockOtherStep extends ConditionalStep { + + @Override + public Collection<ConditionalMutation> createMutations(CommitData cd) { + + ArrayList<ConditionalMutation> mutations = new ArrayList<>(); + + for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) { + ConditionalFlutation cm = null; + + for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) { + if (cm == null) { + cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(), cd.prow, + cd.pcol, false); + } else { + prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false); + } + } + + mutations.add(cm); + } + + cd.acceptedRows = new HashSet<>(); + + return mutations; + } + + @Override + public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results) { + // TODO this step does not currently handle unknown + return results; + } + + @Override + public boolean processResults(CommitData cd, Iterator<Result> results) { + + while (results.hasNext()) { + Result result = results.next(); + // TODO handle unknown? 
+ Bytes row = Bytes.of(result.getMutation().getRow()); + if (result.getStatus() == Status.ACCEPTED) { + cd.acceptedRows.add(row); + } else { + cd.addToRejected(row, updates.get(row).keySet()); + } + } + + return cd.getRejected().size() == 0; + } + + @Override + CompletableFuture<Void> getFailureOp(CommitData cd) { + return CompletableFuture.supplyAsync(() -> { + getStats().setRejected(cd.getRejected()); + checkForOrphanedLocks(cd); + return null; + }, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> rollbackLocks(cd)); + + + } + + } + + abstract class BatchWriterStep extends CommitStep { + public abstract Collection<Mutation> createMutations(CommitData cd); + + @Override + CompletableFuture<Boolean> getMainOp(CommitData cd) { + return env.getSharedResources().getBatchWriter() + .writeMutationsAsyncFuture(createMutations(cd)).thenApply(v -> true); + } + + @Override + CompletableFuture<Void> getFailureOp(CommitData cd) { + throw new IllegalStateException("Failure not expected"); + } + } + + + + private CompletableFuture<Void> rollbackLocks(CommitData cd) { + // TODO + return null; + } + + class GetCommitStampStep extends CommitStep { + + @Override + CompletableFuture<Boolean> getMainOp(CommitData cd) { + // TODO Auto-generated method stub + // TODO set commitTs on commit data + return null; + } + + @Override + CompletableFuture<Void> getFailureOp(CommitData cd) { + // TODO Auto-generated method stub + return null; + } + + } + + class WriteNotificationsStep extends BatchWriterStep { + + @Override + public Collection<Mutation> createMutations(CommitData cd) { + HashMap<Bytes, Mutation> mutations = new HashMap<>(); + // TODO copy code from writeNotificationsAsync() + return mutations.values(); + } + + } + + class CommitPrimaryStep extends ConditionalStep { + + @Override + public Collection<ConditionalMutation> createMutations(CommitData cd) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Iterator<Result> 
handleUnknown(CommitData cd, Iterator<Result> results) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean processResults(CommitData cd, Iterator<Result> results) { + // TODO Auto-generated method stub + return false; + } + + @Override + CompletableFuture<Void> getFailureOp(CommitData cd) { + // TODO Auto-generated method stub + return null; + } + + } + + class DeleteLocksStep extends BatchWriterStep { + + @Override + public Collection<Mutation> createMutations(CommitData cd) { + // TODO Auto-generated method stub + return null; + } + + } + + class FinishCommitStep extends BatchWriterStep { + + @Override + public Collection<Mutation> createMutations(CommitData cd) { + // TODO Auto-generated method stub + return null; + } + + } + @Override public synchronized void commitAsync(AsyncCommitObserver commitCallback) { @@ -963,12 +1272,16 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra cd.commitObserver = commitCallback; - // try to lock primary column - final ConditionalMutation pcm = - prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow)); + CommitStep firstStep = new LockPrimaryStep(); + + firstStep.andThen(new LockOtherStep()) + .andThen(new GetCommitStampStep()) + .andThen(new WriteNotificationsStep()) + .andThen(new CommitPrimaryStep()) + .andThen(new DeleteLocksStep()) + .andThen(new FinishCommitStep()); - CompletableFuture<Iterator<Result>> cfuture = cd.acw.apply(Collections.singletonList(pcm)); - addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, Iterators.getOnlyElement(result))); + firstStep.compose(cd); } private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result) -- To stop receiving notification emails like this one, please contact ktur...@apache.org.