aweisberg commented on code in PR #80: URL: https://github.com/apache/cassandra-accord/pull/80#discussion_r1500847251
########## accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.coordinate; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import accord.api.Result; +import accord.coordinate.tracking.QuorumTracker; +import accord.local.CommandStore; +import accord.local.Node; +import accord.messages.Callback; +import accord.messages.GetEphemeralReadDeps; +import accord.messages.GetEphemeralReadDeps.GetEphemeralReadDepsOk; +import accord.primitives.Deps; +import accord.primitives.FullRoute; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import accord.topology.Topologies; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.coordinate.tracking.RequestStatus.Failed; +import static accord.coordinate.tracking.RequestStatus.Success; + +public class CoordinateEphemeralRead extends AbstractCoordinatePreAccept<Result, GetEphemeralReadDepsOk> +{ + public static AsyncResult<Result> coordinate(Node node, FullRoute<?> route, TxnId txnId, Txn txn) + { + TopologyMismatch mismatch = 
TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()), txnId, route.homeKey(), txn.keys()); + if (mismatch != null) + return AsyncResults.failure(mismatch); + + Topologies topologies = node.topology().withUnsyncedEpochs(route, txnId, txnId); + CoordinateEphemeralRead coordinate = new CoordinateEphemeralRead(node, topologies, route, txnId, txn); + coordinate.start(); + return coordinate; + } + + private final QuorumTracker tracker; + private final List<GetEphemeralReadDepsOk> oks; + private long executeAtEpoch; + + CoordinateEphemeralRead(Node node, Topologies topologies, FullRoute<?> route, TxnId txnId, Txn txn) + { + super(node, route, txnId, txn); + this.tracker = new QuorumTracker(topologies); + this.executeAtEpoch = txnId.epoch(); + this.oks = new ArrayList<>(topologies.estimateUniqueNodes()); + } + + @Override + void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetEphemeralReadDepsOk> callback) + { + CommandStore commandStore = CommandStore.maybeCurrent(); + if (commandStore == null) commandStore = node.commandStores().select(route.homeKey()); + node.send(nodes, to -> new GetEphemeralReadDeps(to, topologies, route, txnId, txn.keys(), executeAtEpoch), commandStore, callback); + } + + @Override + long executeAtEpoch() + { + return executeAtEpoch; + } + + @Override + public void onSuccessInternal(Node.Id from, GetEphemeralReadDepsOk ok) + { + oks.add(ok); + if (ok.latestEpoch > executeAtEpoch) + executeAtEpoch = ok.latestEpoch; + + if (tracker.recordSuccess(from) == Success) + onPreAcceptedOrNewEpoch(); + } + + @Override + boolean onExtraSuccessInternal(Node.Id from, GetEphemeralReadDepsOk ok) + { + if (ok.latestEpoch > executeAtEpoch) + executeAtEpoch = ok.latestEpoch; + + oks.add(ok); + return true; + } + + @Override + public void onFailureInternal(Node.Id from, Throwable failure) + { + if (tracker.recordFailure(from) == Failed) + setFailure(new Timeout(txnId, route.homeKey())); + } + + @Override + void 
onNewEpochTopologyMismatch(TopologyMismatch mismatch) + { + accept(null, mismatch); + } + + @Override + void onPreAccepted(Topologies topologies) + { + Deps deps = Deps.merge(oks, ok -> ok.deps); + new ExecuteEphemeralRead(node, topologies, route, txnId, txn, executeAtEpoch, deps, this).start(); Review Comment: It's here: https://github.com/apache/cassandra/blob/00172b5fa9a935897a30b918964c2eaa4ccfd556/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java#L155 Thinking through this again: an ephemeral read must neither read a key that has already been migrated back to Paxos nor read a key that has not yet been migrated to Accord. For Paxos -> Accord I think this works, because there are no ephemeral reads, so the interop checks at prepare/propose should catch it. ### Accord -> Paxos For Accord -> Paxos, a coordinator that doesn't know about the migration can create an ephemeral read and get it accepted. This is because Accord itself doesn't know about the migration; only the integration does, and the integration only checks when the read actually tries to run. The current approach simply assumes that no key is readable/writable in the migration epoch, regardless of whether that key has actually been migrated. Because ephemeral reads aren't visible to other transactions (such as the barrier that migrates the key to Paxos), the ephemeral read could run after the key has been migrated to Paxos. In fact it could run at any time, so it could end up executing in a later epoch even if its `executeAt` is in an earlier epoch. However, since it is a single-key, single-replica read, maybe we can validate in the integration that the ephemeral read is safe by checking whether a Paxos write has occurred? It would need to block other Paxos writes while it reads the key. This essentially substitutes for the fact that the ephemeral read doesn't block other txns inside Accord. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

