keith-turner commented on a change in pull request #1001: [WIP] - Issue 978
URL: https://github.com/apache/fluo/pull/1001#discussion_r164602206
##########
File path:
modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
##########
@@ -872,447 +826,688 @@ public int getSize() {
return size;
}
- private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
- OnSuccessInterface<V> onSuccessInterface) {
- cfuture.handleAsync((result, exception) -> {
- if (exception != null) {
- cd.commitObserver.failed(exception);
- return null;
- } else {
- try {
- onSuccessInterface.onSuccess(result);
- return null;
- } catch (Exception e) {
- cd.commitObserver.failed(e);
- return null;
- }
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
- }
+ abstract class CommitStep {
+ private CommitStep nextStep;
- @Override
- public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
+ // the boolean indicates if the operation was successful.
+ abstract CompletableFuture<Boolean> getMainOp(CommitData cd);
- checkIfOpen();
- status = TxStatus.COMMIT_STARTED;
- commitAttempted = true;
+ // create and run this op in the event that the main op was a failure
+ abstract CompletableFuture<Void> getFailureOp(CommitData cd);
- try {
- CommitData cd = createCommitData();
- beginCommitAsync(cd, commitCallback, null);
- } catch (Exception e) {
- e.printStackTrace();
- commitCallback.failed(e);
+ // set the next step to run if this step is successful
+ CommitStep andThen(CommitStep next) {
+ this.nextStep = next;
+ return next;
+ }
+
+
+ CompletableFuture<Void> compose(CommitData cd) {
+ return getMainOp(cd).thenComposeAsync(successful -> {
+ if (successful) {
+ if (nextStep != null) {
+ return nextStep.compose(cd);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ } else {
+ return getFailureOp(cd);
+ }
+ }, env.getSharedResources().getAsyncCommitExecutor());
}
+
}
- private void beginCommitAsync(CommitData cd, AsyncCommitObserver
commitCallback,
- RowColumn primary) {
+ abstract class ConditionalStep extends CommitStep {
- if (updates.size() == 0) {
- // TODO do async
- deleteWeakRow();
- commitCallback.committed();
- return;
+ public abstract Collection<ConditionalMutation> createMutations(CommitData
cd);
+
+ public abstract Iterator<Result> handleUnknown(CommitData cd,
Iterator<Result> results)
+ throws Exception;
+
+ public abstract boolean processResults(CommitData cd, Iterator<Result>
results)
+ throws Exception;
+
+ public AsyncConditionalWriter getACW(CommitData cd) {
+ return cd.acw;
}
- for (Map<Column, Bytes> cols : updates.values()) {
- stats.incrementEntriesSet(cols.size());
+ @Override
+ CompletableFuture<Boolean> getMainOp(CommitData cd) {
+ // TODO not sure threading is correct
+ Executor ace = env.getSharedResources().getAsyncCommitExecutor();
+ return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
+ // ugh icky that this is an iterator, forces copy to inspect.. could
refactor async CW to
+ // return collection
+ ArrayList<Result> resultsList = new ArrayList<>();
+ Iterators.addAll(resultsList, results);
+ boolean containsUknown = false;
+ for (Result result : resultsList) {
+ try {
+ containsUknown |= result.getStatus() == Status.UNKNOWN;
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }
+ if (containsUknown) {
+ // process unknown in sync executor
+ Executor se = env.getSharedResources().getSyncCommitExecutor();
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return handleUnknown(cd, resultsList.iterator());
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, se);
+ } else {
+ return CompletableFuture.completedFuture(resultsList.iterator());
+ }
+ }).thenApplyAsync(results -> {
+ try {
+ return processResults(cd, results);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, ace);
}
- Bytes primRow = null;
- Column primCol = null;
- if (primary != null) {
- primRow = primary.getRow();
- primCol = primary.getColumn();
- if (notification != null &&
!primary.equals(notification.getRowColumn())) {
- throw new IllegalArgumentException("Primary must be notification");
- }
- } else if (notification != null) {
- primRow = notification.getRow();
- primCol = notification.getColumn();
- } else {
+ }
+
+ class LockPrimaryStep extends ConditionalStep {
+
+ @Override
+ public Collection<ConditionalMutation> createMutations(CommitData cd) {
+ return Collections
+ .singleton(prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol,
isTriggerRow(cd.prow)));
+ }
+
+ @Override
+ public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result>
results)
+ throws Exception {
+
+ Result result = Iterators.getOnlyElement(results);
+ Status mutationStatus = result.getStatus();
+ // TODO convert this code to async
+ while (mutationStatus == Status.UNKNOWN) {
+ TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol,
startTs);
+
+ switch (txInfo.status) {
+ case LOCKED:
+ return Collections
+ .singleton(
+ new Result(Status.ACCEPTED, result.getMutation(),
result.getTabletServer()))
+ .iterator();
+ case ROLLED_BACK:
+ return Collections
+ .singleton(
+ new Result(Status.REJECTED, result.getMutation(),
result.getTabletServer()))
+ .iterator();
+ case UNKNOWN:
+ // TODO async
+ Result newResult = cd.cw.write(result.getMutation());
+ mutationStatus = newResult.getStatus();
+ if (mutationStatus != Status.UNKNOWN) {
+ return Collections.singleton(newResult).iterator();
+ }
+ // TODO handle case were data other tx has lock
+ break;
+ case COMMITTED:
+ default:
+ throw new IllegalStateException(
+ "unexpected tx state " + txInfo.status + " " + cd.prow + " " +
cd.pcol);
- outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet())
{
- for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
- if (!isReadLock(entry2.getValue())) {
- primRow = entry.getKey();
- primCol = entry2.getKey();
- break outer;
- }
}
}
- if (primRow == null) {
- // there are only read locks, so nothing to write
- deleteWeakRow();
- commitCallback.committed();
- return;
- }
+ // TODO
+ throw new IllegalStateException();
}
- // get a primary column
- cd.prow = primRow;
- Map<Column, Bytes> colSet = updates.get(cd.prow);
- cd.pcol = primCol;
- cd.pval = colSet.remove(primCol);
- if (colSet.size() == 0) {
- updates.remove(cd.prow);
+ @Override
+ public boolean processResults(CommitData cd, Iterator<Result> results)
throws Exception {
+ Result result = Iterators.getOnlyElement(results);
+ return result.getStatus() == Status.ACCEPTED;
}
- cd.commitObserver = commitCallback;
+ @Override
+ CompletableFuture<Void> getFailureOp(CommitData cd) {
+ // TODO can this be simplified by pushing some code to the superclass?
+ return CompletableFuture.supplyAsync(() -> {
+ final ConditionalMutation pcm =
Iterables.getOnlyElement(createMutations(cd));
+
+ cd.addPrimaryToRejected();
+ getStats().setRejected(cd.getRejected());
+ // TODO do async
+ try {
+ checkForOrphanedLocks(cd);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ if (checkForAckCollision(pcm)) {
+ cd.commitObserver.alreadyAcknowledged();
+ } else {
+ cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+ }
- // try to lock primary column
- final ConditionalMutation pcm =
- prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol,
isTriggerRow(cd.prow));
+ return null;
+ }, env.getSharedResources().getSyncCommitExecutor());
+ }
- CompletableFuture<Iterator<Result>> cfuture =
cd.acw.apply(Collections.singletonList(pcm));
- addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm,
Iterators.getOnlyElement(result)));
}
- private void postLockPrimary(final CommitData cd, final ConditionalMutation
pcm, Result result)
- throws Exception {
- final Status mutationStatus = result.getStatus();
+ class LockOtherStep extends ConditionalStep {
- if (mutationStatus == Status.ACCEPTED) {
- lockOtherColumns(cd);
- } else {
- env.getSharedResources().getSyncCommitExecutor().execute(new
SynchronousCommitTask(cd) {
- @Override
- protected void runCommitStep(CommitData cd) throws Exception {
- synchronousPostLockPrimary(cd, pcm, mutationStatus);
- }
- });
+ @Override
+ public AsyncConditionalWriter getACW(CommitData cd) {
+ return cd.bacw;
}
- }
- private void synchronousPostLockPrimary(CommitData cd, ConditionalMutation
pcm,
- Status mutationStatus) throws AccumuloException,
AccumuloSecurityException, Exception {
- // TODO convert this code to async
- while (mutationStatus == Status.UNKNOWN) {
- TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol,
startTs);
-
- switch (txInfo.status) {
- case LOCKED:
- mutationStatus = Status.ACCEPTED;
- break;
- case ROLLED_BACK:
- mutationStatus = Status.REJECTED;
- break;
- case UNKNOWN:
- // TODO async
- mutationStatus = cd.cw.write(pcm).getStatus();
- // TODO handle case were data other tx has lock
- break;
- case COMMITTED:
- default:
- throw new IllegalStateException(
- "unexpected tx state " + txInfo.status + " " + cd.prow + " " +
cd.pcol);
- }
- }
+ @Override
+ public Collection<ConditionalMutation> createMutations(CommitData cd) {
- if (mutationStatus != Status.ACCEPTED) {
- cd.addPrimaryToRejected();
- getStats().setRejected(cd.getRejected());
- // TODO do async
- checkForOrphanedLocks(cd);
- if (checkForAckCollision(pcm)) {
- cd.commitObserver.alreadyAcknowledged();
- } else {
- cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+ ArrayList<ConditionalMutation> mutations = new ArrayList<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+ ConditionalFlutation cm = null;
+
+ for (Entry<Column, Bytes> colUpdates :
rowUpdates.getValue().entrySet()) {
+ if (cm == null) {
+ cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(),
colUpdates.getValue(), cd.prow,
+ cd.pcol, false);
+ } else {
+ prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow,
cd.pcol, false);
+ }
+ }
+
+ mutations.add(cm);
}
- return;
- }
- lockOtherColumns(cd);
- }
+ cd.acceptedRows = new HashSet<>();
- private void lockOtherColumns(CommitData cd) {
- ArrayList<ConditionalMutation> mutations = new ArrayList<>();
+ return mutations;
+ }
- for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
- ConditionalFlutation cm = null;
+ @Override
+ public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result>
results) {
+ // TODO this step does not currently handle unknown
+ return results;
+ }
- for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet())
{
- if (cm == null) {
- cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(),
colUpdates.getValue(), cd.prow,
- cd.pcol, false);
+ @Override
+ public boolean processResults(CommitData cd, Iterator<Result> results)
throws Exception {
+
+ while (results.hasNext()) {
+ Result result = results.next();
+ // TODO handle unknown?
+ Bytes row = Bytes.of(result.getMutation().getRow());
+ if (result.getStatus() == Status.ACCEPTED) {
+ cd.acceptedRows.add(row);
} else {
- prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow,
cd.pcol, false);
+ cd.addToRejected(row, updates.get(row).keySet());
}
}
- mutations.add(cm);
+ return cd.getRejected().size() == 0;
}
- cd.acceptedRows = new HashSet<>();
-
- CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
- addCallback(cfuture, cd, results -> postLockOther(cd, results));
+ @Override
+ CompletableFuture<Void> getFailureOp(CommitData cd) {
+ return CompletableFuture.supplyAsync(() -> {
+ getStats().setRejected(cd.getRejected());
+ try {
+ // Does this need to be async?
+ checkForOrphanedLocks(cd);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ return null;
+ }, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v ->
rollbackLocks(cd));
+ }
}
- private void postLockOther(final CommitData cd, Iterator<Result> results)
throws Exception {
- while (results.hasNext()) {
- Result result = results.next();
- // TODO handle unknown?
- Bytes row = Bytes.of(result.getMutation().getRow());
- if (result.getStatus() == Status.ACCEPTED) {
- cd.acceptedRows.add(row);
- } else {
- cd.addToRejected(row, updates.get(row).keySet());
- }
+ abstract class BatchWriterStep extends CommitStep {
+ public abstract Collection<Mutation> createMutations(CommitData cd);
+
+ @Override
+ CompletableFuture<Boolean> getMainOp(CommitData cd) {
+ return env.getSharedResources().getBatchWriter()
+ .writeMutationsAsyncFuture(createMutations(cd)).thenApply(v -> true);
}
- if (cd.getRejected().size() > 0) {
- getStats().setRejected(cd.getRejected());
- env.getSharedResources().getSyncCommitExecutor().execute(new
SynchronousCommitTask(cd) {
- @Override
- protected void runCommitStep(CommitData cd) throws Exception {
- checkForOrphanedLocks(cd);
- rollbackOtherLocks(cd);
- }
- });
- } else if (stopAfterPreCommit) {
- cd.commitObserver.committed();
- } else {
- CompletableFuture<Stamp> cfuture =
env.getSharedResources().getOracleClient().getStampAsync();
- addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
+ @Override
+ CompletableFuture<Void> getFailureOp(CommitData cd) {
+ throw new IllegalStateException("Failure not expected");
}
}
- private void rollbackOtherLocks(CommitData cd) throws Exception {
- // roll back locks
- // TODO let rollback be done lazily? this makes GC more difficult
- Flutation m;
+ private CompletableFuture<Void> rollbackLocks(CommitData cd) {
+ CommitStep firstStep = new RollbackOtherLocks();
+ firstStep.andThen(new RollbackPrimaryLock());
- ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
- for (Bytes row : cd.acceptedRows) {
- m = new Flutation(env, row);
- for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
- if (isReadLock(entry.getValue())) {
- m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX |
ReadLockUtil.encodeTs(startTs, true),
- DelReadLockValue.encodeRollback());
- } else {
- m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
- DelLockValue.encodeRollback(false, true));
+ return firstStep.compose(cd)
+ .thenRun(() ->
cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
+
+ }
+
+
+ class RollbackOtherLocks extends BatchWriterStep {
+
+ @Override
+ public Collection<Mutation> createMutations(CommitData cd) {
+ // roll back locks
+
+ // TODO let rollback be done lazily? this makes GC more difficult
+
+ Flutation m;
+
+ ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
+ for (Bytes row : cd.acceptedRows) {
+ m = new Flutation(env, row);
+ for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
+ if (isReadLock(entry.getValue())) {
+ m.put(entry.getKey(),
+ ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs,
true),
+ DelReadLockValue.encodeRollback());
+ } else {
+ m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ DelLockValue.encodeRollback(false, true));
+ }
}
+ mutations.add(m);
}
- mutations.add(m);
+
+ return mutations;
}
+ }
+
+ class RollbackPrimaryLock extends BatchWriterStep {
+
+ @Override
+ public Collection<Mutation> createMutations(CommitData cd) {
+ // mark transaction as complete for garbage collection purposes
+ Flutation m = new Flutation(env, cd.prow);
+
+ m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+ DelLockValue.encodeRollback(startTs, true, true));
+ m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
- CompletableFuture<Void> cfuture =
-
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
- addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
+ return Collections.singletonList(m);
+ }
}
- private void rollbackPrimaryLock(CommitData cd) throws Exception {
+ @VisibleForTesting
+ public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
- // mark transaction as complete for garbage collection purposes
- Flutation m = new Flutation(env, cd.prow);
+ SyncCommitObserver sco = new SyncCommitObserver();
+ cd.commitObserver = sco;
+ try {
+ CommitStep firstStep = new GetCommitStampStepTest(commitStamp);
- m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
- DelLockValue.encodeRollback(startTs, true, true));
- m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+ firstStep.andThen(new WriteNotificationsStep()).andThen(new
CommitPrimaryStep());
- CompletableFuture<Void> cfuture =
- env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
- addCallback(cfuture, cd,
- result ->
cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
+ firstStep.compose(cd).thenRun(() -> cd.commitObserver.committed())
Review comment:
@jkosh44 I tracked down why the test are failing. Its because `() ->
cd.commitObserver.committed()` is always run. This code is run even when
`CommitPrimaryStep()` fails and calls `commitFailed()` on `SyncCommitObserver`.
So its a timing issue, will `() -> cd.commitObserver.committed()` or code in
`sco.waitForCommit()` run first.
One way to resolve this would be to create simple commit step that calls
`cd.commitObserver.committed()`. Since its a commit step, it will only run if
`CommitPrimaryStep()` is successful.
I tracked this down by adding code to `SyncCommitObserver` to print stack
traces whenever any method on it is called. Need to open a new issue to ensure
only one method can be called on `SyncCommitObserver`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services