Github user aweisberg commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/224#discussion_r188452249
--- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
@@ -1364,68 +1363,72 @@ public static void sendToHintedEndpoints(final
Mutation mutation,
submitHint(mutation, endpointsToHint, responseHandler);
if (insertLocal)
- performLocally(stage, Optional.of(mutation), mutation::apply,
responseHandler);
+ {
+ Preconditions.checkNotNull(localReplica);
+ performLocally(stage, localReplica, Optional.of(mutation),
mutation::apply, responseHandler);
+ }
if (localDc != null)
{
- for (InetAddressAndPort destination : localDc)
- MessagingService.instance().sendRR(message, destination,
responseHandler, true);
+ for (Replica destination : localDc)
+ MessagingService.instance().sendWriteRR(message,
destination, responseHandler, true);
}
if (dcGroups != null)
{
// for each datacenter, send the message to one node to relay
the write to other replicas
- for (Collection<InetAddressAndPort> dcTargets :
dcGroups.values())
+ for (Replicas dcTargets : dcGroups.values())
sendMessagesToNonlocalDC(message, dcTargets,
responseHandler);
}
}
- private static void checkHintOverload(InetAddressAndPort destination)
+ private static void checkHintOverload(Replica destination)
{
// avoid OOMing due to excess hints. we need to do this check
even for "live" nodes, since we can
// still generate hints for those if it's overloaded or simply
dead but not yet known-to-be-dead.
// The idea is that if we have over maxHintsInProgress hints in
flight, this is probably due to
// a small number of nodes causing problems, so we should avoid
shutting down writes completely to
// healthy nodes. Any node with no hintsInProgress is considered
healthy.
if (StorageMetrics.totalHintsInProgress.getCount() >
maxHintsInProgress
- && (getHintsInProgressFor(destination).get() > 0 &&
shouldHint(destination)))
+ && (getHintsInProgressFor(destination.getEndpoint()).get()
> 0 && shouldHint(destination)))
{
throw new OverloadedException("Too many in flight hints: " +
StorageMetrics.totalHintsInProgress.getCount() +
" destination: " + destination +
- " destination hints: " +
getHintsInProgressFor(destination).get());
+ " destination hints: " +
getHintsInProgressFor(destination.getEndpoint()).get());
}
}
private static void sendMessagesToNonlocalDC(MessageOut<? extends
IMutation> message,
-
Collection<InetAddressAndPort> targets,
+ Replicas targets,
AbstractWriteResponseHandler<IMutation> handler)
{
- Iterator<InetAddressAndPort> iter = targets.iterator();
+ Iterator<Replica> iter = targets.iterator();
int[] messageIds = new int[targets.size()];
- InetAddressAndPort target = iter.next();
+ Replica target = iter.next();
int idIdx = 0;
// Add the other destinations of the same message as a
FORWARD_HEADER entry
while (iter.hasNext())
{
- InetAddressAndPort destination = iter.next();
- int id = MessagingService.instance().addCallback(handler,
- message,
- destination,
-
message.getTimeout(),
-
handler.consistencyLevel,
- true);
+ Replica destination = iter.next();
+ int id = MessagingService.instance().addWriteCallback(handler,
+ message,
+
destination,
+
message.getTimeout(),
+
handler.consistencyLevel,
+ true);
messageIds[idIdx++] = id;
logger.trace("Adding FWD message to {}@{}", id, destination);
}
- message =
message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new
ForwardToContainer(targets, messageIds));
+ Replicas.checkFull(targets);
+ message =
message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new
ForwardToContainer(targets.asEndpointList(), messageIds));
--- End diff --
The `targets.asEndpointList()` call here is a candidate for lazy conversion.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org