Github user bdeggleston commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/224#discussion_r189117028
  
    --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
    @@ -1364,68 +1363,72 @@ public static void sendToHintedEndpoints(final 
Mutation mutation,
                 submitHint(mutation, endpointsToHint, responseHandler);
     
             if (insertLocal)
    -            performLocally(stage, Optional.of(mutation), mutation::apply, 
responseHandler);
    +        {
    +            Preconditions.checkNotNull(localReplica);
    +            performLocally(stage, localReplica, Optional.of(mutation), 
mutation::apply, responseHandler);
    +        }
     
             if (localDc != null)
             {
    -            for (InetAddressAndPort destination : localDc)
    -                MessagingService.instance().sendRR(message, destination, 
responseHandler, true);
    +            for (Replica destination : localDc)
    +                MessagingService.instance().sendWriteRR(message, 
destination, responseHandler, true);
             }
             if (dcGroups != null)
             {
                 // for each datacenter, send the message to one node to relay 
the write to other replicas
    -            for (Collection<InetAddressAndPort> dcTargets : 
dcGroups.values())
    +            for (Replicas dcTargets : dcGroups.values())
                     sendMessagesToNonlocalDC(message, dcTargets, 
responseHandler);
             }
         }
     
    -    private static void checkHintOverload(InetAddressAndPort destination)
    +    private static void checkHintOverload(Replica destination)
         {
             // avoid OOMing due to excess hints.  we need to do this check 
even for "live" nodes, since we can
             // still generate hints for those if it's overloaded or simply 
dead but not yet known-to-be-dead.
             // The idea is that if we have over maxHintsInProgress hints in 
flight, this is probably due to
             // a small number of nodes causing problems, so we should avoid 
shutting down writes completely to
             // healthy nodes.  Any node with no hintsInProgress is considered 
healthy.
             if (StorageMetrics.totalHintsInProgress.getCount() > 
maxHintsInProgress
    -                && (getHintsInProgressFor(destination).get() > 0 && 
shouldHint(destination)))
    +                && (getHintsInProgressFor(destination.getEndpoint()).get() 
> 0 && shouldHint(destination)))
             {
                 throw new OverloadedException("Too many in flight hints: " + 
StorageMetrics.totalHintsInProgress.getCount() +
                                               " destination: " + destination +
    -                                          " destination hints: " + 
getHintsInProgressFor(destination).get());
    +                                          " destination hints: " + 
getHintsInProgressFor(destination.getEndpoint()).get());
             }
         }
     
         private static void sendMessagesToNonlocalDC(MessageOut<? extends 
IMutation> message,
    -                                                 
Collection<InetAddressAndPort> targets,
    +                                                 Replicas targets,
                                                      
AbstractWriteResponseHandler<IMutation> handler)
         {
    -        Iterator<InetAddressAndPort> iter = targets.iterator();
    +        Iterator<Replica> iter = targets.iterator();
             int[] messageIds = new int[targets.size()];
    -        InetAddressAndPort target = iter.next();
    +        Replica target = iter.next();
     
             int idIdx = 0;
             // Add the other destinations of the same message as a 
FORWARD_HEADER entry
             while (iter.hasNext())
             {
    -            InetAddressAndPort destination = iter.next();
    -            int id = MessagingService.instance().addCallback(handler,
    -                                                             message,
    -                                                             destination,
    -                                                             
message.getTimeout(),
    -                                                             
handler.consistencyLevel,
    -                                                             true);
    +            Replica destination = iter.next();
    +            int id = MessagingService.instance().addWriteCallback(handler,
    +                                                                  message,
    +                                                                  
destination,
    +                                                                  
message.getTimeout(),
    +                                                                  
handler.consistencyLevel,
    +                                                                  true);
                 messageIds[idIdx++] = id;
                 logger.trace("Adding FWD message to {}@{}", id, destination);
             }
    -        message = 
message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new 
ForwardToContainer(targets, messageIds));
    +        Replicas.checkFull(targets);
    +        message = 
message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new 
ForwardToContainer(targets.asEndpointList(), messageIds));
    --- End diff --
    
    fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to