bdeggleston commented on code in PR #4060: URL: https://github.com/apache/cassandra/pull/4060#discussion_r2033710929
########## src/java/org/apache/cassandra/replication/Shard.java: ########## @@ -52,7 +52,10 @@ public class Shard this.sinceEpoch = sinceEpoch; this.logs = new NonBlockingHashMapLong<>(); this.currentLocalLog = startNewLog(localHostId, logIdProvider.getAsInt(), participants); - logs.put(currentLocalLog.logId.asLong(), currentLocalLog); + long logId = currentLocalLog.logId.asLong(); + // We should never create a Shard for MutationId.none(), right? Review Comment: That's right, but the comment isn't needed ########## src/java/org/apache/cassandra/replication/TrackedWriteRequest.java: ########## @@ -65,45 +72,167 @@ public class TrackedWriteRequest { private static final Logger logger = LoggerFactory.getLogger(TrackedWriteRequest.class); + private final ForwardedWriteRequest.DirectAcknowledge ackTo; + + public TrackedWriteRequest(ForwardedWriteRequest.DirectAcknowledge ackTo) + { + this.ackTo = ackTo; + } + + public TrackedWriteRequest() + { + // Coordinator is a replica, so no need to acknowledge elsewhere + this.ackTo = null; + } + /** * Coordinate write of a tracked mutation. Assumes the replica is a coordinator. * * @param mutation the mutation to be applied * @param consistencyLevel the consistency level for the write operation * @param requestTime object holding times when request got enqueued and started execution */ - public static TrackedWriteResponseHandler perform( + public AbstractWriteResponseHandler<?> perform( Review Comment: can you revert these methods to static? They don't keep any state and there's no need to allocate this for each write. 
The `ackTo` field you added can just be passed into the methods that need it ########## src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java: ########## @@ -58,6 +64,16 @@ public void onResponse(Message<NoPayload> msg) if (msg != null) MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, mutationId, msg.from()); + // Local write needs to be ack'd to client-coordinator + if (msg == null && ackTo != null) Review Comment: would the forwarded write verb handler be a better place for this? ########## src/java/org/apache/cassandra/service/ForwardedWriteResponseHandler.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.transport.Dispatcher; + +// rm? Review Comment: yeah it seems like this just adds debug logging around response receipt? ########## src/java/org/apache/cassandra/replication/MutationTrackingService.java: ########## @@ -173,6 +173,7 @@ MutationId nextMutationId(Token token) void witnessedRemoteMutation(Token token, MutationId mutationId, InetAddressAndPort onHost) { + assert !mutationId.isNone(); Review Comment: This is a good idea. Can you add these checks (using Preconditions) to the other methods of this class that handle mutation ids, or, if it makes sense, short-circuit the method if the mutation id is none? ########## src/java/org/apache/cassandra/replication/ForwardedWriteRequest.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.NodeProximity; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.ForwardedWriteResponseHandler; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.FBUtilities; + +public class ForwardedWriteRequest +{ + private static final Logger logger = LoggerFactory.getLogger(ForwardedWriteRequest.class); + + static class FanOutMessage + { + final 
Verb verb; + final Mutation mutation; + final Set<NodeId> recipients; + + public FanOutMessage(Verb verb, Mutation mutation, Set<NodeId> recipients) + { + this.verb = verb; + this.mutation = mutation; + this.recipients = recipients; + } + + private static final IVersionedSerializer<FanOutMessage> serializer = new IVersionedSerializer<FanOutMessage>() + { + @Override + public void serialize(FanOutMessage t, DataOutputPlus out, int version) throws IOException + { + Version v = Version.minCommonSerializationVersion(); + out.writeInt(t.verb.id); + Mutation.serializer.serialize(t.mutation, out, version); + out.writeInt(t.recipients.size()); + for (NodeId recipient : t.recipients) + NodeId.serializer.serialize(recipient, out, v); + } + + @Override + public FanOutMessage deserialize(DataInputPlus in, int version) throws IOException + { + Version v = Version.minCommonSerializationVersion(); + Verb verb = Verb.fromId(in.readInt()); + Mutation mutation = Mutation.serializer.deserialize(in, version); + int numRecipients = in.readInt(); + Set<NodeId> recipients = new HashSet<>(numRecipients); + for (int i = 0; i < numRecipients; i++) + recipients.add(NodeId.serializer.deserialize(in, v)); + return new FanOutMessage(verb, mutation, recipients); + } + + @Override + public long serializedSize(FanOutMessage t, int version) + { + long size = 0; + Version v = Version.minCommonSerializationVersion(); + size += TypeSizes.INT_SIZE; + size += Mutation.serializer.serializedSize(t.mutation, version); + size += TypeSizes.INT_SIZE; + for (NodeId recipient : t.recipients) + size += NodeId.serializer.serializedSize(recipient, v); + return size; + } + }; + } + + // For now, just supporting a single mutation to multiple recipients. This will develop in the future for different + // kinds of mutations that each go to different recipients (see PaxosCommit). 
+ FanOutMessage message; + + private ForwardedWriteRequest(FanOutMessage message) + { + this.message = message; + } + + public ForwardedWriteRequest(Verb verb, Mutation mutation, ReplicaPlan.ForWrite plan) + { + ClusterMetadata cm = ClusterMetadata.current(); + Set<NodeId> recipients = new HashSet<>(plan.liveAndDown().size()); + for (Replica replica : plan.liveAndDown()) + recipients.add(cm.directory.peerId(replica.endpoint())); + this.message = new FanOutMessage(verb, mutation, recipients); + } + + private Replica getLeader() + { + NodeProximity proximity = DatabaseDescriptor.getNodeProximity(); + ClusterMetadata cm = ClusterMetadata.current(); + Token token = message.mutation.key().getToken(); + Keyspace keyspace = Keyspace.open(message.mutation.getKeyspaceName()); + EndpointsForRange endpoints = cm.placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get(); + logger.debug("Finding best leader from replicas {}", endpoints); + + // TODO: Should match ReplicaPlans.findCounterLeaderReplica, including DC-local priority, current health, severity, etc. 
+ return proximity.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), endpoints).get(0); + } + + public void sendToLeader(ForwardedWriteResponseHandler handler) + { + Replica leader = getLeader(); + logger.debug("Selected {} as leader for mutation with key {}", leader.endpoint(), message.mutation.key()); + Token token = message.mutation.key().getToken(); + Keyspace keyspace = Keyspace.open(message.mutation.getKeyspaceName()); + EndpointsForRange endpoints = ClusterMetadata.current().placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get(); + + // Add callbacks for replicas to respond directly to coordinator + Message<ForwardedWriteRequest> toLeader = Message.out(Verb.FORWARDING_WRITE, this); + for (Replica endpoint : endpoints) + { + logger.debug("Adding forwarding callback for response from {} id {}", endpoint, toLeader.id()); + MessagingService.instance().callbacks.addWithExpiration(handler, toLeader, endpoint); + } + + MessagingService.instance().send(toLeader, leader.endpoint()); + } + + private void executeOnLeader(DirectAcknowledge ackTo) + { + new TrackedWriteRequest(ackTo).performForwarding(this); + } + + public static final Serializer serializer = new Serializer(); + + public static class Serializer implements IVersionedSerializer<ForwardedWriteRequest> + { + @Override + public void serialize(ForwardedWriteRequest request, DataOutputPlus out, int version) throws IOException + { + FanOutMessage.serializer.serialize(request.message, out, version); + } + + @Override + public ForwardedWriteRequest deserialize(DataInputPlus in, int version) throws IOException + { + FanOutMessage message = FanOutMessage.serializer.deserialize(in, version); + return new ForwardedWriteRequest(message); + } + + @Override + public long serializedSize(ForwardedWriteRequest request, int version) + { + return FanOutMessage.serializer.serializedSize(request.message, version); + } + } + + public static final VerbHandler verbHandler = new 
VerbHandler(); + + public static class VerbHandler implements IVerbHandler<ForwardedWriteRequest> + { + @Override + public void doVerb(Message<ForwardedWriteRequest> incoming) + { + logger.debug("Received incoming ForwardedWriteRequest {} id {}", incoming, incoming.id()); + Mutation mutation = incoming.payload.message.mutation; + assert mutation.id().isNone(); + + Stage.REQUEST_RESPONSE.submit(() -> { + // Once we support epoch changes, check epoch from coordinator here, after potential queueing on the Stage + try + { + incoming.payload.executeOnLeader(DirectAcknowledge.toCoordinator(incoming.from(), incoming.id())); + } + catch (Exception e) + { + logger.error("Exception while executing {} on leader", this, e); + MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN, incoming); + } + }); + } + } + + public static class DirectAcknowledge Review Comment: What do you think about naming this `CoordinatorResponseInfo` or `ReplicaCoordinatorResponseInfo`? `DirectAcknowledge` is a little ambiguous (`DirectAcknowledgementInfo` would also work). ########## src/java/org/apache/cassandra/replication/ForwardedWriteHandler.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.replication; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.FBUtilities; + +// See org.apache.cassandra.service.TrackedWriteResponseHandler.onResponse, this class should probably merge with that one +public abstract class ForwardedWriteHandler implements RequestCallback<NoPayload> Review Comment: I think this might be clearer if the forwarded write messaging stuff (request/callback/response/verbHandler/serializers) were all nested under a common ForwardedWrite class. That's a pretty common pattern and keeps the logic for that part of the request flow in a single place. Typically the callback will be the top level class since it has the most state, the verbHandler and serializers are static anonymous implementations, and then you have static inner `Request` and `Response` classes, each with their own inline serializers. ########## src/java/org/apache/cassandra/replication/ForwardedWriteRequest.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.NodeProximity; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.ForwardedWriteResponseHandler; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.FBUtilities; + +public class ForwardedWriteRequest +{ + private static final Logger logger = LoggerFactory.getLogger(ForwardedWriteRequest.class); + + static class FanOutMessage + { + final 
Verb verb; + final Mutation mutation; + final Set<NodeId> recipients; + + public FanOutMessage(Verb verb, Mutation mutation, Set<NodeId> recipients) + { + this.verb = verb; + this.mutation = mutation; + this.recipients = recipients; + } + + private static final IVersionedSerializer<FanOutMessage> serializer = new IVersionedSerializer<FanOutMessage>() + { + @Override + public void serialize(FanOutMessage t, DataOutputPlus out, int version) throws IOException + { + Version v = Version.minCommonSerializationVersion(); + out.writeInt(t.verb.id); + Mutation.serializer.serialize(t.mutation, out, version); + out.writeInt(t.recipients.size()); + for (NodeId recipient : t.recipients) + NodeId.serializer.serialize(recipient, out, v); + } + + @Override + public FanOutMessage deserialize(DataInputPlus in, int version) throws IOException + { + Version v = Version.minCommonSerializationVersion(); + Verb verb = Verb.fromId(in.readInt()); + Mutation mutation = Mutation.serializer.deserialize(in, version); + int numRecipients = in.readInt(); + Set<NodeId> recipients = new HashSet<>(numRecipients); + for (int i = 0; i < numRecipients; i++) + recipients.add(NodeId.serializer.deserialize(in, v)); + return new FanOutMessage(verb, mutation, recipients); + } + + @Override + public long serializedSize(FanOutMessage t, int version) + { + long size = 0; + Version v = Version.minCommonSerializationVersion(); + size += TypeSizes.INT_SIZE; + size += Mutation.serializer.serializedSize(t.mutation, version); + size += TypeSizes.INT_SIZE; + for (NodeId recipient : t.recipients) + size += NodeId.serializer.serializedSize(recipient, v); + return size; + } + }; + } + + // For now, just supporting a single mutation to multiple recipients. This will develop in the future for different + // kinds of mutations that each go to different recipients (see PaxosCommit). 
+ FanOutMessage message; + + private ForwardedWriteRequest(FanOutMessage message) + { + this.message = message; + } + + public ForwardedWriteRequest(Verb verb, Mutation mutation, ReplicaPlan.ForWrite plan) + { + ClusterMetadata cm = ClusterMetadata.current(); + Set<NodeId> recipients = new HashSet<>(plan.liveAndDown().size()); + for (Replica replica : plan.liveAndDown()) + recipients.add(cm.directory.peerId(replica.endpoint())); + this.message = new FanOutMessage(verb, mutation, recipients); + } + + private Replica getLeader() + { + NodeProximity proximity = DatabaseDescriptor.getNodeProximity(); + ClusterMetadata cm = ClusterMetadata.current(); + Token token = message.mutation.key().getToken(); + Keyspace keyspace = Keyspace.open(message.mutation.getKeyspaceName()); + EndpointsForRange endpoints = cm.placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get(); + logger.debug("Finding best leader from replicas {}", endpoints); + + // TODO: Should match ReplicaPlans.findCounterLeaderReplica, including DC-local priority, current health, severity, etc. 
+ return proximity.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), endpoints).get(0); + } + + public void sendToLeader(ForwardedWriteResponseHandler handler) + { + Replica leader = getLeader(); + logger.debug("Selected {} as leader for mutation with key {}", leader.endpoint(), message.mutation.key()); + Token token = message.mutation.key().getToken(); + Keyspace keyspace = Keyspace.open(message.mutation.getKeyspaceName()); + EndpointsForRange endpoints = ClusterMetadata.current().placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get(); + + // Add callbacks for replicas to respond directly to coordinator + Message<ForwardedWriteRequest> toLeader = Message.out(Verb.FORWARDING_WRITE, this); + for (Replica endpoint : endpoints) + { + logger.debug("Adding forwarding callback for response from {} id {}", endpoint, toLeader.id()); + MessagingService.instance().callbacks.addWithExpiration(handler, toLeader, endpoint); + } + + MessagingService.instance().send(toLeader, leader.endpoint()); + } + + private void executeOnLeader(DirectAcknowledge ackTo) + { + new TrackedWriteRequest(ackTo).performForwarding(this); + } + + public static final Serializer serializer = new Serializer(); + + public static class Serializer implements IVersionedSerializer<ForwardedWriteRequest> + { + @Override + public void serialize(ForwardedWriteRequest request, DataOutputPlus out, int version) throws IOException + { + FanOutMessage.serializer.serialize(request.message, out, version); + } + + @Override + public ForwardedWriteRequest deserialize(DataInputPlus in, int version) throws IOException + { + FanOutMessage message = FanOutMessage.serializer.deserialize(in, version); + return new ForwardedWriteRequest(message); + } + + @Override + public long serializedSize(ForwardedWriteRequest request, int version) + { + return FanOutMessage.serializer.serializedSize(request.message, version); + } + } + + public static final VerbHandler verbHandler = new 
VerbHandler(); + + public static class VerbHandler implements IVerbHandler<ForwardedWriteRequest> + { + @Override + public void doVerb(Message<ForwardedWriteRequest> incoming) + { + logger.debug("Received incoming ForwardedWriteRequest {} id {}", incoming, incoming.id()); + Mutation mutation = incoming.payload.message.mutation; + assert mutation.id().isNone(); + + Stage.REQUEST_RESPONSE.submit(() -> { Review Comment: I don't think this work should be passed to the REQUEST_RESPONSE stage. `executeOnLeader` applies the mutation, so this should stay in the mutation stage where this verb is handled. As long as we're not blocking on responses, I don't think there's a reason we can't send the relevant messages from the mutation stage either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org