This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1e508d34 CEP-15: Simplify handling of Insufficient replies from Commit 
and Apply
1e508d34 is described below

commit 1e508d340935fef496f58606a14717bed59e8af4
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Fri Oct 13 15:47:32 2023 +0100

    CEP-15: Simplify handling of Insufficient replies from Commit and Apply
    
    patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
    CASSANDRA-18928
---
 .../src/main/java/accord/coordinate/Persist.java   |  22 ++---
 .../src/main/java/accord/messages/Commit.java      |  15 ---
 .../src/main/java/accord/messages/Defer.java       | 107 ---------------------
 3 files changed, 11 insertions(+), 133 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java
index 0607722f..9a132683 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -28,7 +28,6 @@ import accord.local.Node.Id;
 import accord.messages.Apply;
 import accord.messages.Apply.ApplyReply;
 import accord.messages.Callback;
-import accord.messages.Commit;
 import accord.messages.InformDurable;
 import accord.primitives.*;
 import accord.topology.Topologies;
@@ -37,7 +36,6 @@ import static 
accord.coordinate.tracking.RequestStatus.Success;
 import static accord.local.Status.Durability.Majority;
 import static accord.messages.Apply.executes;
 import static accord.messages.Apply.participates;
-import static accord.messages.Commit.Kind.Maximal;
 
 public class Persist implements Callback<ApplyReply>
 {
@@ -47,6 +45,8 @@ public class Persist implements Callback<ApplyReply>
     final Txn txn;
     final Timestamp executeAt;
     final Deps deps;
+    final Writes writes;
+    final Result result;
     final QuorumTracker tracker;
     final Set<Id> persistedOn;
     boolean isDone;
@@ -60,7 +60,7 @@ public class Persist implements Callback<ApplyReply>
     public static void persist(Node node, Topologies executes, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result)
     {
         Topologies participates = participates(node, route, txnId, executeAt, 
executes);
-        Persist persist = new Persist(node, executes, txnId, route, txn, 
executeAt, deps);
+        Persist persist = new Persist(node, executes, txnId, route, txn, 
executeAt, deps, writes, result);
         node.send(participates.nodes(), to -> Apply.applyMinimal(to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result), 
persist);
     }
 
@@ -68,7 +68,7 @@ public class Persist implements Callback<ApplyReply>
     {
         Topologies executes = executes(node, route, executeAt);
         Topologies participates = participates(node, route, txnId, executeAt, 
executes);
-        Persist persist = new Persist(node, participates, txnId, route, txn, 
executeAt, deps);
+        Persist persist = new Persist(node, participates, txnId, route, txn, 
executeAt, deps, writes, result);
         node.send(participates.nodes(), to -> Apply.applyMaximal(to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result), 
persist);
     }
 
@@ -76,19 +76,21 @@ public class Persist implements Callback<ApplyReply>
     {
         Topologies executes = executes(node, sendTo, executeAt);
         Topologies participates = participates(node, sendTo, txnId, executeAt, 
executes);
-        Persist persist = new Persist(node, participates, txnId, route, txn, 
executeAt, deps);
+        Persist persist = new Persist(node, participates, txnId, route, txn, 
executeAt, deps, writes, result);
         node.send(participates.nodes(), to -> Apply.applyMaximal(to, 
participates, executes, txnId, route, txn, executeAt, deps, writes, result), 
persist);
     }
 
-    private Persist(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps)
+    private Persist(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result)
     {
         this.node = node;
         this.txnId = txnId;
+        this.route = route;
         this.txn = txn;
+        this.executeAt = executeAt;
         this.deps = deps;
-        this.route = route;
+        this.writes = writes;
+        this.result = result;
         this.tracker = new QuorumTracker(topologies);
-        this.executeAt = executeAt;
         this.persistedOn = new HashSet<>();
     }
 
@@ -112,9 +114,7 @@ public class Persist implements Callback<ApplyReply>
                 }
                 break;
             case Insufficient:
-                Topologies topologies = node.topology().preciseEpochs(route, 
txnId.epoch(), executeAt.epoch());
-                // TODO (easy, cleanup): use static method in Commit
-                node.send(from, new Commit(Maximal, from, 
topologies.forEpoch(txnId.epoch()), topologies, txnId, txn, route, null, 
executeAt, deps, false));
+                Apply.sendMaximal(node, from, txnId, route, txn, executeAt, 
deps, writes, result);
         }
     }
 
