This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit ac0e1bf7e873bf4ae750b00b1d0c2967bac1aeec Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Apr 1 10:16:27 2024 -0700 (Accord) Cassandra bootstrap no longer using the range txn and instead uses the sync point empty txn for reads patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19503 --- modules/accord | 2 +- .../cassandra/service/accord/AccordJournal.java | 10 ++++++ .../service/accord/AccordMessageSink.java | 2 ++ .../test/accord/AccordBootstrapTest.java | 2 +- .../cassandra/service/accord/MockJournal.java | 39 ++++++++++++++++------ 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/modules/accord b/modules/accord index f78d1da27b..1a5cb4f100 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit f78d1da27b09f89417dd29bde0529f12cd744e3d +Subproject commit 1a5cb4f10002fb3650ad464b3a77664f18e2a901 diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 0562da1139..b659cf4733 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -1408,6 +1408,7 @@ public class AccordJournal implements IJournal, Shutdownable return presentMessages; } + @Override public Set<MessageType> all() { Set<Type> types = EnumSet.allOf(Type.class); @@ -1514,6 +1515,15 @@ public class AccordJournal implements IJournal, Shutdownable return confirmed; } + @Override + public Set<MessageType> all() + { + logger.debug("Checking all messages for {}", txnId); + Set<MessageType> confirmed = provider.all(); + logger.debug("Confirmed {} messages for {}", confirmed, txnId); + return confirmed; + } + @Override public PreAccept preAccept() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java index d72644811a..5a514219e3 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java +++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java @@ -126,6 +126,8 @@ public class AccordMessageSink implements MessageSink builder.put(MessageType.GET_DEPS_RSP, Verb.ACCORD_GET_DEPS_RSP); builder.put(MessageType.GET_EPHEMERAL_READ_DEPS_REQ, Verb.ACCORD_GET_EPHMRL_READ_DEPS_REQ); builder.put(MessageType.GET_EPHEMERAL_READ_DEPS_RSP, Verb.ACCORD_GET_EPHMRL_READ_DEPS_RSP); + builder.put(MessageType.GET_MAX_CONFLICT_REQ, Verb.ACCORD_GET_MAX_CONFLICT_REQ); + builder.put(MessageType.GET_MAX_CONFLICT_RSP, Verb.ACCORD_GET_MAX_CONFLICT_RSP); builder.put(MessageType.COMMIT_SLOW_PATH_REQ, Verb.ACCORD_COMMIT_REQ); builder.put(MessageType.COMMIT_MAXIMAL_REQ, Verb.ACCORD_COMMIT_REQ); builder.put(MessageType.STABLE_FAST_PATH_REQ, Verb.ACCORD_COMMIT_REQ); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java index f040e9d4db..2241a8c911 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java @@ -91,7 +91,7 @@ public class AccordBootstrapTest extends TestBaseImpl // withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000), // () -> withProperty("cassandra.join_ring", false, () -> newInstance.startup(cluster))); // newInstance.nodetoolResult("join").asserts().success(); - newInstance.nodetoolResult("describecms").asserts().success(); // just make sure we're joined, remove later + newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later } private static AccordService service() diff --git a/test/unit/org/apache/cassandra/service/accord/MockJournal.java b/test/unit/org/apache/cassandra/service/accord/MockJournal.java index 575b996e1e..8a68163ede 100644 --- a/test/unit/org/apache/cassandra/service/accord/MockJournal.java +++ b/test/unit/org/apache/cassandra/service/accord/MockJournal.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.accord; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -36,6 +37,8 @@ import accord.messages.Propagate; import accord.primitives.Ballot; import accord.primitives.TxnId; import org.agrona.collections.ObjectHashSet; +import org.apache.cassandra.service.accord.AccordJournal.Key; +import org.apache.cassandra.service.accord.AccordJournal.Type; import static accord.messages.MessageType.ACCEPT_REQ; import static accord.messages.MessageType.APPLY_MAXIMAL_REQ; @@ -52,7 +55,7 @@ import static accord.messages.MessageType.STABLE_MAXIMAL_REQ; public class MockJournal implements IJournal { - private final Map<AccordJournal.Key, Message> writes = new HashMap<>(); + private final Map<Key, Message> writes = new HashMap<>(); @Override public SerializerSupport.MessageProvider makeMessageProvider(TxnId txnId) { @@ -61,27 +64,41 @@ public class MockJournal implements IJournal @Override public Set<MessageType> test(Set<MessageType> messages) { - Set<AccordJournal.Key> keys = new ObjectHashSet<>(messages.size() + 1, 0.9f); + Set<Key> keys = new ObjectHashSet<>(messages.size() + 1, 0.9f); for (MessageType message : messages) - for (AccordJournal.Type synonymousType : AccordJournal.Type.synonymousTypesFromMessageType(message)) - keys.add(new AccordJournal.Key(txnId, synonymousType)); - Set<AccordJournal.Key> presentKeys = Sets.intersection(writes.keySet(), keys); + for (Type synonymousType : Type.synonymousTypesFromMessageType(message)) + keys.add(new Key(txnId, synonymousType)); + Set<Key> presentKeys = Sets.intersection(writes.keySet(), keys); Set<MessageType> presentMessages = new ObjectHashSet<>(presentKeys.size() + 1, 0.9f); - for (AccordJournal.Key key : presentKeys) + for (Key key : presentKeys) presentMessages.add(key.type.outgoingType); return presentMessages; } - private <T extends Message> T get(AccordJournal.Key key) + @Override + public Set<MessageType> all() + { + Set<Type> types = EnumSet.allOf(Type.class); + Set<Key> keys = new ObjectHashSet<>(types.size() + 1, 0.9f); + for (Type type : types) + keys.add(new Key(txnId, type)); + Set<Key> presentKeys = Sets.intersection(writes.keySet(), keys); + Set<MessageType> presentMessages = new ObjectHashSet<>(presentKeys.size() + 1, 0.9f); + for (Key key : presentKeys) + presentMessages.add(key.type.outgoingType); + return presentMessages; + } + + private <T extends Message> T get(Key key) { return (T) writes.get(key); } private <T extends Message> T get(MessageType messageType) { - for (AccordJournal.Type type : AccordJournal.Type.synonymousTypesFromMessageType(messageType)) + for (Type type : Type.synonymousTypesFromMessageType(messageType)) { - T value = get(new AccordJournal.Key(txnId, type)); + T value = get(new Key(txnId, type)); if (value != null) return value; } return null; @@ -164,8 +181,8 @@ public class MockJournal implements IJournal @Override public void appendMessageBlocking(Message message) { - AccordJournal.Type type = AccordJournal.Type.fromMessageType(message.type()); - AccordJournal.Key key = new AccordJournal.Key(type.txnId(message), type); + Type type = Type.fromMessageType(message.type()); + Key key = new Key(type.txnId(message), type); writes.put(key, message); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org