Github user ifesdjeen commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/224#discussion_r197136522
  
    --- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
    @@ -4231,53 +4211,53 @@ private void 
calculateToFromStreams(Collection<Token> newTokens, List<String> ke
                 InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
                 IEndpointSnitch snitch = 
DatabaseDescriptor.getEndpointSnitch();
                 TokenMetadata tokenMetaCloneAllSettled = 
tokenMetadata.cloneAfterAllSettled();
    -            // clone to avoid concurrent modification in 
calculateNaturalEndpoints
    +            // clone to avoid concurrent modification in 
calculateNaturalReplicas
                 TokenMetadata tokenMetaClone = 
tokenMetadata.cloneOnlyTokenMap();
     
                 for (String keyspace : keyspaceNames)
                 {
                     // replication strategy of the current keyspace
                     AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
    -                Multimap<InetAddressAndPort, Range<Token>> 
endpointToRanges = strategy.getAddressRanges();
    +                ReplicaMultimap<InetAddressAndPort, ReplicaSet> 
endpointToRanges = strategy.getAddressReplicas();
     
                     logger.debug("Calculating ranges to stream and request for 
keyspace {}", keyspace);
                     for (Token newToken : newTokens)
                     {
                         // getting collection of the currently used ranges by 
this keyspace
    -                    Collection<Range<Token>> currentRanges = 
endpointToRanges.get(localAddress);
    +                    ReplicaSet currentReplicas = 
endpointToRanges.get(localAddress);
                         // collection of ranges which this node will serve 
after move to the new token
    -                    Collection<Range<Token>> updatedRanges = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
    +                    ReplicaSet updatedReplicas = 
strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
     
                         // ring ranges and endpoints associated with them
                         // this used to determine what nodes should we ping 
about range data
    -                    Multimap<Range<Token>, InetAddressAndPort> 
rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
    +                    ReplicaMultimap<Range<Token>, ReplicaSet> 
rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
     
                         // calculated parts of the ranges to request/stream 
from/to nodes in the ring
    -                    Pair<Set<Range<Token>>, Set<Range<Token>>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
    +                    Pair<Set<Range<Token>>, Set<Range<Token>>> 
rangesPerKeyspace = calculateStreamAndFetchRanges(currentReplicas, 
updatedReplicas);
     
                         /**
                          * In this loop we are going through all ranges "to 
fetch" and determining
                          * nodes in the ring responsible for data we are 
interested in
                          */
    -                    Multimap<Range<Token>, InetAddressAndPort> 
rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
    +                    ReplicaMultimap<Range<Token>, ReplicaList> 
rangesToFetchWithPreferredEndpoints = ReplicaMultimap.list();
                         for (Range<Token> toFetch : rangesPerKeyspace.right)
                         {
                             for (Range<Token> range : rangeAddresses.keySet())
                             {
                                 if (range.contains(toFetch))
                                 {
    -                                List<InetAddressAndPort> endpoints = null;
    +                                ReplicaList endpoints = null;
     
                                     if (useStrictConsistency)
                                     {
    -                                    Set<InetAddressAndPort> oldEndpoints = 
Sets.newHashSet(rangeAddresses.get(range));
    -                                    Set<InetAddressAndPort> newEndpoints = 
Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, 
tokenMetaCloneAllSettled));
    +                                    ReplicaSet oldEndpoints = new 
ReplicaSet(rangeAddresses.get(range));
    --- End diff --
    
    Might be better to filter on `oldEndpoints` rather than remove from them 
(esp. since we already copy)
    
    Additional benefit would be if we ever decide to switch to immutable 
interface, we'll have to do this anyways.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to