belliottsmith commented on code in PR #3943: URL: https://github.com/apache/cassandra/pull/3943#discussion_r1978889257
########## src/java/org/apache/cassandra/service/accord/AccordService.java: ########## @@ -540,133 +508,234 @@ public IVerbHandler<? extends Reply> responseHandler() return responseHandler; } - public DurabilityScheduling.ImmutableView durabilityScheduling() + public ShardDurability.ImmutableView shardDurability() { - return node.durabilityScheduling().immutableView(); + return node.durability().shards().immutableView(); } - private Seekables<?, ?> barrier(@Nonnull Seekables<?, ?> keysOrRanges, long epoch, Dispatcher.RequestTime requestTime, long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, FullRoute<?>, AsyncSyncPoint> syncPoint) + @Override + public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + return node.durability().sync(requestedBy, minBound, ranges, include, syncLocal, syncRemote); + } + + @Override + public AsyncChain<Void> sync(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) + { + if (keys.size() != 1) + return syncInternal(minBound, keys, syncLocal, syncRemote); + + return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote) + .flatMap(found -> KeyBarriers.await(node, found, syncLocal, syncRemote)) + .flatMap(success -> { + if (success) + return null; + return syncInternal(minBound, keys, syncLocal, syncRemote); + }); + } + + private AsyncChain<Void> syncInternal(Timestamp minBound, Keys keys, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote) { - Stopwatch sw = Stopwatch.createStarted(); - keysOrRanges = intersectionWithAccordManagedRanges(keysOrRanges); - // It's possible none of them were Accord managed and we aren't going to treat that as an error - if (keysOrRanges.isEmpty()) + TxnId txnId = node.nextTxnId(minBound, Write, Key, cardinality(keys)); + FullRoute<?> route = node.computeRoute(txnId, keys); + Txn txn = new Txn.InMemory(Write, keys, TxnRead.createNoOpRead(keys), TxnQuery.NONE, TxnUpdate.empty()); + return CoordinateTransaction.coordinate(node, route, txnId, txn) + .map(ignore -> (Void)null).beginAsResult(); + } + + @Override + public AsyncChain<Timestamp> maxConflict(Ranges ranges) + { + return node.commandStores().any().build(() -> CoordinateMaxConflict.maxConflict(node, ranges)).flatMap(i -> i); + } + + public static <V> V getBlocking(AsyncChain<V> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + return getBlocking(async, null, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + } + + public static <V> V getBlocking(AsyncChain<V> async, @Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline, boolean isTxnRequest) + { + AccordResult<V> result = new AccordResult<>(txnId, keysOrRanges, bookkeeping, startedAt, deadline, isTxnRequest); + async.begin(result); + return result.awaitAndGet(); + } + + public static void getBlocking(AsyncChain<Void> async, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAt, long deadline) + { + getBlocking(async, keysOrRanges, bookkeeping, startedAt, deadline, false); + } + + public static class AccordResult<V> extends AsyncFuture<V> implements BiConsumer<V, Throwable>, IAccordService.IAccordResult<V> + { + final @Nullable TxnId txnId; + final Seekables<?, ?> keysOrRanges; + final RequestBookkeeping bookkeeping; + final long startedAtNanos, deadlineAtNanos; + final boolean isTxnRequest; + + public AccordResult(@Nullable TxnId txnId, Seekables<?, ?> keysOrRanges, RequestBookkeeping bookkeeping, long startedAtNanos, long deadlineAtNanos, boolean isTxnRequest) { - logger.info("Skipping barrier because there are no ranges managed by Accord"); - return keysOrRanges; + this.txnId = txnId; + this.keysOrRanges = keysOrRanges; + this.bookkeeping = bookkeeping; + this.startedAtNanos = startedAtNanos; + this.deadlineAtNanos = deadlineAtNanos; + this.isTxnRequest = isTxnRequest; } - FullRoute<?> route = node.computeRoute(epoch, keysOrRanges); - AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics : accordReadMetrics; - try + @Override + public V awaitAndGet() throws RequestExecutionException { - logger.debug("Starting barrier key: {} epoch: {} barrierType: {} isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite); - AsyncResult<TxnId> asyncResult = syncPoint == null - ? Barrier.barrier(node, keysOrRanges, route, epoch, barrierType) - : Barrier.barrier(node, keysOrRanges, route, epoch, barrierType, syncPoint); - long deadlineNanos = requestTime.startedAtNanos() + timeoutNanos; - TxnId txnId = AsyncChains.getBlocking(asyncResult, deadlineNanos - nanoTime(), NANOSECONDS); - if (keysOrRanges.domain() == Key) + try { - PartitionKey key = (PartitionKey)keysOrRanges.get(0); - maybeSaveAccordKeyMigrationLocally(key, Epoch.create(txnId.epoch())); + if (!awaitUntil(deadlineAtNanos)) + accept(null, new Timeout(txnId, null)); } - logger.debug("Completed barrier attempt in {}ms, {}ms since attempts start, barrier key: {} epoch: {} barrierType: {} isForWrite {}", - sw.elapsed(MILLISECONDS), - NANOSECONDS.toMillis(nanoTime() - requestTime.startedAtNanos()), - keysOrRanges, epoch, barrierType, isForWrite); - return keysOrRanges; - } - catch (ExecutionException e) - { - Throwable cause = Throwables.getRootCause(e); - if (cause instanceof Timeout) + catch (InterruptedException e) { - TxnId txnId = ((Timeout) cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - metrics.timeouts.mark(); - throw newBarrierTimeout(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + accept(null, e); } - if (cause instanceof Preempted) + + Throwable fail = fail(); + if (fail != null) + throw (RequestExecutionException) fail; + return success(); + } + + @Override + public void accept(V success, Throwable fail) + { + if (fail != null) { - TxnId txnId = ((Preempted) cause).txnId(); - ((AccordAgent) node.agent()).onFailedBarrier(txnId, keysOrRanges, cause); - //TODO need to improve - // Coordinator "could" query the accord state to see whats going on but that doesn't exist yet. - // Protocol also doesn't have a way to denote "unknown" outcome, so using a timeout as the closest match - throw newBarrierPreempted(((CoordinationFailed)cause).txnId(), barrierType, isForWrite, keysOrRanges); + RequestExecutionException report; + CoordinationFailed coordinationFailed = findCoordinationFailed(fail); + TxnId txnId = this.txnId; + if (coordinationFailed != null) + { + if (txnId == null && coordinationFailed.txnId() != null) + txnId = coordinationFailed.txnId(); + + if (coordinationFailed instanceof Timeout) + { + report = bookkeeping.newTimeout(txnId, keysOrRanges); + } + else if (coordinationFailed instanceof Preempted) + { + report = bookkeeping.newPreempted(txnId, keysOrRanges); + } + else if (isTxnRequest && coordinationFailed instanceof TopologyMismatch) + { + // Excluding bugs topology mismatch can occur because a table was dropped in between creating the txn + // and executing it. + // It could also race with the table stopping/starting being managed by Accord. + // The caller can retry if the table indeed exists and is managed by Accord. + Set<TableId> txnDroppedTables = txnDroppedTables(keysOrRanges); + Tracing.trace("Accord returned topology mismatch: " + coordinationFailed.getMessage()); + logger.debug("Accord returned topology mismatch", coordinationFailed); + bookkeeping.markTopologyMismatch(); + // Throw IRE in case the caller fails to check if the table still exists + if (!txnDroppedTables.isEmpty()) + { + Tracing.trace("Accord txn uses dropped tables {}", txnDroppedTables); + logger.debug("Accord txn uses dropped tables {}", txnDroppedTables); + throw new InvalidRequestException("Accord transaction uses dropped tables"); + } + trySuccess((V)RetryWithNewProtocolResult.instance); + return; + } + else + { + // this case happens when a non-timeout exception is seen, and we are unable to move forward + if (coordinationFailed instanceof Exhausted) Review Comment: fair point! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org