belliottsmith commented on code in PR #257:
URL: https://github.com/apache/cassandra-accord/pull/257#discussion_r2505507677


##########
accord-core/src/main/java/accord/local/durability/DurabilityQueue.java:
##########
@@ -73,325 +86,702 @@ public class DurabilityQueue
     private static final Logger logger = 
LoggerFactory.getLogger(DurabilityQueue.class);
     private static final ConcurrentSkipListMap<Long, Collection<Node.Id>> 
WARNINGS_LOGGED = new ConcurrentSkipListMap<>();
     private static final long EXHAUSTED_LOG_INTERVAL_MINUTES = 5;
-    private static final int PRUNE_SIZE_THRESHOLD = 128;
+    private static final long SCHEDULING_SLACK_MICROS = 
MILLISECONDS.toMicros(100);
 
-    private final Node node;
-    private int maxConcurrency = 16;
+    public interface Adapter
+    {
+        Topology currentTopology();
 
-    private final ObjectHashSet<DurabilityResults> inProgress = new 
ObjectHashSet<>();
-    private final TreeMap<RoutingKey, RoutingKey> inProgressRanges = new 
TreeMap<>();
-    // TODO (desired): prioritise by least recently updated range
-    private final Deque<Pending> pending = new ArrayDeque<>();
-    private int pendingCounter, prunedAt;
+        long retryDelay(int attempts, TimeUnit units);
+        long elapsed(TimeUnit units);
+        Scheduler scheduler();
 
-    static class Pending
-    {
-        final SyncPoint<Range> syncPoint;
-        final @Nullable DurabilityRequest request;
-        final int attempt;
+        void unregister(DurabilityRequest request);
+        void abandon(DurabilityRequest request, PartialSyncPoint syncPoint, 
boolean pruned);
+        void done(DurabilityRequest request, PartialSyncPoint syncPoint);
+        void retry(DurabilityRequest request, PartialSyncPoint syncPoint);
 
-        Pending(SyncPoint<Range> syncPoint, @Nullable DurabilityRequest 
request, int attempt)
-        {
-            this.syncPoint = syncPoint;
-            this.request = request;
-            this.attempt = attempt;
-        }
+        DurabilityResults execute(PartialSyncPoint syncPoint, int attempt);
     }
 
-    public DurabilityQueue(Node node)
+    public static class NodeAdapter implements Adapter
     {
-        this.node = node;
-    }
+        final Node node;
+        public NodeAdapter(Node node) { this.node = node; }
 
-    synchronized void submit(SyncPoint<Range> syncPoint, @Nullable 
DurabilityRequest request)
-    {
-        if (request != null)
-            request.register(syncPoint.syncId, node.elapsed(MICROSECONDS));
+        @Override public Topology currentTopology() { return 
node.topology().current(); }
+        @Override public long retryDelay(int attempts, TimeUnit units) { 
return node.agent().retryDurabilityDelay(node, attempts, units); }
+        @Override public long elapsed(TimeUnit units) { return 
node.elapsed(units); }
+        @Override public Scheduler scheduler() { return node.scheduler(); }
+        @Override public void unregister(DurabilityRequest request) { 
node.durability().unregister(request); }
 
-        submit(syncPoint, request, 1);
-    }
+        @Override public void abandon(DurabilityRequest request, 
PartialSyncPoint syncPoint, boolean pruned) {}
+        @Override public void done(DurabilityRequest request, PartialSyncPoint 
syncPoint) {}
 
-    private synchronized void submit(SyncPoint<Range> syncPoint, @Nullable 
DurabilityRequest request, int attempt)
-    {
-        SequentialAsyncExecutor executor = node.someSequentialExecutor();
-        if (executor != null && inProgress.size() < maxConcurrency && 
notInProgress(syncPoint.route))
-        {
-            start(syncPoint, request, attempt, executor);
-        }
-        else
+        @Override
+        public void retry(DurabilityRequest request, PartialSyncPoint 
syncPoint)
         {
-            ++pendingCounter;
-            pending.add(new Pending(syncPoint, request, attempt));
-            if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter > 
prunedAt)
-                prune();
+            node.durability().shards().request(request, 
request.stillWaiting(syncPoint.route));
         }
-    }
 
-    private boolean notInProgress(Route<Range> route)
-    {
-        for (Range range : route)
+        @Override
+        public DurabilityResults execute(PartialSyncPoint syncPoint, int 
attempt)
         {
-            Map.Entry<RoutingKey, RoutingKey> e = 
inProgressRanges.floorEntry(range.start());
-            if (e != null && e.getValue().compareTo(range.start()) > 0)
-                return false;
+            return coordinateIncluding(node, syncPoint, 
node.someSequentialExecutor(), attempt);
         }
-        return true;
     }
 
-    private void registerInProgress(SyncPoint<Range> syncPoint, 
ExecuteSyncPoint.DurabilityResults submitted)
-    {
-        inProgress.add(submitted);
-        for (Range range : syncPoint.route)
-            inProgressRanges.put(range.start(), range.end());
-    }
+    private static final PendingComparators BY_RANGE = new 
PendingComparators();
+    private static final Comparator<Pending> BY_PRIORITY = (a, b) -> {
+        if ((a.status == ACTIVE) != (b.status == ACTIVE))
+            return a.status == ACTIVE ? -1 : 1;
+        if (a.startAt != b.startAt)
+            return a.startAt < b.startAt ? -1 : 1;
+        return a.syncPoint.syncId.compareTo(b.syncPoint.syncId);
+    };
+
+    private static final Comparator<Pending> BY_ID = (a, b) -> {
+        return a.syncPoint.syncId.compareTo(b.syncPoint.syncId);
+    };
 
-    private void unregisterInProgress(SyncPoint<Range> syncPoint, 
ExecuteSyncPoint.DurabilityResults submitted)
+    private static class PendingComparators implements 
IntervalBTree.IntervalComparators<PendingRange>
     {
-        inProgress.remove(submitted);
-        for (Range range : syncPoint.route)
+        @Override public Comparator<PendingRange> totalOrder()
         {
-            RoutingKey end = inProgressRanges.remove(range.start());
-            Invariants.require(range.end().equals(end), "Expected exact range 
to be in progress, but found different end (%s vs %s)", range.end(), end);
+            return (a, b) -> {
+                int c = a.range.compare(b.range);
+                if (c == 0) c = 
a.pending.syncPoint.syncId.compareTo(b.pending.syncPoint.syncId);
+                return c;
+            };
         }
+        @Override public Comparator<PendingRange> endWithEndSorter() { return 
(a, b) -> a.range.end().compareTo(b.range.end()); }
+        @Override public SymmetricComparator<PendingRange> 
startWithStartSeeker() { return (a, b) -> 
startWithStart(a.range.start().compareTo(b.range.start())); }
+        @Override public SymmetricComparator<PendingRange> 
startWithEndSeeker() { return (a, b) -> 
startWithEnd(a.range.start().compareTo(b.range.end())); }
+        @Override public SymmetricComparator<PendingRange> 
endWithStartSeeker() { return (a, b) -> 
endWithStart(a.range.end().compareTo(b.range.start())); }
     }
 
-    private static class SortForPruning implements Comparable<SortForPruning>
+    static class PendingRange
     {
         final Range range;
-        final SyncPoint<Range> syncPoint;
+        final Pending pending;
 
-        SortForPruning(Range range, SyncPoint<Range> syncPoint)
+        PendingRange(Range range, Pending pending)
         {
             this.range = range;
-            this.syncPoint = syncPoint;
+            this.pending = pending;
         }
 
         @Override
-        public int compareTo(@Nonnull SortForPruning that)
+        public String toString()
         {
-            int c = this.range.start().compareTo(that.range.start());
-            if (c == 0) c = -this.range.end().compareTo(that.range.end());
-            if (c == 0) return 
this.syncPoint.syncId.compareTo(that.syncPoint.syncId);
-            return c;
+            return pending + "@" + range;
         }
     }
 
-    private static class OverlapsForPruning
+    enum Status { QUEUED, ACTIVE, COMPLETING, RESTARTING, ABANDONED, DONE }
+
+    static class Pending extends IntrusivePriorityHeap.Node
     {
-        final SyncPoint<Range> syncPoint;
-        final NavigableMap<TxnId, SyncPoint<Range>> overlaps = new TreeMap<>();
+        final @Nullable DurabilityRequest request;
+        PartialSyncPoint syncPoint;
+        Object[] byRange;
+
+        long startAt;
+        int attempt = 1;
+        Status status = QUEUED;
+
+        DurabilityResult achieved;
+        List<Pending> deferred;
+        Pending parent;
 
-        private OverlapsForPruning(SyncPoint<Range> syncPoint)
+        Pending(SyncPoint syncPoint, @Nullable DurabilityRequest request, long 
startAt)
         {
             this.syncPoint = syncPoint;
+            this.request = request;
+            this.startAt = startAt;
+            updateByRange();
         }
 
-        void add(SyncPoint<Range> overlap)
+        void updateByRange()
         {
-            overlaps.put(overlap.syncId, overlap);
+            List<PendingRange> pendingRanges = new 
ArrayList<>(syncPoint.route.size());
+            for (Range range : syncPoint.route)
+                pendingRanges.add(new PendingRange(range, this));
+            this.byRange = IntervalBTree.build(pendingRanges, BY_RANGE);
         }
-    }
 
-    private synchronized void prune()
-    {
-        prunedAt = pending.size();
-        pendingCounter = 0;
-        List<SortForPruning> sorted = new ArrayList<>();
-        for (Pending p : pending)
+        @Override
+        protected boolean isInHeap()
         {
-            for (Range range : p.syncPoint.route)
-                sorted.add(new SortForPruning(range, p.syncPoint));
+            return super.isInHeap();
         }
-        sorted.sort(SortForPruning::compareTo);
-        Map<TxnId, OverlapsForPruning> overlaps = new TreeMap<>();
-        int i = 0;
-        while (i < sorted.size())
+
+        boolean isAncestor(Pending pending)
         {
-            SortForPruning entry = sorted.get(i);
-            for (int j = i + 1; j < sorted.size() ; ++j)
-            {
-                SortForPruning next = sorted.get(j);
-                if (next.range.start().compareTo(entry.range.end()) >= 0)
-                    break;
+            Pending p = parent;
+            while (p != null && p != pending)
+                p = p.parent;
+            return p != null;
+        }
 
-                overlaps.computeIfAbsent(next.syncPoint.syncId, ignore -> new 
OverlapsForPruning(next.syncPoint)).add(entry.syncPoint);
-                overlaps.computeIfAbsent(entry.syncPoint.syncId, ignore -> new 
OverlapsForPruning(entry.syncPoint)).add(next.syncPoint);
-            }
-            ++i;
+        @Override
+        public String toString()
+        {
+            return syncPoint.syncId.toString() + ":" + status;
         }
 
-        Set<TxnId> remove = new HashSet<>();
-        for (OverlapsForPruning e : overlaps.values())
+        private void addDeferred(Pending add)
         {
-            SyncPoint<Range> syncPoint = e.syncPoint;
-            Ranges supersedes = 
e.overlaps.tailMap(syncPoint.syncId).values().stream().map(s -> 
s.route.toRanges()).reduce(Ranges.EMPTY, Ranges::with);
-            Ranges superseding = 
e.overlaps.headMap(syncPoint.syncId).values().stream().map(s -> 
s.route.toRanges()).reduce(Ranges.EMPTY, Ranges::with);
-            if (superseding.containsAll(syncPoint.route) && 
supersedes.containsAll(syncPoint.route))
-                remove.add(syncPoint.syncId);
+            Invariants.require(add.parent == null);
+            Invariants.require(status.compareTo(ACTIVE) <= 0);
+            Invariants.require(BY_PRIORITY.compare(this, add) < 0);
+            Invariants.require(deferred == null || !deferred.contains(add));
+            Invariants.require(!isAncestor(add));
+            if (deferred == null)
+                deferred = new ArrayList<>();
+            deferred.add(add);
+            add.parent = this;
         }
 
+        private void removeDeferred(Pending remove)
+        {
+            Invariants.require(remove.status == QUEUED);
+            Invariants.require(remove.parent == this);
+            remove.parent = null;
+            boolean removed = deferred.remove(remove);
+            Invariants.require(removed);
+            if (deferred.isEmpty())
+                deferred = null;
+        }
 
-        logger.info("Pruned {} sync points awaiting durability to {}", 
pending.size(), pending.size() - remove.size());
-        if (!remove.isEmpty())
+        private void unsetParent()
         {
-            List<Pending> newPending = new ArrayList<>(pending.size());
-            for (Pending p : pending)
-            {
-                if (!remove.contains(p.syncPoint.syncId))
-                    newPending.add(p);
-            }
-            pending.clear();
-            pending.addAll(newPending);
+            Invariants.require(parent.deferred == null);
+            parent = null;
         }
     }
 
-    private void start(SyncPoint<Range> exclusiveSyncPoint, @Nullable 
DurabilityRequest request, int attempt, SequentialAsyncExecutor executor)
+    // TODO (desired): prioritise by least recently updated range
+    static final class PendingQueue extends IntrusivePriorityHeap<Pending>
     {
-        logger.debug("{}: Awaiting durability for {}", 
exclusiveSyncPoint.syncId, exclusiveSyncPoint.route.toRanges());
-        DurabilityResults coordinate = coordinateIncluding(node, 
exclusiveSyncPoint, executor, attempt);
-        registerInProgress(exclusiveSyncPoint, coordinate);
+        final Comparator<Pending> comparator;
+
+        PendingQueue(Comparator<Pending> comparator)
+        {
+            this.comparator = comparator;
+        }
+
+        Pending poll() { heapify(); return super.pollNode(); }
+        Pending peek() { heapify(); return super.peekNode(); }
+        @Override public int compare(Pending o1, Pending o2) { return 
comparator.compare(o1, o2); }
+        @Override protected void append(Pending node) { super.append(node); }
+        @Override protected void remove(Pending node) { super.remove(node); }
+        @Override protected void clear() { super.clear(); }
+    }
+
+    private final Adapter adapter;
+    private int maxConcurrency = 16;
+    private int overlapPruneThreshold = 8;
+    private int globalPruneThreshold = 256;
+
+    int activeCount;
+    int pendingCount;
+    private final PendingQueue queued = new 
PendingQueue(Comparator.comparing(p -> p.startAt));
+    private Object[] pendingByRange = IntervalBTree.empty();
+    private final ArrayDeque<Pending> requeue = new ArrayDeque<>();
+
+    private long processingQueueAt = Long.MAX_VALUE;
+    private Cancellable cancelProcessingQueue;
+
+    public DurabilityQueue(Node node)
+    {
+        this.adapter = new NodeAdapter(node);
+    }
+
+    public DurabilityQueue(Adapter adapter)
+    {
+        this.adapter = adapter;
+    }
+
+    void submit(SyncPoint syncPoint, @Nullable DurabilityRequest request)
+    {
+        long nowMicros = adapter.elapsed(MICROSECONDS);
         if (request != null)
-            request.reportAttempt(exclusiveSyncPoint.syncId, 
node.elapsed(MICROSECONDS));
+            request.register(syncPoint.syncId, nowMicros);
 
-        coordinate.onQuorum().invoke((success, fail) -> {
-            synchronized (this)
-            {
-                unregisterInProgress(exclusiveSyncPoint, coordinate);
-                maybeSubmitPending();
-            }
-        });
-        coordinate.onDone().invoke((success, fail) -> {
-            TxnId txnId = exclusiveSyncPoint.syncId;
-            Ranges ranges = exclusiveSyncPoint.route.toRanges();
+        Pending pending = new Pending(syncPoint, request, nowMicros);
+        synchronized (this)
+        {
+            register(pending);
+            enqueue(pending);
+        }
+    }
 
-            String requestor = null;
-            boolean isDone;
-            if (request == null)
+    private void enqueue(Pending enqueue)
+    {
+        while (enqueue != null)
+        {
+            Invariants.require(enqueue.status == QUEUED);
+            Invariants.require(enqueue.parent == null);
+            if (enqueue.achieved != null && enqueue.request == null && 
enqueue.achieved.min.remote.compareTo(Quorum) >= 0 && fullySuperseded(enqueue))
             {
-                 isDone = success != null && success.achievedRemote == All;
+                abandon(enqueue, true, false);
+                logger.debug("{}: already achieved quorum durability and there 
are superseding durability requests pending; not retrying", 
enqueue.syncPoint.syncId);
+                enqueue = requeue.poll();
+                continue;
             }
-            else
+
+            Conflicts conflicts = conflicts(enqueue);
+            Pending next = conflicts.next;
+            if (next != null && next != enqueue)
             {
-                requestor = " requested by " + request.requestedBy;
-                isDone = request.isDone(ranges);
-                if (!isDone && request.including != null)
+                if (BY_PRIORITY.compare(next, enqueue) <= 0)
                 {
-                    Topology topology = node.topology().current();
-                    SortedArrayList<Node.Id> removed = 
topology.removedIds().intersecting(request.including);
-                    SortedArrayList<Node.Id> hardRemoved = 
topology.hardRemovedIds().intersecting(request.including);
-                    if (!removed.isEmpty() || !hardRemoved.isEmpty())
+                    next.addDeferred(enqueue);
+                    enqueue = requeue.poll();
+                    continue;
+                }
+
+                Invariants.require(next.status == QUEUED);
+                List<Pending> addDeferred = next.deferred;
+                next.deferred = null;
+                if (addDeferred != null)
+                {
+                    setParent(enqueue.deferred, enqueue);
+                    if (!enqueue.syncPoint.route.containsAll((AbstractRanges) 
next.syncPoint.route))
+                    {
+                        int count = 0;
+                        for (int i = 0 ; i < addDeferred.size() ; ++i)
+                        {
+                            Pending p = addDeferred.get(i);
+                            if (p.syncPoint.route.intersects((AbstractRanges) 
enqueue.syncPoint.route))
+                                addDeferred.set(count++, p);
+                            else
+                                requeue(p);
+                        }
+
+                        if (count == 0) addDeferred = null;
+                        else truncate(addDeferred, count);
+                    }
+                    if (addDeferred != null)
                     {
-                        String message = String.format("%s: Cannot achieve 
durability requested by %s as (%s/%s) are (removed/hard removed)", 
exclusiveSyncPoint.syncId, request, removed, hardRemoved);
-                        logger.info(message);
-                        node.durability().unregister(request);
-                        // TODO (desired): more specific exception?
-                        request.result.tryFailure(new 
RuntimeException(message));
-                        return;
+                        if (enqueue.deferred == null) enqueue.deferred = 
addDeferred;
+                        else enqueue.deferred.addAll(addDeferred);
                     }
                 }
+                if (next.parent != null)
+                    next.parent.removeDeferred(next);
+                enqueue.addDeferred(next);
+                if (next.isInHeap())
+                    queued.remove(next);
             }
-            if (fail != null)
+
+            queued.append(enqueue);
+            maybePrune(conflicts.supersedes);
+            enqueue = requeue.poll();
+        }
+
+        processQueue();
+    }
+
+    private void maybePrune(List<Pending> superseded)
+    {
+        if (!shouldPrune(superseded))
+            return;
+
+        // first remove any that have already achieved quorum and have no 
associated request, or where the request has been finished
+        for (Pending p : superseded)
+        {
+            if (p.status != QUEUED) continue;
+            if (p.request != null && !p.request.isDone()) continue;
+            if (p.request == null && (p.achieved == null || 
p.achieved.min.remote.compareTo(Quorum) < 0)) continue;
+
+            if (p.parent != null)
+                p.parent.removeDeferred(p);
+            abandon(p, true, true);
+            return;
+        }
+
+        if (shouldPrune(superseded))
+        {
+            superseded.sort(BY_ID);
+            int middle = (superseded.size() - 1)/2;
+            int last = superseded.size() - 1;
+            for (int i = 0 ; i <= last ; ++i)
             {
-                if (logger.isTraceEnabled()) logger.trace("{}: failed awaiting 
durability for {}{}.", txnId, ranges, requestor, fail);
-                if (fail instanceof SyncPointErased || fail instanceof 
TopologyRetiredException)
-                {
-                    if (isDone)
-                        return;
-
-                    // we can't succeed. if this was requested, and the 
request is still waiting, submit another coordination request
-                    // TODO (required): expand this to all unknown exception 
outcomes
-                    if (request != null)
-                        restart(exclusiveSyncPoint, request, attempt + 1);
-                    return;
-                }
+                int position = (i & 1) == 0 ? middle - i/2 : middle + (i+1)/2;
+                Pending remove = superseded.get(position);
+                if (remove.status != QUEUED) continue;
+                if (remove.request != null) continue;
+                if (remove.parent != null)
+                    remove.parent.removeDeferred(remove);
+                abandon(remove, true, true);
+                return;
             }
-            if (!isDone)
+        }
+    }
+
+    private boolean shouldPrune(List<Pending> conflicts)
+    {
+        return conflicts.size() > overlapPruneThreshold || pendingCount > 
globalPruneThreshold;
+    }
+
+    static class Conflicts
+    {
+        final Pending next;
+        final List<Pending> supersedes;
+
+        Conflicts(Pending next, List<Pending> superseded)
+        {
+            this.next = next;
+            this.supersedes = superseded;
+        }
+    }
+    private Conflicts conflicts(Pending submit)
+    {
+        List<Pending> supersedes = new ArrayList<>();
+        Pending next = null;
+        for (PendingRange range : BTree.<PendingRange>iterable(submit.byRange))
+        {
+            next = IntervalBTree.accumulate(pendingByRange, BY_RANGE, range, 
(sup, sub, pr, nxt) -> {
+                Pending test = pr.pending;
+                if (test.status == QUEUED && BY_ID.compare(test, sub) < 0 && 
sub.syncPoint.route.containsAll((AbstractRanges) test.syncPoint.route))
+                    sup.add(test);
+
+                if (test.status.compareTo(ACTIVE) > 0)
+                    return nxt;
+
+                if (nxt == null)
+                    return test;
+                return BY_PRIORITY.compare(nxt, test) <= 0 ? nxt : test;
+            }, supersedes, submit, next);
+        }
+        return new Conflicts(next, supersedes);
+    }
+
+    private Pending minConflict(List<Pending> conflicts)
+    {
+        if (conflicts.isEmpty())
+            return null;
+        Pending min = conflicts.get(0);
+        for (int i = 1 ; i < conflicts.size() ; ++i)
+        {
+            if (BY_PRIORITY.compare(min, conflicts.get(i)) > 0)
+                min = conflicts.get(i);
+        }
+        return min;
+    }
+
+    private boolean fullySuperseded(Pending submit)
+    {
+        for (PendingRange range : BTree.<PendingRange>iterable(submit.byRange))
+        {
+            Ranges missing = IntervalBTree.accumulate(pendingByRange, 
BY_RANGE, range, (s, id, pr, rs) -> {
+                Pending v = pr.pending;
+                if (v.status.compareTo(ACTIVE) > 0) return rs;
+                if (v.syncPoint.syncId.compareTo(id) < 0) return rs;
+                if (s.request != null && s.request.kind == VisibilitySyncPoint 
&& (v.request == null || v.request.kind != VisibilitySyncPoint)) return rs;
+                if (!rs.contains(pr.range)) return rs;
+                return rs.without(Ranges.of(pr.range));
+            }, submit, submit.syncPoint.syncId, Ranges.of(range.range));
+
+            if (!missing.isEmpty())
+                return false;
+        }
+        return true;
+    }
+
+    private void register(Pending pending)
+    {
+        ++pendingCount;
+        pendingByRange = IntervalBTree.update(pendingByRange, pending.byRange, 
BY_RANGE);
+    }
+
+    private void unregister(Pending pending)
+    {
+        Invariants.require(pending.deferred == null);
+        Invariants.require(!pending.isInHeap());
+        --pendingCount;
+        pendingByRange = IntervalBTree.subtract(pendingByRange, 
pending.byRange, BY_RANGE);
+    }
+
+    private void start(Pending pending)
+    {
+        logger.debug("{}: Awaiting durability for {}", 
pending.syncPoint.syncId, pending.syncPoint.route.toRanges());
+        DurabilityResults coordinate = adapter.execute(pending.syncPoint, 
pending.attempt);
+
+        long startedAt = pending.startAt;
+        Invariants.require(pending.status == QUEUED);
+        pending.status = ACTIVE;
+        ++activeCount;
+        if (pending.request != null)
+            pending.request.reportAttempt(pending.syncPoint.syncId, 
adapter.elapsed(MICROSECONDS));
+
+        coordinate.onQuorumOrDone().invoke((success, fail) -> 
completing(pending, startedAt));
+        coordinate.onDone().invoke((success, fail) -> {
+            try
             {
+                completing(pending, startedAt); // should already have been 
done by onQuorum, but to avoid future surprises if invoked out of order
+
+                TxnId txnId = pending.syncPoint.syncId;
+                Ranges ranges = pending.syncPoint.route.toRanges();
+                DurabilityRequest request = pending.request;
                 if (success != null)
-                    fail = success.failure;
+                {
+                    if (pending.achieved == null) pending.achieved = success;
+                    else pending.achieved = pending.achieved.max(success);
+                }
 
-                if (fail instanceof Exhausted || success != null)
+                String requestor = null;
+                Ranges achieved;
+                boolean isDone;
+                if (request == null)
                 {
-                    Collection<Node.Id> failedNodes = success != null ? 
success.excluding : ((Exhausted)fail).failedNodes();
-                    Ranges failedRanges = success != null ? 
exclusiveSyncPoint.route().toRanges() : ((Exhausted)fail).failedRanges();
-                    boolean log = failedNodes == null;
-                    if (!log)
-                    {
-                        Set<Node.Id> unlogged = new HashSet<>(failedNodes);
-                        for (Collection<Node.Id> logged : 
WARNINGS_LOGGED.tailMap(System.nanoTime() - 
MINUTES.toNanos(EXHAUSTED_LOG_INTERVAL_MINUTES)).values())
-                            unlogged.removeAll(logged);
-                        log = !unlogged.isEmpty();
-                    }
-                    if (log)
+                    if (success == null) achieved = Ranges.EMPTY;
+                    else if (success.min.remote == All) achieved = ranges;
+                    else achieved = success.achieved.foldlWithBounds((p, r, s, 
e) -> p.remote == All ? r.with(Ranges.of(s.rangeFactory().newRange(s, e))) : r, 
Ranges.EMPTY, ignore -> false);
+                    isDone = achieved.containsAll(ranges);
+                }
+                else
+                {
+                    requestor = " requested by " + request.requestedBy;
+                    isDone = request.isDone(ranges);
+                    achieved = request.achieved().slice(ranges, Minimal);
+                    if (!isDone && request.require.including != null)
                     {
-                        logger.info("{}: Incomplete durability for {}{}. {} 
were unsuccessful.", txnId, failedRanges, requestor, failedNodes == null ? 
"some nodes" : failedNodes);
-                        WARNINGS_LOGGED.headMap(System.nanoTime() - 
MINUTES.toNanos(EXHAUSTED_LOG_INTERVAL_MINUTES)).clear();
-                        WARNINGS_LOGGED.put(System.nanoTime(), failedNodes == 
null ? Collections.emptyList() : failedNodes);
+                        Topology topology = adapter.currentTopology();
+                        SortedArrayList<Node.Id> removed = 
topology.removedIds().intersecting(request.require.including);
+                        SortedArrayList<Node.Id> hardRemoved = 
topology.hardRemovedIds().intersecting(request.require.including);
+                        if (!removed.isEmpty() || !hardRemoved.isEmpty())
+                        {
+                            abandon(pending, false, false);
+                            String message = String.format("%s: Cannot achieve 
durability requested by %s as (%s/%s) are (removed/hard removed)", txnId, 
request, removed, hardRemoved);
+                            logger.info(message);
+                            // TODO (desired): more specific exception?
+                            request.result.tryFailure(new 
RuntimeException(message));
+                            adapter.unregister(request);
+                            return;
+                        }
                     }
                 }
-                else
+
+                if (isDone)
                 {
-                    if (fail instanceof Timeout) logger.info("{}: Timeout 
awaiting durability for {}{}", txnId, ranges, requestor, fail);
-                    else if (fail != null) logger.info("{}: Failed awaiting 
durability for {}{}; will retry", txnId, ranges, requestor, fail);
+                    done(pending);
                 }
+                else
+                {
+                    if (success != null)
+                        fail = success.failure;
+
+                    if (fail != null)
+                    {
+                        if (logger.isTraceEnabled()) logger.trace("{}: failed 
awaiting durability for {}{}.", txnId, ranges, requestor, fail);
+                        if (fail instanceof SyncPointErased || fail instanceof 
TopologyRetiredException)
+                        {
+                            if (request == null) abandon(pending, false, 
false);
+                            else restart(pending);
+                            return;
+                        }
+                    }
+
+                    if (fail instanceof Exhausted || success != null)
+                    {
+                        Collection<Node.Id> failedNodes = success != null ? 
success.min.excluding : ((Exhausted)fail).failedNodes();
+                        Ranges failedRanges = success != null ? ranges : 
((Exhausted)fail).failedRanges();
+                        boolean log = failedNodes == null;
+                        if (!log)
+                        {
+                            Set<Node.Id> unlogged = new HashSet<>(failedNodes);
+                            for (Collection<Node.Id> logged : 
WARNINGS_LOGGED.tailMap(System.nanoTime() - 
MINUTES.toNanos(EXHAUSTED_LOG_INTERVAL_MINUTES)).values())
+                                unlogged.removeAll(logged);
+                            log = !unlogged.isEmpty();
+                        }
+                        if (log)
+                        {
+                            logger.info("{}: Incomplete durability for {}{}. 
{} were unsuccessful.", txnId, failedRanges, requestor, failedNodes == null ? 
"some nodes" : failedNodes);
+                            WARNINGS_LOGGED.headMap(System.nanoTime() - 
MINUTES.toNanos(EXHAUSTED_LOG_INTERVAL_MINUTES)).clear();
+                            WARNINGS_LOGGED.put(System.nanoTime(), failedNodes 
== null ? Collections.emptyList() : failedNodes);
+                        }
+                    }
+                    else
+                    {
+                        if (fail instanceof Timeout) logger.info("{}: Timeout 
awaiting durability for {}{}", txnId, ranges, requestor, fail);
+                        else if (fail != null) logger.info("{}: Failed 
awaiting durability for {}{}; will retry", txnId, ranges, requestor, fail);
+                    }
+
+                    if (achieved.intersects(ranges))
+                    {
+                        unregister(pending);
+                        pending.syncPoint = 
pending.syncPoint.without(achieved);
+                        pending.updateByRange();
+                        register(pending);
+                    }
 
-                retry(exclusiveSyncPoint, request, attempt + 1);
+                    retry(pending);
+                }
             }
-            else if (request == null)
+            finally
             {
-                // if request != null, the request will log the necessary 
information
-                logger.debug("{}: Successfully achieved durability for {}.", 
txnId, ranges);
+                synchronized (this)

Review Comment:
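   lambdas don't have their own `this`: inside a lambda body `this` refers to the enclosing `DurabilityQueue` instance, exactly as it would outside the lambda, so this locks the queue itself. I can qualify it explicitly as `DurabilityQueue.this` if that helps readability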



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to