dcapwell commented on code in PR #50:
URL: https://github.com/apache/cassandra-accord/pull/50#discussion_r1240434401


##########
accord-core/src/main/java/accord/local/DurableBefore.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import java.util.function.BiFunction;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.api.RoutingKey;
+import accord.local.Status.Durability;
+import accord.primitives.Participants;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routables;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.ReducingIntervalMap;
+import accord.utils.ReducingRangeMap;
+
+import static accord.local.Status.Durability.Majority;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Durability.Universal;
+
+public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry>
+{
+    public static class SerializerSupport
+    {
+        public static DurableBefore create(boolean inclusiveEnds, RoutingKey[] ends, Entry[] values)
+        {
+            return new DurableBefore(inclusiveEnds, ends, values);
+        }
+    }
+
+    public static class Entry
+    {
+        public final @Nonnull TxnId majorityBefore, universalBefore;
+
+        public Entry(@Nonnull TxnId majority, @Nonnull TxnId universalBefore)
+        {
+            Invariants.checkArgument(majority.compareTo(universalBefore) >= 0);

Review Comment:
   ```suggestion
               Invariants.checkArgument(majority.compareTo(universalBefore) >= 0, "majority %s < universal %s", majority, universalBefore);
   ```
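
   A minimal sketch of what the extra message buys, assuming a `checkArgument(boolean, String, Object...)` overload that formats with `String.format`. `InvariantSketch`, `checkEntry`, and the `long` stand-ins for `TxnId` are hypothetical, not the project's actual API:

   ```java
   final class InvariantSketch
   {
       // Hypothetical stand-in for an invariant helper that accepts a format string and arguments.
       static void checkArgument(boolean condition, String format, Object... args)
       {
           if (!condition)
               throw new IllegalArgumentException(String.format(format, args));
       }

       // Mirrors the check in the Entry constructor, using long in place of TxnId.
       static void checkEntry(long majority, long universal)
       {
           checkArgument(majority >= universal, "majority %s < universal %s", majority, universal);
       }
   }
   ```

   With this shape, a failing check reports both values instead of a bare exception.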



##########
accord-core/src/main/java/accord/messages/Accept.java:
##########
@@ -241,10 +239,12 @@ public void process()
         @Override
         public AcceptReply apply(SafeCommandStore safeStore)
         {
-            SafeCommand safeCommand = safeStore.command(txnId);
+            SafeCommand safeCommand = safeStore.get(txnId, someKey);
             switch (Commands.acceptInvalidate(safeStore, safeCommand, ballot))
             {
                 default:

Review Comment:
   Is the `default` case meant to cover truncated? Maybe we should fail instead?
   
   ```suggestion
                   default:  throw new IllegalArgumentException("Unknown status: " + outcome);
   ```



##########
accord-core/src/main/java/accord/messages/ReadTxnData.java:
##########
@@ -218,6 +232,14 @@ void maybeRead(SafeCommandStore safeStore, SafeCommand safeCommand)
         }
     }
 
+    @Override
+    protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable)
+    {
+        // TODO (expected): lots of undesirable costs associated with the obsoletion tracker
+//        commandStore.execute(contextFor(txnId), safeStore -> safeStore.command(txnId).removeListener(obsoleteTracker));

Review Comment:
   can you remove this comment?



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -211,22 +214,27 @@ private void recover()
             switch (acceptOrCommit.status)
             {
                 default: throw new IllegalStateException();

Review Comment:
   ```suggestion
                   default: throw new IllegalStateException("Unknown status: " + acceptOrCommit.status);
   ```
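
   A small sketch of the pattern, assuming an enum-based switch. The `Status` values and `describe` method here are hypothetical placeholders, not the statuses used in `Recover`:

   ```java
   final class RecoverSketch
   {
       // Hypothetical statuses, for illustration only.
       enum Status { Accepted, Committed, Invalidated }

       static String describe(Status status)
       {
           switch (status)
           {
               // Including the unexpected value makes the failure self-explanatory in logs.
               default: throw new IllegalStateException("Unknown status: " + status);
               case Accepted: return "accept";
               case Committed: return "commit";
           }
       }
   }
   ```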



##########
accord-core/src/main/java/accord/coordinate/CoordinateGloballyDurable.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.coordinate.tracking.QuorumTracker;
+import accord.coordinate.tracking.RequestStatus;
+import accord.local.Node;
+import accord.local.DurableBefore;
+import accord.messages.Callback;
+import accord.messages.QueryDurableBefore;
+import accord.messages.SetGloballyDurable;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults.SettableResult;
+
+/**
+ *

Review Comment:
   Add a comment or remove the empty Javadoc block.



##########
accord-core/src/test/java/accord/topology/TopologyRandomizer.java:
##########
@@ -288,6 +294,70 @@ public synchronized void maybeUpdateTopology() {
         updateTopology();
     }
 
+    public synchronized void rangeDurable()
+    {
+        Invariants.checkState(nodeLookup != null);

Review Comment:
   ```suggestion
           Invariants.nonNull(nodeLookup);
   ```
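
   A sketch of what such a helper could look like, assuming it behaves like `Objects.requireNonNull` (returns the value, throws on `null`). `NonNullSketch` is hypothetical, and the exception type thrown may differ from what `checkState` throws:

   ```java
   import java.util.Objects;

   final class NonNullSketch
   {
       // Hypothetical helper: returns the value if present, otherwise throws NullPointerException.
       static <T> T nonNull(T value)
       {
           return Objects.requireNonNull(value);
       }
   }
   ```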



##########
accord-core/src/main/java/accord/local/SafeCommandStore.java:
##########
@@ -137,33 +115,117 @@ public static TestKind shouldHaveWitnessed(Kind kind)
 
     }
 
+    /**
+     * If the transaction exists (with some associated data) in the CommandStore, return it. Otherwise return null.
+     *
+     * This is useful for operations that do not retain a route, but do expect to operate on existing local state;
+     * this guards against recreating a previously truncated command when we do not otherwise have enough information
+     * to prevent it.
+     */
+    public @Nullable SafeCommand ifInitialised(TxnId txnId)
+    {
+        SafeCommand safeCommand = get(txnId);
+        Command command = safeCommand.current();
+        if (command.saveStatus() == Uninitialised)
+            return null;
+        return maybeTruncate(safeCommand, command);
+    }
+
+    public SafeCommand get(TxnId txnId, RoutingKey unseekable)
+    {
+        SafeCommand safeCommand = get(txnId);
+        Command command = safeCommand.current();
+        if (command.saveStatus() == Uninitialised)
+        {
+            if (commandStore().durableBefore().isUniversal(txnId, unseekable))
+                return new TruncatedSafeCommand(txnId);
+        }
+        return maybeTruncate(safeCommand, command);
+    }
+
+    // decidedExecuteAt == null if not yet PreCommitted
+
+    /**
+     * Retrieve a SafeCommand. If it is initialised, optionally use its present contents to determine if it should be
+     * truncated, and apply the truncation before returning the command.
+     * This behaviour may be overridden by implementations if they know any truncation would already have been applied.
+     *
+     * If it is not initialised, use the provided parameters to determine if the record may have been expunged;
+     * if not, create it.
+     *
+     * We do not distinguish between participants, home keys, and non-participating home keys for now, even though
+     * these fundamentally have different implications. Logically, we may erase a home shard's record as soon as
+     * the transaction has been made durable at a majority of replicas of every shard, and state for any participating
+     * keys may be erased as soon as their non-faulty peers have recorded the outcome.
+     *
+     * However if in some cases we don't know which commands are home keys or participants we need to wait to erase
+     * a transaction until both of these criteria are met for every key.
+     *
+     * TODO (desired): Introduce static types that permit us to propagate this information safely.
+     */
+    public SafeCommand get(TxnId txnId, Unseekables<?> unseekables)

Review Comment:
   can you move these 2 methods next to the other `get` method?



##########
accord-core/src/test/java/accord/topology/TopologyRandomizer.java:
##########
@@ -288,6 +294,70 @@ public synchronized void maybeUpdateTopology() {
         updateTopology();
     }
 
+    public synchronized void rangeDurable()
+    {
+        Invariants.checkState(nodeLookup != null);
+        Topology current = epochs.get(epochs.size() - 1);
+        List<Node.Id> nodes = new ArrayList<>(current.nodes());
+        Node.Id nodeId = nodes.get(random.nextInt(nodes.size()));
+        Node node = nodeLookup.apply(nodeId);
+        Ranges ranges = selectRanges(current.forNode(nodeId).ranges);
+        CoordinateSyncPoint.exclusive(node, ranges)
+                           .addCallback((success, fail) -> {
+                            if (success != null)
+                                coordinateDurable(node, success);
+        });
+    }
+
+    private static void coordinateDurable(Node node, SyncPoint exclusiveSyncPoint)
+    {
+        CoordinateShardDurable.coordinate(node, exclusiveSyncPoint, Collections.emptySet())
+                              .addCallback((success0, fail0) -> {
+                                  if (fail0 != null) coordinateDurable(node, exclusiveSyncPoint);
+                              });
+    }
+
+    public synchronized void globallyDurable()
+    {
+        Invariants.checkState(nodeLookup != null);

Review Comment:
   ```suggestion
           Invariants.nonNull(nodeLookup);
   ```



##########
accord-core/src/main/java/accord/local/DurableBefore.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import java.util.function.BiFunction;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.api.RoutingKey;
+import accord.local.Status.Durability;
+import accord.primitives.Participants;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routables;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.ReducingIntervalMap;
+import accord.utils.ReducingRangeMap;
+
+import static accord.local.Status.Durability.Majority;
+import static accord.local.Status.Durability.NotDurable;
+import static accord.local.Status.Durability.Universal;
+
+public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry>
+{
+    public static class SerializerSupport
+    {
+        public static DurableBefore create(boolean inclusiveEnds, RoutingKey[] ends, Entry[] values)
+        {
+            return new DurableBefore(inclusiveEnds, ends, values);
+        }
+    }
+
+    public static class Entry
+    {
+        public final @Nonnull TxnId majorityBefore, universalBefore;
+
+        public Entry(@Nonnull TxnId majority, @Nonnull TxnId universalBefore)
+        {
+            Invariants.checkArgument(majority.compareTo(universalBefore) >= 0);
+            this.majorityBefore = majority;
+            this.universalBefore = universalBefore;
+        }
+
+        private static Entry max(Entry a, Entry b)
+        {
+            return reduce(a, b, TxnId::max);
+        }
+
+        private static Entry min(Entry a, Entry b)
+        {
+            return reduce(a, b, TxnId::min);
+        }
+
+        private static Entry reduce(Entry a, Entry b, BiFunction<TxnId, TxnId, TxnId> reduce)
+        {
+            TxnId majority = reduce.apply(a.majorityBefore, b.majorityBefore);
+            TxnId universal = reduce.apply(a.universalBefore, b.universalBefore);
+
+            if (majority == a.majorityBefore && universal == a.universalBefore)
+                return a;
+            if (majority.equals(b.majorityBefore) && universal.equals(b.universalBefore))
+                return b;
+
+            return new Entry(majority, universal);
+        }
+
+        public Durability get(TxnId txnId)
+        {
+            if (txnId.compareTo(majorityBefore) < 0)
+            {
+                if (txnId.compareTo(universalBefore) < 0)
+                    return Universal;
+                return Majority;

Review Comment:
   ```suggestion
                   return txnId.compareTo(universalBefore) < 0 ? Universal : Majority;
   ```
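
   For reference, a self-contained sketch of the two-threshold lookup with the ternary applied. It uses `long` in place of `TxnId`, and the trailing `NotDurable` return is an assumption, since the hunk above is cut off before that point:

   ```java
   final class DurabilitySketch
   {
       enum Durability { NotDurable, Majority, Universal }

       // Classify a transaction id against the two bounds; universalBefore <= majorityBefore is assumed.
       static Durability get(long txnId, long majorityBefore, long universalBefore)
       {
           if (txnId < majorityBefore)
               return txnId < universalBefore ? Durability.Universal : Durability.Majority;
           return Durability.NotDurable; // assumed fall-through, not shown in the hunk
       }
   }
   ```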



##########
accord-core/src/test/java/accord/topology/TopologyRandomizer.java:
##########
@@ -288,6 +294,70 @@ public synchronized void maybeUpdateTopology() {
         updateTopology();
     }
 
+    public synchronized void rangeDurable()
+    {
+        Invariants.checkState(nodeLookup != null);
+        Topology current = epochs.get(epochs.size() - 1);
+        List<Node.Id> nodes = new ArrayList<>(current.nodes());
+        Node.Id nodeId = nodes.get(random.nextInt(nodes.size()));
+        Node node = nodeLookup.apply(nodeId);
+        Ranges ranges = selectRanges(current.forNode(nodeId).ranges);
+        CoordinateSyncPoint.exclusive(node, ranges)
+                           .addCallback((success, fail) -> {
+                            if (success != null)
+                                coordinateDurable(node, success);
+        });
+    }
+
+    private static void coordinateDurable(Node node, SyncPoint exclusiveSyncPoint)
+    {
+        CoordinateShardDurable.coordinate(node, exclusiveSyncPoint, Collections.emptySet())
+                              .addCallback((success0, fail0) -> {
+                                  if (fail0 != null) coordinateDurable(node, exclusiveSyncPoint);
+                              });
+    }
+
+    public synchronized void globallyDurable()
+    {
+        Invariants.checkState(nodeLookup != null);
+        Topology current = epochs.get(epochs.size() - 1);
+        List<Node.Id> nodes = new ArrayList<>(current.nodes());
+        Node.Id nodeId = nodes.get(random.nextInt(nodes.size()));
+        Node node = nodeLookup.apply(nodeId);
+        long epoch = current.epoch == 1 ? 1 : 1 + random.nextInt((int)current.epoch - 1);
+        node.withEpoch(epoch, () -> {
+            node.commandStores().any().execute(() -> CoordinateGloballyDurable.coordinate(node, epoch));
+        });
+    }
+
+    private Ranges selectRanges(Ranges ranges)
+    {
+        if (random.nextFloat() < 0.1f)

Review Comment:
   ```suggestion
           if (random.decide(0.1f))
   ```
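
   A sketch of the semantics this suggestion relies on, assuming `decide(chance)` returns `true` with probability `chance`. `DecideSketch` wraps `java.util.Random` and is hypothetical, not the random source used by the test:

   ```java
   import java.util.Random;

   final class DecideSketch
   {
       private final Random random;

       DecideSketch(long seed)
       {
           this.random = new Random(seed);
       }

       // Assumed semantics: true with probability `chance`, since nextFloat() is uniform in [0, 1).
       boolean decide(float chance)
       {
           return random.nextFloat() < chance;
       }
   }
   ```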



##########
accord-core/src/test/java/accord/topology/TopologyManagerTest.java:
##########
@@ -191,13 +191,76 @@ void forKeysPartiallySynced()
 
         // no acks, so all epoch 1 shards should be included
         Assertions.assertEquals(topologies(topology2, topology1),
-                                service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2));
+                                service.withUnsyncedEpochs(keys(150, 250).toParticipants(), 2, 2));
 
         // first topology acked, so only the second shard should be included
-        service.onEpochSyncComplete(id(1), 1);
-        service.onEpochSyncComplete(id(2), 1);
-        Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2);
+        service.onEpochSyncComplete(id(1), 2);
+        service.onEpochSyncComplete(id(2), 2);
+        Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toParticipants(), 2, 2);
         Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
                                 actual);
     }
+
+    @Test
+    void incompleteTopologyHistory()
+    {
+        Topology topology5 = topology(5,
+                                      shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
+                                      shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
+        Topology topology6 = topology(6,
+                                      shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
+                                      shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
+
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        service.onTopologyUpdate(topology5);
+        service.onTopologyUpdate(topology6);
+
+        Assertions.assertSame(topology6, service.getEpochStateUnsafe(6).global());
+        Assertions.assertSame(topology5, service.getEpochStateUnsafe(5).global());
+        for (int i=1; i<=6; i++) service.onEpochSyncComplete(id(i), 6);
+        Assertions.assertTrue(service.getEpochStateUnsafe(5).syncComplete());
+        Assertions.assertNull(service.getEpochStateUnsafe(4));
+
+        service.onEpochSyncComplete(id(1), 4);
+    }
+
+    private static void markTopologySynced(TopologyManager service, long epoch)
+    {
+        service.getEpochStateUnsafe(epoch).global().nodes().forEach(id -> service.onEpochSyncComplete(id, epoch));
+    }
+
+    private static void addAndMarkSynced(TopologyManager service, Topology topology)
+    {
+        service.onTopologyUpdate(topology);
+        markTopologySynced(service, topology.epoch());
+    }
+
+    @Test
+    void truncateTopologyHistory()
+    {
+        Range range = range(100, 200);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))));
+        addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))));
+        addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), idSet(1, 2))));
+        addAndMarkSynced(service, topology(4, shard(range, idList(1, 2, 3), idSet(1, 3))));
+
+        Assertions.assertTrue(service.hasEpoch(1));
+        Assertions.assertTrue(service.hasEpoch(2));
+        Assertions.assertTrue(service.hasEpoch(3));
+        Assertions.assertTrue(service.hasEpoch(4));
+
+        service.truncateTopologyUntil(3);
+        Assertions.assertFalse(service.hasEpoch(1));
+        Assertions.assertFalse(service.hasEpoch(2));
+        Assertions.assertTrue(service.hasEpoch(3));
+        Assertions.assertTrue(service.hasEpoch(4));
+
+    }
+
+    @Test
+    void truncateTopologyCantTruncateUnsyncedEpochs()
+    {
+

Review Comment:
   can you fill this test out?



##########
accord-core/src/main/java/accord/messages/PreAccept.java:
##########
@@ -90,6 +90,8 @@ protected PreAcceptReply applyIfDoesNotCoordinate(SafeCommandStore safeStore)
     {
         // we only preaccept in the coordination epoch, but we might contact other epochs for dependencies
         Ranges ranges = safeStore.ranges().allBetween(minUnsyncedEpoch, txnId);
+        if (txnId.rw() == ExclusiveSyncPoint)
+            safeStore.commandStore().markExclusiveSyncPoint(safeStore, txnId, ranges);

Review Comment:
   can you explain this change?



##########
accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java:
##########
@@ -66,14 +66,15 @@ void disjointElectorate() throws Throwable
             Txn txn1 = writeTxn(keys);
             getUninterruptibly(node1.coordinate(txnId1, txn1));
             
             getUninterruptibly(node1.commandStores().forEach(contextFor(txnId1), keys, 1, 1, commands -> {
-                Command command = commands.command(txnId1).current();
+                Command command = commands.get(txnId1, keys.toParticipants()).current();
                 Assertions.assertTrue(command.partialDeps().isEmpty());
             }));
 
             cluster.configServices(4).forEach(config -> {
                 try
                 {
-                    getUninterruptibly(config.reportTopology(topology2));
+                    config.fetchTopologyForEpoch(2);

Review Comment:
   If you rebase, this should be `config.reportTopology(topology2);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

