This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new c524b6d Pre-requisite changes for CASSANDRA-18888 (#74) c524b6d is described below commit c524b6d3de3923ccb6314715bd987f3b891348ab Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Wed Jan 17 15:14:00 2024 +0000 Pre-requisite changes for CASSANDRA-18888 (#74) patch by Aleksey Yeschenko; reviewed by Ariel Weisberg for CASSANDRA-18888 --- .../src/main/java/accord/api/MessageSink.java | 2 +- accord-core/src/main/java/accord/local/Node.java | 29 +++--- .../java/accord/messages/AbstractEpochRequest.java | 8 +- .../src/main/java/accord/messages/Commit.java | 7 ++ .../java/accord/messages/InformHomeDurable.java | 6 ++ .../{LocalMessage.java => LocalRequest.java} | 19 +++- .../src/main/java/accord/messages/MessageType.java | 114 ++++++++++++--------- .../src/main/java/accord/messages/Propagate.java | 32 +++++- .../src/main/java/accord/messages/Request.java | 5 +- .../src/main/java/accord/messages/TxnRequest.java | 8 +- .../main/java/accord/messages/WaitOnCommit.java | 8 +- accord-core/src/test/java/accord/Utils.java | 8 +- .../accord/burn/BurnTestConfigurationService.java | 6 ++ .../src/test/java/accord/impl/basic/Cluster.java | 16 +-- .../test/java/accord/impl/list/ListRequest.java | 6 ++ .../test/java/accord/impl/mock/MockCluster.java | 4 +- .../src/test/java/accord/local/CommandsTest.java | 6 ++ .../src/test/java/accord/utils/MessageTask.java | 6 ++ .../src/main/java/accord/maelstrom/Cluster.java | 4 +- .../java/accord/maelstrom/MaelstromRequest.java | 6 ++ .../src/main/java/accord/maelstrom/Main.java | 4 +- 21 files changed, 208 insertions(+), 96 deletions(-) diff --git a/accord-core/src/main/java/accord/api/MessageSink.java b/accord-core/src/main/java/accord/api/MessageSink.java index 47a3fa0..d05f304 100644 --- a/accord-core/src/main/java/accord/api/MessageSink.java +++ b/accord-core/src/main/java/accord/api/MessageSink.java @@ -28,7 +28,7 @@ import accord.messages.Request; public interface MessageSink { void send(Id to, Request request); - void send(Id to, Request request, AgentExecutor executor, Callback callback); + void send(Id to, Request request, AgentExecutor executor, Callback<?> callback); void reply(Id replyingToNode, ReplyContext replyContext, Reply reply); void replyWithUnknownFailure(Id replyingToNode, ReplyContext replyContext, Throwable failure); } diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 9e67cfa..6fc883e 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -65,7 +65,7 @@ import accord.coordinate.Persist; import accord.coordinate.RecoverWithRoute; import accord.messages.Apply; import accord.messages.Callback; -import accord.messages.LocalMessage; +import accord.messages.LocalRequest; import accord.messages.Reply; import accord.messages.ReplyContext; import accord.messages.Request; @@ -147,7 +147,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService private final Id id; private final MessageSink messageSink; - private final LocalMessage.Handler localMessageHandler; + private final LocalRequest.Handler localRequestHandler; private final ConfigurationService configService; private final TopologyManager topology; private final CommandStores commandStores; @@ -168,7 +168,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService // TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them private final Map<TxnId, AsyncResult<? extends Outcome>> coordinating = new ConcurrentHashMap<>(); - public Node(Id id, MessageSink messageSink, LocalMessage.Handler localMessageHandler, + public Node(Id id, MessageSink messageSink, LocalRequest.Handler localRequestHandler, ConfigurationService configService, LongSupplier nowSupplier, ToLongFunction<TimeUnit> nowTimeUnit, Supplier<DataStore> dataSupplier, ShardDistributor shardDistributor, Agent agent, RandomSource random, Scheduler scheduler, TopologySorter.Supplier topologySorter, Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory, Execute.Factory executionFactory, Persist.Factory persistFactory, Apply.Factory applyFactory, @@ -177,7 +177,7 @@ public class Node implements ConfigurationService.Listener, NodeTimeService this.id = id; this.localConfig = localConfig; this.messageSink = messageSink; - this.localMessageHandler = localMessageHandler; + this.localRequestHandler = localRequestHandler; this.configService = configService; this.executionFactory = executionFactory; this.persistFactory = persistFactory; @@ -529,9 +529,9 @@ public class Node implements ConfigurationService.Listener, NodeTimeService messageSink.send(to, send); } - public void localMessage(LocalMessage message) + public void localRequest(LocalRequest message) { - localMessageHandler.handle(message, this); + localRequestHandler.handle(message, this); } public void reply(Id replyingToNode, ReplyContext replyContext, Reply send, Throwable failure) @@ -706,19 +706,16 @@ public class Node implements ConfigurationService.Listener, NodeTimeService return future; } - public void receive (Request request, Id from, ReplyContext replyContext) + public void receive(Request request, Id from, ReplyContext replyContext) { - long knownEpoch = request.knownEpoch(); - if (knownEpoch > topology.epoch()) + long waitForEpoch = request.waitForEpoch(); + if (waitForEpoch > topology.epoch()) { - configService.fetchTopologyForEpoch(knownEpoch); - long waitForEpoch = request.waitForEpoch(); - if (waitForEpoch > topology.epoch()) - { - topology().awaitEpoch(waitForEpoch).addCallback(() -> receive(request, from, replyContext)); - return; - } + configService.fetchTopologyForEpoch(waitForEpoch); + topology().awaitEpoch(waitForEpoch).addCallback(() -> receive(request, from, replyContext)); + return; } + Runnable processMsg = () -> { try { diff --git a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java index 482fcf3..040364f 100644 --- a/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java +++ b/accord-core/src/main/java/accord/messages/AbstractEpochRequest.java @@ -37,11 +37,17 @@ public abstract class AbstractEpochRequest<R extends Reply> implements PreLoadCo } @Override - public void process(Node on, Node.Id replyTo, ReplyContext replyContext) + public void preProcess(Node on, Node.Id replyTo, ReplyContext replyContext) { this.node = on; this.replyTo = replyTo; this.replyContext = replyContext; + } + + @Override + public void process(Node on, Node.Id replyTo, ReplyContext replyContext) + { + preProcess(on, replyTo, replyContext); process(); } diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index 14b26fd..bb05856 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -323,9 +323,16 @@ public class Commit extends TxnRequest<ReadNack> return waitForEpoch; } + @Override + public void preProcess(Node on, Id from, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node node, Id from, ReplyContext replyContext) { + node.forEachLocal(this, scope, txnId.epoch(), invalidateUntilEpoch, safeStore -> { // it's fine for this to operate on a non-participating home key, since invalidation is a terminal state, // so it doesn't matter if we resurrect a redundant entry diff --git a/accord-core/src/main/java/accord/messages/InformHomeDurable.java b/accord-core/src/main/java/accord/messages/InformHomeDurable.java index 6f2491d..ae871fd 100644 --- a/accord-core/src/main/java/accord/messages/InformHomeDurable.java +++ b/accord-core/src/main/java/accord/messages/InformHomeDurable.java @@ -53,6 +53,12 @@ public class InformHomeDurable implements Request this.persistedOn = ImmutableSet.copyOf(persistedOn); // Persisted on might be mutated later } + @Override + public void preProcess(Node node, Id replyToNode, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node node, Id replyToNode, ReplyContext replyContext) { diff --git a/accord-core/src/main/java/accord/messages/LocalMessage.java b/accord-core/src/main/java/accord/messages/LocalRequest.java similarity index 65% rename from accord-core/src/main/java/accord/messages/LocalMessage.java rename to accord-core/src/main/java/accord/messages/LocalRequest.java index 07b1189..d12e35a 100644 --- a/accord-core/src/main/java/accord/messages/LocalMessage.java +++ b/accord-core/src/main/java/accord/messages/LocalRequest.java @@ -19,13 +19,24 @@ package accord.messages; import accord.local.Node; import accord.local.PreLoadContext; +import accord.local.SafeCommandStore; +import accord.utils.MapReduceConsume; -public interface LocalMessage extends Message, PreLoadContext +import java.util.function.BiConsumer; + +public interface LocalRequest<R> extends Request, PreLoadContext, MapReduceConsume<SafeCommandStore, Void> { + /** + * Process the request without executing the callback + */ + void process(Node on); + + void process(Node on, BiConsumer<R, Throwable> callback); + + BiConsumer<R, Throwable> callback(); + interface Handler { - void handle(LocalMessage message, Node node); + void handle(LocalRequest<?> message, Node node); } - - void process(Node node); } diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java index 56ea2ed..8111f5f 100644 --- a/accord-core/src/main/java/accord/messages/MessageType.java +++ b/accord-core/src/main/java/accord/messages/MessageType.java @@ -31,49 +31,52 @@ import static accord.messages.MessageType.Kind.LOCAL; */ public class MessageType { - public static final MessageType SIMPLE_RSP = mt(REMOTE, false); - public static final MessageType FAILURE_RSP = mt(REMOTE, false); - public static final MessageType PRE_ACCEPT_REQ = mt(REMOTE, true ); - public static final MessageType PRE_ACCEPT_RSP = mt(REMOTE, false); - public static final MessageType ACCEPT_REQ = mt(REMOTE, true ); - public static final MessageType ACCEPT_RSP = mt(REMOTE, false); - public static final MessageType ACCEPT_INVALIDATE_REQ = mt(REMOTE, true ); - public static final MessageType GET_DEPS_REQ = mt(REMOTE, false); - public static final MessageType GET_DEPS_RSP = mt(REMOTE, false); - public static final MessageType COMMIT_MINIMAL_REQ = mt(REMOTE, true ); - public static final MessageType COMMIT_MAXIMAL_REQ = mt(REMOTE, true ); - public static final MessageType COMMIT_INVALIDATE_REQ = mt(REMOTE, true ); - public static final MessageType APPLY_MINIMAL_REQ = mt(REMOTE, true ); - public static final MessageType APPLY_MAXIMAL_REQ = mt(REMOTE, true ); - public static final MessageType APPLY_RSP = mt(REMOTE, false); - public static final MessageType READ_REQ = mt(REMOTE, false); - public static final MessageType READ_RSP = mt(REMOTE, false); - public static final MessageType BEGIN_RECOVER_REQ = mt(REMOTE, true ); - public static final MessageType BEGIN_RECOVER_RSP = mt(REMOTE, false); - public static final MessageType BEGIN_INVALIDATE_REQ = mt(REMOTE, true ); - public static final MessageType BEGIN_INVALIDATE_RSP = mt(REMOTE, false); - public static final MessageType WAIT_ON_COMMIT_REQ = mt(REMOTE, false); - public static final MessageType WAIT_ON_COMMIT_RSP = mt(REMOTE, false); - public static final MessageType WAIT_UNTIL_APPLIED_REQ = mt(REMOTE, false); - public static final MessageType INFORM_OF_TXN_REQ = mt(REMOTE, true ); - public static final MessageType INFORM_DURABLE_REQ = mt(REMOTE, true ); - public static final MessageType INFORM_HOME_DURABLE_REQ = mt(REMOTE, true ); - public static final MessageType CHECK_STATUS_REQ = mt(REMOTE, false); - public static final MessageType CHECK_STATUS_RSP = mt(REMOTE, false); - public static final MessageType FETCH_DATA_REQ = mt(REMOTE, false); - public static final MessageType FETCH_DATA_RSP = mt(REMOTE, false); - public static final MessageType SET_SHARD_DURABLE_REQ = mt(REMOTE, true ); - public static final MessageType SET_GLOBALLY_DURABLE_REQ = mt(REMOTE, true ); - public static final MessageType QUERY_DURABLE_BEFORE_REQ = mt(REMOTE, false); - public static final MessageType QUERY_DURABLE_BEFORE_RSP = mt(REMOTE, false); - public static final MessageType APPLY_THEN_WAIT_UNTIL_APPLIED_REQ= mt(REMOTE, true ); - - public static final MessageType PROPAGATE_PRE_ACCEPT_MSG = mt(LOCAL, true ); - public static final MessageType PROPAGATE_COMMIT_MSG = mt(LOCAL, true ); - public static final MessageType PROPAGATE_APPLY_MSG = mt(LOCAL, true ); - public static final MessageType PROPAGATE_OTHER_MSG = mt(LOCAL, true ); + public static final MessageType SIMPLE_RSP = remote("SIMPLE_RSP", false); + public static final MessageType FAILURE_RSP = remote("FAILURE_RSP", false); + public static final MessageType PRE_ACCEPT_REQ = remote("PRE_ACCEPT_REQ", true ); + public static final MessageType PRE_ACCEPT_RSP = remote("PRE_ACCEPT_RSP", false); + public static final MessageType ACCEPT_REQ = remote("ACCEPT_REQ", true ); + public static final MessageType ACCEPT_RSP = remote("ACCEPT_RSP", false); + public static final MessageType ACCEPT_INVALIDATE_REQ = remote("ACCEPT_INVALIDATE_REQ", true ); + public static final MessageType GET_DEPS_REQ = remote("GET_DEPS_REQ", false); + public static final MessageType GET_DEPS_RSP = remote("GET_DEPS_RSP", false); + public static final MessageType COMMIT_MINIMAL_REQ = remote("COMMIT_MINIMAL_REQ", true ); + public static final MessageType COMMIT_MAXIMAL_REQ = remote("COMMIT_MAXIMAL_REQ", true ); + public static final MessageType COMMIT_INVALIDATE_REQ = remote("COMMIT_INVALIDATE_REQ", true ); + public static final MessageType APPLY_MINIMAL_REQ = remote("APPLY_MINIMAL_REQ", true ); + public static final MessageType APPLY_MAXIMAL_REQ = remote("APPLY_MAXIMAL_REQ", true ); + public static final MessageType APPLY_RSP = remote("APPLY_RSP", false); + public static final MessageType READ_REQ = remote("READ_REQ", false); + public static final MessageType READ_RSP = remote("READ_RSP", false); + public static final MessageType BEGIN_RECOVER_REQ = remote("BEGIN_RECOVER_REQ", true ); + public static final MessageType BEGIN_RECOVER_RSP = remote("BEGIN_RECOVER_RSP", false); + public static final MessageType BEGIN_INVALIDATE_REQ = remote("BEGIN_INVALIDATE_REQ", true ); + public static final MessageType BEGIN_INVALIDATE_RSP = remote("BEGIN_INVALIDATE_RSP", false); + public static final MessageType WAIT_ON_COMMIT_REQ = remote("WAIT_ON_COMMIT_REQ", false); + public static final MessageType WAIT_ON_COMMIT_RSP = remote("WAIT_ON_COMMIT_RSP", false); + public static final MessageType WAIT_UNTIL_APPLIED_REQ = remote("WAIT_UNTIL_APPLIED_REQ", false); + public static final MessageType INFORM_OF_TXN_REQ = remote("INFORM_OF_TXN_REQ", true ); + public static final MessageType INFORM_DURABLE_REQ = remote("INFORM_DURABLE_REQ", true ); + public static final MessageType INFORM_HOME_DURABLE_REQ = remote("INFORM_HOME_DURABLE_REQ", true ); + public static final MessageType CHECK_STATUS_REQ = remote("CHECK_STATUS_REQ", false); + public static final MessageType CHECK_STATUS_RSP = remote("CHECK_STATUS_RSP", false); + public static final MessageType FETCH_DATA_REQ = remote("FETCH_DATA_REQ", false); + public static final MessageType FETCH_DATA_RSP = remote("FETCH_DATA_RSP", false); + public static final MessageType SET_SHARD_DURABLE_REQ = remote("SET_SHARD_DURABLE_REQ", true ); + public static final MessageType SET_GLOBALLY_DURABLE_REQ = remote("SET_GLOBALLY_DURABLE_REQ", true ); + public static final MessageType QUERY_DURABLE_BEFORE_REQ = remote("QUERY_DURABLE_BEFORE_REQ", false); + public static final MessageType QUERY_DURABLE_BEFORE_RSP = remote("QUERY_DURABLE_BEFORE_RSP", false); + public static final MessageType APPLY_THEN_WAIT_UNTIL_APPLIED_REQ = remote("APPLY_THEN_WAIT_UNTIL_APPLIED_REQ", true ); + + public static final MessageType PROPAGATE_PRE_ACCEPT_MSG = local("PROPAGATE_PRE_ACCEPT_MSG", true); + public static final MessageType PROPAGATE_COMMIT_MSG = local("PROPAGATE_COMMIT_MSG", true); + public static final MessageType PROPAGATE_APPLY_MSG = local("PROPAGATE_APPLY_MSG", true); + public static final MessageType PROPAGATE_OTHER_MSG = local("PROPAGATE_OTHER_MSG", true); + /** + * LOCAL messages are not sent to remote nodes. + */ public enum Kind { LOCAL, REMOTE } public static final List<MessageType> values; @@ -98,14 +101,17 @@ public class MessageType values = builder.build(); } - protected static MessageType mt(Kind kind, boolean hasSideEffects) + protected static MessageType local(String name, boolean hasSideEffects) { - return new MessageType(kind, hasSideEffects); + return new MessageType(name, LOCAL, hasSideEffects); } - /** - * LOCAL messages are not sent to remote nodes. - */ + protected static MessageType remote(String name, boolean hasSideEffects) + { + return new MessageType(name, REMOTE, hasSideEffects); + } + + private final String name; private final Kind kind; /** @@ -113,10 +119,22 @@ public class MessageType */ private final boolean hasSideEffects; - protected MessageType(Kind kind, boolean hasSideEffects) + protected MessageType(String name, Kind kind, boolean hasSideEffects) { - this.hasSideEffects = hasSideEffects; + this.name = name; this.kind = kind; + this.hasSideEffects = hasSideEffects; + } + + public String name() + { + return name; + } + + @Override + public String toString() + { + return name(); } public boolean isLocal() diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index a0821bc..e1a3679 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -41,7 +41,6 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.Invariants; -import accord.utils.MapReduceConsume; import javax.annotation.Nullable; import java.util.function.BiConsumer; @@ -54,7 +53,7 @@ import static accord.local.Status.PreApplied; import static accord.messages.CheckStatus.WithQuorum.HasQuorum; import static accord.primitives.Routables.Slice.Minimal; -public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, EpochSupplier, LocalMessage +public class Propagate implements EpochSupplier, LocalRequest<Status.Known> { public static class SerializerSupport { @@ -83,7 +82,13 @@ public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, Epoc @Nullable public final Writes writes; @Nullable public final Result result; - transient final BiConsumer<Status.Known, Throwable> callback; + protected transient BiConsumer<Status.Known, Throwable> callback; + + @Override + public BiConsumer<Status.Known, Throwable> callback() + { + return callback; + } Propagate( TxnId txnId, @@ -123,6 +128,13 @@ public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, Epoc this.callback = callback; } + @Override + public void process(Node on, BiConsumer<Status.Known, Throwable> callback) + { + this.callback = callback; + process(on); + } + @SuppressWarnings({"rawtypes", "unchecked"}) public static void propagate(Node node, TxnId txnId, long sourceEpoch, WithQuorum withQuorum, Route route, @Nullable Status.Known target, CheckStatus.CheckStatusOkFull full, BiConsumer<Status.Known, Throwable> callback) { @@ -190,7 +202,7 @@ public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, Epoc Propagate propagate = new Propagate(txnId, route, full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.durability, full.homeKey, progressKey, achieved, full.map, isTruncated, partialTxn, committedDeps, toEpoch, full.executeAtIfKnown(), full.writes, full.result, callback); - node.localMessage(propagate); + node.localRequest(propagate); } @Override @@ -212,6 +224,18 @@ public class Propagate implements MapReduceConsume<SafeCommandStore, Void>, Epoc return Keys.EMPTY; } + @Override + public void preProcess(Node on, Node.Id from, ReplyContext replyContext) + { + throw new UnsupportedOperationException(); + } + + @Override + public void process(Node on, Node.Id from, ReplyContext replyContext) + { + throw new UnsupportedOperationException(); + } + @Override public void process(Node node) { diff --git a/accord-core/src/main/java/accord/messages/Request.java b/accord-core/src/main/java/accord/messages/Request.java index 4cd9861..0c92371 100644 --- a/accord-core/src/main/java/accord/messages/Request.java +++ b/accord-core/src/main/java/accord/messages/Request.java @@ -23,7 +23,8 @@ import accord.local.Node.Id; public interface Request extends Message { - void process(Node on, Id from, ReplyContext replyContext); default long waitForEpoch() { return 0; } - default long knownEpoch() { return waitForEpoch(); } + void preProcess(Node on, Id from, ReplyContext replyContext); + void process(Node on, Id from, ReplyContext replyContext); + } diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java index 2520ad8..4cd3942 100644 --- a/accord-core/src/main/java/accord/messages/TxnRequest.java +++ b/accord-core/src/main/java/accord/messages/TxnRequest.java @@ -136,12 +136,18 @@ public abstract class TxnRequest<R> implements Request, PreLoadContext, MapReduc } @Override - public void process(Node on, Id replyTo, ReplyContext replyContext) + public void preProcess(Node on, Id replyTo, ReplyContext replyContext) { this.node = on; this.replyTo = replyTo; this.replyContext = replyContext; this.progressKey = progressKey(); // TODO (low priority, clarity): not every class that extends TxnRequest needs this set + } + + @Override + public void process(Node on, Id replyTo, ReplyContext replyContext) + { + preProcess(on, replyTo, replyContext); process(); } diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java index ca52b69..9d0105d 100644 --- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java +++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java @@ -70,11 +70,17 @@ public class WaitOnCommit implements Request, MapReduceConsume<SafeCommandStore, } @Override - public void process(Node node, Id replyToNode, ReplyContext replyContext) + public void preProcess(Node node, Id replyToNode, ReplyContext replyContext) { this.node = node; this.replyTo = replyToNode; this.replyContext = replyContext; + } + + @Override + public void process(Node node, Id replyToNode, ReplyContext replyContext) + { + preProcess(node, replyToNode, replyContext); node.mapReduceConsumeLocal(this, scope, txnId.epoch(), txnId.epoch(), this); } diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index 40d4df0..787d9de 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -30,9 +30,10 @@ import com.google.common.collect.Sets; import accord.api.Key; import accord.api.MessageSink; import accord.api.Scheduler; +import accord.config.LocalConfig; +import accord.config.MutableLocalConfig; import accord.coordinate.TxnExecute; import accord.coordinate.TxnPersist; -import accord.config.LocalConfig; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; import accord.impl.SimpleProgressLog; @@ -48,8 +49,7 @@ import accord.local.Node; import accord.local.NodeTimeService; import accord.local.ShardDistributor; import accord.messages.Apply; -import accord.config.MutableLocalConfig; -import accord.messages.LocalMessage; +import accord.messages.LocalRequest; import accord.primitives.Keys; import accord.primitives.Range; import accord.primitives.Ranges; @@ -165,7 +165,7 @@ public class Utils LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(nodeId, messageSink, - LocalMessage::process, + LocalRequest::process, new MockConfigurationService(messageSink, EpochFunction.noop(), topology), clock, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock), diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index 01ea96d..aca0e48 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -66,6 +66,12 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M this.epoch = epoch; } + @Override + public void preProcess(Node on, Node.Id from, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node on, Node.Id from, ReplyContext replyContext) { diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index a9dbcf5..6d1e226 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -37,8 +37,6 @@ import java.util.function.IntSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; -import accord.config.LocalConfig; -import accord.impl.MessageListener; import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,25 +45,27 @@ import accord.api.MessageSink; import accord.api.Scheduler; import accord.burn.BurnTestConfigurationService; import accord.burn.TopologyUpdates; +import accord.burn.random.FrequentLargeRange; +import accord.config.LocalConfig; +import accord.config.MutableLocalConfig; import accord.coordinate.TxnExecute; import accord.coordinate.TxnPersist; -import accord.burn.random.FrequentLargeRange; import accord.impl.CoordinateDurabilityScheduling; +import accord.impl.MessageListener; import accord.impl.PrefixedIntHashKey; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TopologyFactory; import accord.impl.list.ListStore; import accord.local.AgentExecutor; -import accord.local.Node; import accord.local.Node.Id; +import accord.local.Node; import accord.local.NodeTimeService; import accord.local.ShardDistributor; import accord.messages.Apply; -import accord.config.MutableLocalConfig; -import accord.messages.LocalMessage; -import accord.messages.MessageType; +import accord.messages.LocalRequest; import accord.messages.Message; +import accord.messages.MessageType; import accord.messages.Reply; import accord.messages.Request; import accord.messages.SafeCallback; @@ -292,7 +292,7 @@ public class Cluster implements Scheduler executorMap.put(id, nodeExecutor); BurnTestConfigurationService configService = new BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, nodeMap::get, topologyUpdates); BooleanSupplier isLoadedCheck = random.biasedUniformBools(0.5f); - Node node = new Node(id, messageSink, LocalMessage::process, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier), + Node node = new Node(id, messageSink, LocalRequest::process, configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier), () -> new ListStore(id), new ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()), nodeExecutor.agent(), randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java index 75713a0..cd58a8a 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRequest.java +++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java @@ -238,6 +238,12 @@ public class ListRequest implements Request this.listener = listener; } + @Override + public void preProcess(Node node, Id client, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node node, Id client, ReplyContext replyContext) { diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index 19bf3e6..199ec1a 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -50,7 +50,7 @@ import accord.local.ShardDistributor; import accord.messages.Apply; import accord.config.MutableLocalConfig; import accord.messages.Callback; -import accord.messages.LocalMessage; +import accord.messages.LocalRequest; import accord.messages.Reply; import accord.messages.Request; import accord.messages.SafeCallback; @@ -127,7 +127,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> LocalConfig localConfig = new MutableLocalConfig(); Node node = new Node(id, messageSink, - LocalMessage::process, + LocalRequest::process, configurationService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier), diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java index c938981..6731a74 100644 --- a/accord-core/src/test/java/accord/local/CommandsTest.java +++ b/accord-core/src/test/java/accord/local/CommandsTest.java @@ -101,6 +101,12 @@ class CommandsTest cluster(rs::fork, nodes, initialTopology, nodeMap -> new Request() { + @Override + public void preProcess(Node on, Node.Id from, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node node, Node.Id from, ReplyContext replyContext) { diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/test/java/accord/utils/MessageTask.java index 528a6e4..d53128c 100644 --- a/accord-core/src/test/java/accord/utils/MessageTask.java +++ b/accord-core/src/test/java/accord/utils/MessageTask.java @@ -94,6 +94,12 @@ public class MessageTask extends AsyncResults.SettableResult<Void> implements Ru this.desc = desc; } + @Override + public void preProcess(Node on, Node.Id from, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node on, Node.Id from, ReplyContext replyContext) { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index f69e4ed..047240a 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -55,7 +55,7 @@ import accord.local.NodeTimeService; import accord.local.ShardDistributor; import accord.messages.Apply; import accord.messages.Callback; -import accord.messages.LocalMessage; +import accord.messages.LocalRequest; import accord.messages.Reply; import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; @@ -320,7 +320,7 @@ public class Cluster implements Scheduler MessageSink messageSink = sinks.create(node, randomSupplier.get()); LongSupplier nowSupplier = nowSupplierSupplier.get(); LocalConfig localConfig = new MutableLocalConfig(); - lookup.put(node, new Node(node, messageSink, LocalMessage::process, new SimpleConfigService(topology), + lookup.put(node, new Node(node, messageSink, LocalRequest::process, new SimpleConfigService(topology), nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java index f9aad09..5dfcf3e 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java @@ -46,6 +46,12 @@ public class MaelstromRequest extends Body implements Request this.txn = txn; } + @Override + public void preProcess(Node node, Id client, ReplyContext replyContext) + { + // no-op + } + @Override public void process(Node node, Id client, ReplyContext replyContext) { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 5d2a459..c79b3b4 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -48,7 +48,7 @@ import accord.local.ShardDistributor; import accord.maelstrom.Packet.Type; import accord.messages.Apply; import accord.messages.Callback; -import accord.messages.LocalMessage; +import accord.messages.LocalRequest; import accord.messages.Reply; import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; @@ -179,7 +179,7 @@ public class Main topology = topologyFactory.toTopology(init.cluster); sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err); LocalConfig localConfig = new MutableLocalConfig(); - on = new Node(init.self, sink, LocalMessage::process, new SimpleConfigService(topology), + on = new Node(init.self, sink, LocalRequest::process, new SimpleConfigService(topology), System::currentTimeMillis, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org