dcapwell commented on code in PR #50: URL: https://github.com/apache/cassandra-accord/pull/50#discussion_r1240434401
########## accord-core/src/main/java/accord/local/DurableBefore.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local; + +import java.util.function.BiFunction; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import accord.api.RoutingKey; +import accord.local.Status.Durability; +import accord.primitives.Participants; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.Routables; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.ReducingIntervalMap; +import accord.utils.ReducingRangeMap; + +import static accord.local.Status.Durability.Majority; +import static accord.local.Status.Durability.NotDurable; +import static accord.local.Status.Durability.Universal; + +public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry> +{ + public static class SerializerSupport + { + public static DurableBefore create(boolean inclusiveEnds, RoutingKey[] ends, Entry[] values) + { + return new DurableBefore(inclusiveEnds, ends, values); + } + } + + public static class Entry + { + public final @Nonnull TxnId majorityBefore, universalBefore; + + public Entry(@Nonnull TxnId majority, @Nonnull TxnId universalBefore) + { + Invariants.checkArgument(majority.compareTo(universalBefore) >= 0); Review Comment: ```suggestion Invariants.checkArgument(majority.compareTo(universalBefore) >= 0, "majority %s < universal %s", majority, universalBefore); ``` ########## accord-core/src/main/java/accord/messages/Accept.java: ########## @@ -241,10 +239,12 @@ public void process() @Override public AcceptReply apply(SafeCommandStore safeStore) { - SafeCommand safeCommand = safeStore.command(txnId); + SafeCommand safeCommand = safeStore.get(txnId, someKey); switch (Commands.acceptInvalidate(safeStore, safeCommand, ballot)) { default: Review Comment: default is truncated? maybe we should fail? ```suggestion default: throw new IllegalArgumentException("Unknown status: " + outcome); ``` ########## accord-core/src/main/java/accord/messages/ReadTxnData.java: ########## @@ -218,6 +232,14 @@ void maybeRead(SafeCommandStore safeStore, SafeCommand safeCommand) } } + @Override + protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable) + { + // TODO (expected): lots of undesirable costs associated with the obsoletion tracker +// commandStore.execute(contextFor(txnId), safeStore -> safeStore.command(txnId).removeListener(obsoleteTracker)); Review Comment: can you remove this comment? ########## accord-core/src/main/java/accord/coordinate/Recover.java: ########## @@ -211,22 +214,27 @@ private void recover() switch (acceptOrCommit.status) { default: throw new IllegalStateException(); Review Comment: ```suggestion default: throw new IllegalStateException("Unknown status: " + acceptOrCommit.status); ``` ########## accord-core/src/main/java/accord/coordinate/CoordinateGloballyDurable.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.coordinate; + +import accord.coordinate.tracking.QuorumTracker; +import accord.coordinate.tracking.RequestStatus; +import accord.local.Node; +import accord.local.DurableBefore; +import accord.messages.Callback; +import accord.messages.QueryDurableBefore; +import accord.messages.SetGloballyDurable; +import accord.primitives.TxnId; +import accord.topology.Topologies; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults.SettableResult; + +/** + * Review Comment: add comment or remove ########## accord-core/src/test/java/accord/topology/TopologyRandomizer.java: ########## @@ -288,6 +294,70 @@ public synchronized void maybeUpdateTopology() { updateTopology(); } + public synchronized void rangeDurable() + { + Invariants.checkState(nodeLookup != null); Review Comment: ```suggestion Invariants.nonNull(nodeLookup); ``` ########## accord-core/src/main/java/accord/local/SafeCommandStore.java: ########## @@ -137,33 +115,117 @@ public static TestKind shouldHaveWitnessed(Kind kind) } + /** + * If the transaction exists (with some associated data) in the CommandStore, return it. Otherwise return null. + * + * This is useful for operations that do not retain a route, but do expect to operate on existing local state; + * this guards against recreating a previously truncated command when we do not otherwise have enough information + * to prevent it. + */ + public @Nullable SafeCommand ifInitialised(TxnId txnId) + { + SafeCommand safeCommand = get(txnId); + Command command = safeCommand.current(); + if (command.saveStatus() == Uninitialised) + return null; + return maybeTruncate(safeCommand, command); + } + + public SafeCommand get(TxnId txnId, RoutingKey unseekable) + { + SafeCommand safeCommand = get(txnId); + Command command = safeCommand.current(); + if (command.saveStatus() == Uninitialised) + { + if (commandStore().durableBefore().isUniversal(txnId, unseekable)) + return new TruncatedSafeCommand(txnId); + } + return maybeTruncate(safeCommand, command); + } + + // decidedExecuteAt == null if not yet PreCommitted + + /** + * Retrieve a SafeCommand. If it is initialised, optionally use its present contents to determine if it should be + * truncated, and apply the truncation before returning the command. + * This behaviour may be overridden by implementations if they know any truncation would already have been applied. + * + * If it is not initialised, use the provided parameters to determine if the record may have been expunged; + * if not, create it. + * + * We do not distinguish between participants, home keys, and non-participating home keys for now, even though + * these fundamentally have different implications. Logically, we may erase a home shard's record as soon as + * the transaction has been made durable at a majority of replicas of every shard, and state for any participating + * keys may be erased as soon as their non-faulty peers have recorded the outcome. + * + * However if in some cases we don't know which commands are home keys or participants we need to wait to erase + * a transaction until both of these criteria are met for every key. + * + * TODO (desired): Introduce static types that permit us to propagate this information safely. + */ + public SafeCommand get(TxnId txnId, Unseekables<?> unseekables) Review Comment: can you move these 2 methods next to the other `get` method? ########## accord-core/src/test/java/accord/topology/TopologyRandomizer.java: ########## @@ -288,6 +294,70 @@ public synchronized void maybeUpdateTopology() { updateTopology(); } + public synchronized void rangeDurable() + { + Invariants.checkState(nodeLookup != null); + Topology current = epochs.get(epochs.size() - 1); + List<Node.Id> nodes = new ArrayList<>(current.nodes()); + Node.Id nodeId = nodes.get(random.nextInt(nodes.size())); + Node node = nodeLookup.apply(nodeId); + Ranges ranges = selectRanges(current.forNode(nodeId).ranges); + CoordinateSyncPoint.exclusive(node, ranges) + .addCallback((success, fail) -> { + if (success != null) + coordinateDurable(node, success); + }); + } + + private static void coordinateDurable(Node node, SyncPoint exclusiveSyncPoint) + { + CoordinateShardDurable.coordinate(node, exclusiveSyncPoint, Collections.emptySet()) + .addCallback((success0, fail0) -> { + if (fail0 != null) coordinateDurable(node, exclusiveSyncPoint); + }); + } + + public synchronized void globallyDurable() + { + Invariants.checkState(nodeLookup != null); Review Comment: ```suggestion Invariants.nonNull(nodeLookup); ``` ########## accord-core/src/main/java/accord/local/DurableBefore.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local; + +import java.util.function.BiFunction; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import accord.api.RoutingKey; +import accord.local.Status.Durability; +import accord.primitives.Participants; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.Routables; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.ReducingIntervalMap; +import accord.utils.ReducingRangeMap; + +import static accord.local.Status.Durability.Majority; +import static accord.local.Status.Durability.NotDurable; +import static accord.local.Status.Durability.Universal; + +public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry> +{ + public static class SerializerSupport + { + public static DurableBefore create(boolean inclusiveEnds, RoutingKey[] ends, Entry[] values) + { + return new DurableBefore(inclusiveEnds, ends, values); + } + } + + public static class Entry + { + public final @Nonnull TxnId majorityBefore, universalBefore; + + public Entry(@Nonnull TxnId majority, @Nonnull TxnId universalBefore) + { + Invariants.checkArgument(majority.compareTo(universalBefore) >= 0); + this.majorityBefore = majority; + this.universalBefore = universalBefore; + } + + private static Entry max(Entry a, Entry b) + { + return reduce(a, b, TxnId::max); + } + + private static Entry min(Entry a, Entry b) + { + return reduce(a, b, TxnId::min); + } + + private static Entry reduce(Entry a, Entry b, BiFunction<TxnId, TxnId, TxnId> reduce) + { + TxnId majority = reduce.apply(a.majorityBefore, b.majorityBefore); + TxnId universal = reduce.apply(a.universalBefore, b.universalBefore); + + if (majority == a.majorityBefore && universal == a.universalBefore) + return a; + if (majority.equals(b.majorityBefore) && universal.equals(b.universalBefore)) + return b; + + return new Entry(majority, universal); + } + + public Durability get(TxnId txnId) + { + if (txnId.compareTo(majorityBefore) < 0) + { + if (txnId.compareTo(universalBefore) < 0) + return Universal; + return Majority; Review Comment: ```suggestion return txnId.compareTo(universalBefore) < 0 ? Universal : Majority; ``` ########## accord-core/src/test/java/accord/topology/TopologyRandomizer.java: ########## @@ -288,6 +294,70 @@ public synchronized void maybeUpdateTopology() { updateTopology(); } + public synchronized void rangeDurable() + { + Invariants.checkState(nodeLookup != null); + Topology current = epochs.get(epochs.size() - 1); + List<Node.Id> nodes = new ArrayList<>(current.nodes()); + Node.Id nodeId = nodes.get(random.nextInt(nodes.size())); + Node node = nodeLookup.apply(nodeId); + Ranges ranges = selectRanges(current.forNode(nodeId).ranges); + CoordinateSyncPoint.exclusive(node, ranges) + .addCallback((success, fail) -> { + if (success != null) + coordinateDurable(node, success); + }); + } + + private static void coordinateDurable(Node node, SyncPoint exclusiveSyncPoint) + { + CoordinateShardDurable.coordinate(node, exclusiveSyncPoint, Collections.emptySet()) + .addCallback((success0, fail0) -> { + if (fail0 != null) coordinateDurable(node, exclusiveSyncPoint); + }); + } + + public synchronized void globallyDurable() + { + Invariants.checkState(nodeLookup != null); + Topology current = epochs.get(epochs.size() - 1); + List<Node.Id> nodes = new ArrayList<>(current.nodes()); + Node.Id nodeId = nodes.get(random.nextInt(nodes.size())); + Node node = nodeLookup.apply(nodeId); + long epoch = current.epoch == 1 ? 1 : 1 + random.nextInt((int)current.epoch - 1); + node.withEpoch(epoch, () -> { + node.commandStores().any().execute(() -> CoordinateGloballyDurable.coordinate(node, epoch)); + }); + } + + private Ranges selectRanges(Ranges ranges) + { + if (random.nextFloat() < 0.1f) Review Comment: ```suggestion if (random.decide(0.1f)) ``` ########## accord-core/src/test/java/accord/topology/TopologyManagerTest.java: ########## @@ -191,13 +191,76 @@ void forKeysPartiallySynced() // no acks, so all epoch 1 shards should be included Assertions.assertEquals(topologies(topology2, topology1), - service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2)); + service.withUnsyncedEpochs(keys(150, 250).toParticipants(), 2, 2)); // first topology acked, so only the second shard should be included - service.onEpochSyncComplete(id(1), 1); - service.onEpochSyncComplete(id(2), 1); - Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2); + service.onEpochSyncComplete(id(1), 2); + service.onEpochSyncComplete(id(2), 2); + Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toParticipants(), 2, 2); Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))), actual); } + + @Test + void incompleteTopologyHistory() + { + Topology topology5 = topology(5, + shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), + shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))); + Topology topology6 = topology(6, + shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), + shard(range(200, 300), idList(4, 5, 6), idSet(5, 6))); + + TopologyManager service = new TopologyManager(SUPPLIER, ID); + service.onTopologyUpdate(topology5); + service.onTopologyUpdate(topology6); + + Assertions.assertSame(topology6, service.getEpochStateUnsafe(6).global()); + Assertions.assertSame(topology5, service.getEpochStateUnsafe(5).global()); + for (int i=1; i<=6; i++) service.onEpochSyncComplete(id(i), 6); + Assertions.assertTrue(service.getEpochStateUnsafe(5).syncComplete()); + Assertions.assertNull(service.getEpochStateUnsafe(4)); + + service.onEpochSyncComplete(id(1), 4); + } + + private static void markTopologySynced(TopologyManager service, long epoch) + { + service.getEpochStateUnsafe(epoch).global().nodes().forEach(id -> service.onEpochSyncComplete(id, epoch)); + } + + private static void addAndMarkSynced(TopologyManager service, Topology topology) + { + service.onTopologyUpdate(topology); + markTopologySynced(service, topology.epoch()); + } + + @Test + void truncateTopologyHistory() + { + Range range = range(100, 200); + TopologyManager service = new TopologyManager(SUPPLIER, ID); + addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)))); + addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)))); + addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), idSet(1, 2)))); + addAndMarkSynced(service, topology(4, shard(range, idList(1, 2, 3), idSet(1, 3)))); + + Assertions.assertTrue(service.hasEpoch(1)); + Assertions.assertTrue(service.hasEpoch(2)); + Assertions.assertTrue(service.hasEpoch(3)); + Assertions.assertTrue(service.hasEpoch(4)); + + service.truncateTopologyUntil(3); + Assertions.assertFalse(service.hasEpoch(1)); + Assertions.assertFalse(service.hasEpoch(2)); + Assertions.assertTrue(service.hasEpoch(3)); + Assertions.assertTrue(service.hasEpoch(4)); + + } + + @Test + void truncateTopologyCantTruncateUnsyncedEpochs() + { + Review Comment: can you fill this test out? ########## accord-core/src/main/java/accord/messages/PreAccept.java: ########## @@ -90,6 +90,8 @@ protected PreAcceptReply applyIfDoesNotCoordinate(SafeCommandStore safeStore) { // we only preaccept in the coordination epoch, but we might contact other epochs for dependencies Ranges ranges = safeStore.ranges().allBetween(minUnsyncedEpoch, txnId); + if (txnId.rw() == ExclusiveSyncPoint) + safeStore.commandStore().markExclusiveSyncPoint(safeStore, txnId, ranges); Review Comment: can you explain this change? ########## accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java: ########## @@ -66,14 +66,15 @@ void disjointElectorate() throws Throwable Txn txn1 = writeTxn(keys); getUninterruptibly(node1.coordinate(txnId1, txn1)); getUninterruptibly(node1.commandStores().forEach(contextFor(txnId1), keys, 1, 1, commands -> { - Command command = commands.command(txnId1).current(); + Command command = commands.get(txnId1, keys.toParticipants()).current(); Assertions.assertTrue(command.partialDeps().isEmpty()); })); cluster.configServices(4).forEach(config -> { try { - getUninterruptibly(config.reportTopology(topology2)); + config.fetchTopologyForEpoch(2); Review Comment: if you rebase this should be `config.reportTopology(topology2);` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

