belliottsmith commented on code in PR #257:
URL: https://github.com/apache/cassandra-accord/pull/257#discussion_r2505515279
##########
accord-core/src/main/java/accord/local/durability/DurabilityQueue.java:
##########
@@ -73,325 +86,702 @@ public class DurabilityQueue
private static final Logger logger =
LoggerFactory.getLogger(DurabilityQueue.class);
private static final ConcurrentSkipListMap<Long, Collection<Node.Id>>
WARNINGS_LOGGED = new ConcurrentSkipListMap<>();
private static final long EXHAUSTED_LOG_INTERVAL_MINUTES = 5;
- private static final int PRUNE_SIZE_THRESHOLD = 128;
+ private static final long SCHEDULING_SLACK_MICROS =
MILLISECONDS.toMicros(100);
- private final Node node;
- private int maxConcurrency = 16;
+ public interface Adapter
+ {
+ Topology currentTopology();
- private final ObjectHashSet<DurabilityResults> inProgress = new
ObjectHashSet<>();
- private final TreeMap<RoutingKey, RoutingKey> inProgressRanges = new
TreeMap<>();
- // TODO (desired): prioritise by least recently updated range
- private final Deque<Pending> pending = new ArrayDeque<>();
- private int pendingCounter, prunedAt;
+ long retryDelay(int attempts, TimeUnit units);
+ long elapsed(TimeUnit units);
+ Scheduler scheduler();
- static class Pending
- {
- final SyncPoint<Range> syncPoint;
- final @Nullable DurabilityRequest request;
- final int attempt;
+ void unregister(DurabilityRequest request);
+ void abandon(DurabilityRequest request, PartialSyncPoint syncPoint,
boolean pruned);
+ void done(DurabilityRequest request, PartialSyncPoint syncPoint);
+ void retry(DurabilityRequest request, PartialSyncPoint syncPoint);
- Pending(SyncPoint<Range> syncPoint, @Nullable DurabilityRequest
request, int attempt)
- {
- this.syncPoint = syncPoint;
- this.request = request;
- this.attempt = attempt;
- }
+ DurabilityResults execute(PartialSyncPoint syncPoint, int attempt);
}
- public DurabilityQueue(Node node)
+ public static class NodeAdapter implements Adapter
{
- this.node = node;
- }
+ final Node node;
+ public NodeAdapter(Node node) { this.node = node; }
- synchronized void submit(SyncPoint<Range> syncPoint, @Nullable
DurabilityRequest request)
- {
- if (request != null)
- request.register(syncPoint.syncId, node.elapsed(MICROSECONDS));
+ @Override public Topology currentTopology() { return
node.topology().current(); }
+ @Override public long retryDelay(int attempts, TimeUnit units) {
return node.agent().retryDurabilityDelay(node, attempts, units); }
+ @Override public long elapsed(TimeUnit units) { return
node.elapsed(units); }
+ @Override public Scheduler scheduler() { return node.scheduler(); }
+ @Override public void unregister(DurabilityRequest request) {
node.durability().unregister(request); }
- submit(syncPoint, request, 1);
- }
+ @Override public void abandon(DurabilityRequest request,
PartialSyncPoint syncPoint, boolean pruned) {}
+ @Override public void done(DurabilityRequest request, PartialSyncPoint
syncPoint) {}
- private synchronized void submit(SyncPoint<Range> syncPoint, @Nullable
DurabilityRequest request, int attempt)
- {
- SequentialAsyncExecutor executor = node.someSequentialExecutor();
- if (executor != null && inProgress.size() < maxConcurrency &&
notInProgress(syncPoint.route))
- {
- start(syncPoint, request, attempt, executor);
- }
- else
+ @Override
+ public void retry(DurabilityRequest request, PartialSyncPoint
syncPoint)
{
- ++pendingCounter;
- pending.add(new Pending(syncPoint, request, attempt));
- if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter >
prunedAt)
- prune();
+ node.durability().shards().request(request,
request.stillWaiting(syncPoint.route));
}
- }
- private boolean notInProgress(Route<Range> route)
- {
- for (Range range : route)
+ @Override
+ public DurabilityResults execute(PartialSyncPoint syncPoint, int
attempt)
{
- Map.Entry<RoutingKey, RoutingKey> e =
inProgressRanges.floorEntry(range.start());
- if (e != null && e.getValue().compareTo(range.start()) > 0)
- return false;
+ return coordinateIncluding(node, syncPoint,
node.someSequentialExecutor(), attempt);
}
- return true;
}
- private void registerInProgress(SyncPoint<Range> syncPoint,
ExecuteSyncPoint.DurabilityResults submitted)
- {
- inProgress.add(submitted);
- for (Range range : syncPoint.route)
- inProgressRanges.put(range.start(), range.end());
- }
+ private static final PendingComparators BY_RANGE = new
PendingComparators();
+ private static final Comparator<Pending> BY_PRIORITY = (a, b) -> {
+ if ((a.status == ACTIVE) != (b.status == ACTIVE))
+ return a.status == ACTIVE ? -1 : 1;
+ if (a.startAt != b.startAt)
+ return a.startAt < b.startAt ? -1 : 1;
+ return a.syncPoint.syncId.compareTo(b.syncPoint.syncId);
+ };
+
+ private static final Comparator<Pending> BY_ID = (a, b) -> {
+ return a.syncPoint.syncId.compareTo(b.syncPoint.syncId);
+ };
- private void unregisterInProgress(SyncPoint<Range> syncPoint,
ExecuteSyncPoint.DurabilityResults submitted)
+ private static class PendingComparators implements
IntervalBTree.IntervalComparators<PendingRange>
{
- inProgress.remove(submitted);
- for (Range range : syncPoint.route)
+ @Override public Comparator<PendingRange> totalOrder()
{
- RoutingKey end = inProgressRanges.remove(range.start());
- Invariants.require(range.end().equals(end), "Expected exact range
to be in progress, but found different end (%s vs %s)", range.end(), end);
+ return (a, b) -> {
+ int c = a.range.compare(b.range);
+ if (c == 0) c =
a.pending.syncPoint.syncId.compareTo(b.pending.syncPoint.syncId);
+ return c;
+ };
}
+ @Override public Comparator<PendingRange> endWithEndSorter() { return
(a, b) -> a.range.end().compareTo(b.range.end()); }
+ @Override public SymmetricComparator<PendingRange>
startWithStartSeeker() { return (a, b) ->
startWithStart(a.range.start().compareTo(b.range.start())); }
+ @Override public SymmetricComparator<PendingRange>
startWithEndSeeker() { return (a, b) ->
startWithEnd(a.range.start().compareTo(b.range.end())); }
+ @Override public SymmetricComparator<PendingRange>
endWithStartSeeker() { return (a, b) ->
endWithStart(a.range.end().compareTo(b.range.start())); }
}
- private static class SortForPruning implements Comparable<SortForPruning>
+ static class PendingRange
{
final Range range;
- final SyncPoint<Range> syncPoint;
+ final Pending pending;
- SortForPruning(Range range, SyncPoint<Range> syncPoint)
+ PendingRange(Range range, Pending pending)
{
this.range = range;
- this.syncPoint = syncPoint;
+ this.pending = pending;
}
@Override
- public int compareTo(@Nonnull SortForPruning that)
+ public String toString()
{
- int c = this.range.start().compareTo(that.range.start());
- if (c == 0) c = -this.range.end().compareTo(that.range.end());
- if (c == 0) return
this.syncPoint.syncId.compareTo(that.syncPoint.syncId);
- return c;
+ return pending + "@" + range;
}
}
- private static class OverlapsForPruning
+ enum Status { QUEUED, ACTIVE, COMPLETING, RESTARTING, ABANDONED, DONE }
+
+ static class Pending extends IntrusivePriorityHeap.Node
{
- final SyncPoint<Range> syncPoint;
- final NavigableMap<TxnId, SyncPoint<Range>> overlaps = new TreeMap<>();
+ final @Nullable DurabilityRequest request;
+ PartialSyncPoint syncPoint;
+ Object[] byRange;
+
+ long startAt;
+ int attempt = 1;
+ Status status = QUEUED;
+
+ DurabilityResult achieved;
+ List<Pending> deferred;
+ Pending parent;
- private OverlapsForPruning(SyncPoint<Range> syncPoint)
+ Pending(SyncPoint syncPoint, @Nullable DurabilityRequest request, long
startAt)
{
this.syncPoint = syncPoint;
+ this.request = request;
+ this.startAt = startAt;
+ updateByRange();
}
- void add(SyncPoint<Range> overlap)
+ void updateByRange()
{
- overlaps.put(overlap.syncId, overlap);
+ List<PendingRange> pendingRanges = new
ArrayList<>(syncPoint.route.size());
+ for (Range range : syncPoint.route)
+ pendingRanges.add(new PendingRange(range, this));
+ this.byRange = IntervalBTree.build(pendingRanges, BY_RANGE);
}
- }
- private synchronized void prune()
- {
- prunedAt = pending.size();
- pendingCounter = 0;
- List<SortForPruning> sorted = new ArrayList<>();
- for (Pending p : pending)
+ @Override
+ protected boolean isInHeap()
{
- for (Range range : p.syncPoint.route)
- sorted.add(new SortForPruning(range, p.syncPoint));
+ return super.isInHeap();
}
- sorted.sort(SortForPruning::compareTo);
- Map<TxnId, OverlapsForPruning> overlaps = new TreeMap<>();
- int i = 0;
- while (i < sorted.size())
+
+ boolean isAncestor(Pending pending)
{
- SortForPruning entry = sorted.get(i);
- for (int j = i + 1; j < sorted.size() ; ++j)
- {
- SortForPruning next = sorted.get(j);
- if (next.range.start().compareTo(entry.range.end()) >= 0)
- break;
+ Pending p = parent;
+ while (p != null && p != pending)
+ p = p.parent;
+ return p != null;
+ }
- overlaps.computeIfAbsent(next.syncPoint.syncId, ignore -> new
OverlapsForPruning(next.syncPoint)).add(entry.syncPoint);
- overlaps.computeIfAbsent(entry.syncPoint.syncId, ignore -> new
OverlapsForPruning(entry.syncPoint)).add(next.syncPoint);
- }
- ++i;
+ @Override
+ public String toString()
+ {
+ return syncPoint.syncId.toString() + ":" + status;
}
- Set<TxnId> remove = new HashSet<>();
- for (OverlapsForPruning e : overlaps.values())
+ private void addDeferred(Pending add)
{
- SyncPoint<Range> syncPoint = e.syncPoint;
- Ranges supersedes =
e.overlaps.tailMap(syncPoint.syncId).values().stream().map(s ->
s.route.toRanges()).reduce(Ranges.EMPTY, Ranges::with);
- Ranges superseding =
e.overlaps.headMap(syncPoint.syncId).values().stream().map(s ->
s.route.toRanges()).reduce(Ranges.EMPTY, Ranges::with);
- if (superseding.containsAll(syncPoint.route) &&
supersedes.containsAll(syncPoint.route))
- remove.add(syncPoint.syncId);
+ Invariants.require(add.parent == null);
+ Invariants.require(status.compareTo(ACTIVE) <= 0);
+ Invariants.require(BY_PRIORITY.compare(this, add) < 0);
+ Invariants.require(deferred == null || !deferred.contains(add));
+ Invariants.require(!isAncestor(add));
+ if (deferred == null)
+ deferred = new ArrayList<>();
+ deferred.add(add);
+ add.parent = this;
}
+ private void removeDeferred(Pending remove)
+ {
+ Invariants.require(remove.status == QUEUED);
+ Invariants.require(remove.parent == this);
+ remove.parent = null;
+ boolean removed = deferred.remove(remove);
+ Invariants.require(removed);
+ if (deferred.isEmpty())
+ deferred = null;
+ }
- logger.info("Pruned {} sync points awaiting durability to {}",
pending.size(), pending.size() - remove.size());
- if (!remove.isEmpty())
+ private void unsetParent()
{
- List<Pending> newPending = new ArrayList<>(pending.size());
- for (Pending p : pending)
- {
- if (!remove.contains(p.syncPoint.syncId))
- newPending.add(p);
- }
- pending.clear();
- pending.addAll(newPending);
+ Invariants.require(parent.deferred == null);
+ parent = null;
}
}
- private void start(SyncPoint<Range> exclusiveSyncPoint, @Nullable
DurabilityRequest request, int attempt, SequentialAsyncExecutor executor)
+ // TODO (desired): prioritise by least recently updated range
+ static final class PendingQueue extends IntrusivePriorityHeap<Pending>
{
- logger.debug("{}: Awaiting durability for {}",
exclusiveSyncPoint.syncId, exclusiveSyncPoint.route.toRanges());
- DurabilityResults coordinate = coordinateIncluding(node,
exclusiveSyncPoint, executor, attempt);
- registerInProgress(exclusiveSyncPoint, coordinate);
+ final Comparator<Pending> comparator;
+
+ PendingQueue(Comparator<Pending> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ Pending poll() { heapify(); return super.pollNode(); }
+ Pending peek() { heapify(); return super.peekNode(); }
+ @Override public int compare(Pending o1, Pending o2) { return
comparator.compare(o1, o2); }
+ @Override protected void append(Pending node) { super.append(node); }
+ @Override protected void remove(Pending node) { super.remove(node); }
+ @Override protected void clear() { super.clear(); }
+ }
+
+ private final Adapter adapter;
+ private int maxConcurrency = 16;
+ private int overlapPruneThreshold = 8;
+ private int globalPruneThreshold = 256;
+
+ int activeCount;
+ int pendingCount;
+ private final PendingQueue queued = new
PendingQueue(Comparator.comparing(p -> p.startAt));
+ private Object[] pendingByRange = IntervalBTree.empty();
+ private final ArrayDeque<Pending> requeue = new ArrayDeque<>();
+
+ private long processingQueueAt = Long.MAX_VALUE;
+ private Cancellable cancelProcessingQueue;
+
+ public DurabilityQueue(Node node)
+ {
+ this.adapter = new NodeAdapter(node);
+ }
+
+ public DurabilityQueue(Adapter adapter)
+ {
+ this.adapter = adapter;
+ }
+
+ void submit(SyncPoint syncPoint, @Nullable DurabilityRequest request)
+ {
+ long nowMicros = adapter.elapsed(MICROSECONDS);
if (request != null)
- request.reportAttempt(exclusiveSyncPoint.syncId,
node.elapsed(MICROSECONDS));
+ request.register(syncPoint.syncId, nowMicros);
- coordinate.onQuorum().invoke((success, fail) -> {
- synchronized (this)
- {
- unregisterInProgress(exclusiveSyncPoint, coordinate);
- maybeSubmitPending();
- }
- });
- coordinate.onDone().invoke((success, fail) -> {
- TxnId txnId = exclusiveSyncPoint.syncId;
- Ranges ranges = exclusiveSyncPoint.route.toRanges();
+ Pending pending = new Pending(syncPoint, request, nowMicros);
+ synchronized (this)
+ {
+ register(pending);
+ enqueue(pending);
+ }
+ }
- String requestor = null;
- boolean isDone;
- if (request == null)
+ private void enqueue(Pending enqueue)
+ {
+ while (enqueue != null)
+ {
+ Invariants.require(enqueue.status == QUEUED);
+ Invariants.require(enqueue.parent == null);
+ if (enqueue.achieved != null && enqueue.request == null &&
enqueue.achieved.min.remote.compareTo(Quorum) >= 0 && fullySuperseded(enqueue))
{
- isDone = success != null && success.achievedRemote == All;
+ abandon(enqueue, true, false);
+ logger.debug("{}: already achieved quorum durability and there
are superseding durability requests pending; not retrying",
enqueue.syncPoint.syncId);
+ enqueue = requeue.poll();
+ continue;
}
- else
+
+ Conflicts conflicts = conflicts(enqueue);
+ Pending next = conflicts.next;
+ if (next != null && next != enqueue)
{
- requestor = " requested by " + request.requestedBy;
- isDone = request.isDone(ranges);
- if (!isDone && request.including != null)
+ if (BY_PRIORITY.compare(next, enqueue) <= 0)
{
- Topology topology = node.topology().current();
- SortedArrayList<Node.Id> removed =
topology.removedIds().intersecting(request.including);
- SortedArrayList<Node.Id> hardRemoved =
topology.hardRemovedIds().intersecting(request.including);
- if (!removed.isEmpty() || !hardRemoved.isEmpty())
+ next.addDeferred(enqueue);
+ enqueue = requeue.poll();
+ continue;
+ }
+
+ Invariants.require(next.status == QUEUED);
+ List<Pending> addDeferred = next.deferred;
+ next.deferred = null;
+ if (addDeferred != null)
+ {
+ setParent(enqueue.deferred, enqueue);
+ if (!enqueue.syncPoint.route.containsAll((AbstractRanges)
next.syncPoint.route))
+ {
+ int count = 0;
+ for (int i = 0 ; i < addDeferred.size() ; ++i)
+ {
+ Pending p = addDeferred.get(i);
+ if (p.syncPoint.route.intersects((AbstractRanges)
enqueue.syncPoint.route))
+ addDeferred.set(count++, p);
+ else
+ requeue(p);
+ }
+
+ if (count == 0) addDeferred = null;
+ else truncate(addDeferred, count);
+ }
+ if (addDeferred != null)
{
- String message = String.format("%s: Cannot achieve
durability requested by %s as (%s/%s) are (removed/hard removed)",
exclusiveSyncPoint.syncId, request, removed, hardRemoved);
- logger.info(message);
- node.durability().unregister(request);
- // TODO (desired): more specific exception?
- request.result.tryFailure(new
RuntimeException(message));
- return;
+ if (enqueue.deferred == null) enqueue.deferred =
addDeferred;
+ else enqueue.deferred.addAll(addDeferred);
}
}
+ if (next.parent != null)
+ next.parent.removeDeferred(next);
+ enqueue.addDeferred(next);
+ if (next.isInHeap())
+ queued.remove(next);
}
- if (fail != null)
+
+ queued.append(enqueue);
+ maybePrune(conflicts.supersedes);
+ enqueue = requeue.poll();
+ }
+
+ processQueue();
+ }
+
+ private void maybePrune(List<Pending> superseded)
+ {
+ if (!shouldPrune(superseded))
+ return;
+
+ // first remove any that have already achieved quorum and have no
associated request, or where the request has been finished
+ for (Pending p : superseded)
+ {
+ if (p.status != QUEUED) continue;
+ if (p.request != null && !p.request.isDone()) continue;
+ if (p.request == null && (p.achieved == null ||
p.achieved.min.remote.compareTo(Quorum) < 0)) continue;
+
+ if (p.parent != null)
+ p.parent.removeDeferred(p);
+ abandon(p, true, true);
+ return;
Review Comment:
The assumption is that if we are adding only one item, we should only
exceed the overlap threshold by one. In reality this isn't necessarily true,
given how we define overlaps, so we could keep going if you like.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]