diff --git a/accord-core/src/main/java/accord/messages/Commit.java 
b/accord-core/src/main/java/accord/messages/Commit.java
index 9d692e6a..97e67cc8 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -50,9 +50,6 @@ import accord.topology.Topologies;
 import accord.topology.Topology;
 import accord.utils.Invariants;
 
-import static accord.local.Status.Committed;
-import static accord.local.Status.Known.DefinitionOnly;
-
 public class Commit extends TxnRequest<ReadNack>
 {
     private static final Logger logger = LoggerFactory.getLogger(Commit.class);
@@ -72,8 +69,6 @@ public class Commit extends TxnRequest<ReadNack>
     public final @Nullable FullRoute<?> route;
     public final ReadTxnData read;
 
-    private transient Defer defer;
-
     public enum Kind { Minimal, Maximal }
 
     // TODO (low priority, clarity): cleanup passing of topologies here - 
maybe fetch them afresh from Node?
@@ -177,12 +172,7 @@ public class Commit extends TxnRequest<ReadNack>
             case Success:
             case Redundant:
                 return null;
-
             case Insufficient:
-                
Invariants.checkState(!safeCommand.current().known().isDefinitionKnown());
-                if (defer == null)
-                    defer = new Defer(DefinitionOnly, Committed.minKnown, 
Commit.this);
-                defer.add(safeStore, safeCommand, safeStore.commandStore());
                 return ReadNack.NotCommitted;
         }
     }
@@ -200,11 +190,6 @@ public class Commit extends TxnRequest<ReadNack>
             node.reply(replyTo, replyContext, reply, failure);
         else if (read != null)
             read.process(node, replyTo, replyContext);
-        if (defer != null)
-        {
-            defer.ack();
-            defer = null;
-        }
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/messages/Defer.java 
b/accord-core/src/main/java/accord/messages/Defer.java
deleted file mode 100644
index acb80fe0..00000000
--- a/accord-core/src/main/java/accord/messages/Defer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.messages;
-
-import java.util.function.Function;
-
-import accord.local.*;
-import accord.local.Status.Known;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
-import org.agrona.collections.IntHashSet;
-
-import static accord.messages.Defer.Ready.Expired;
-import static accord.messages.Defer.Ready.No;
-import static accord.messages.Defer.Ready.Yes;
-
-class Defer implements Command.TransientListener
-{
-    public enum Ready { No, Yes, Expired }
-
-    final Function<Command, Ready> waitUntil;
-    final TxnRequest<?> request;
-    final IntHashSet waitingOn = new IntHashSet();
-    int waitingOnCount;
-    boolean isDone;
-
-    Defer(Known waitUntil, Known expireAt, TxnRequest<?> request)
-    {
-        this(command -> {
-            if (!waitUntil.isSatisfiedBy(command.known()))
-                return No;
-            if (expireAt.isSatisfiedBy(command.known()))
-                return Expired;
-            return Yes;
-        }, request);
-    }
-
-    Defer(Function<Command, Ready> waitUntil, TxnRequest<?> request)
-    {
-        this.waitUntil = waitUntil;
-        this.request = request;
-    }
-
-    synchronized void add(SafeCommandStore safeStore, SafeCommand safeCommand, 
CommandStore commandStore)
-    {
-        if (isDone)
-            throw new IllegalStateException("Recurrent retry of " + request);
-
-        waitingOn.add(commandStore.id());
-        ++waitingOnCount;
-        safeCommand.addListener(this);
-    }
-
-    @Override
-    public synchronized void onChange(SafeCommandStore safeStore, SafeCommand 
safeCommand)
-    {
-        Command command = safeCommand.current();
-        Ready ready = waitUntil.apply(command);
-        if (ready == No) return;
-
-        if (!safeCommand.removeListener(this))
-            return;
-
-        if (ready == Expired) return;
-
-        int id = safeStore.commandStore().id();
-        // TODO (desired): it would be nice at least for transient listener 
lists to annotate that they are notifying a listener, to avoid redundant 
invocations
-        //    we can then impose this as an invariant check rather than an 
early abort
-        Invariants.checkState(waitingOn.contains(id));
-        waitingOn.remove(id);
-
-        ack();
-    }
-
-    synchronized void ack()
-    {
-        if (-1 == --waitingOnCount)
-        {
-            isDone = true;
-            request.process();
-        }
-    }
-
-    @Override
-    public PreLoadContext listenerPreLoadContext(TxnId caller)
-    {
-        Invariants.checkState(caller.equals(request.txnId));
-        return request;
-    }
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to