GitHub user jolynch commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/212#discussion_r231696207
  
    --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
    @@ -48,81 +51,133 @@
     {
         private static final Logger logger = 
LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
     
    -    private final int targetPercent;
    +    private final boolean blockForRemoteDcs;
         private final long timeoutNanos;
     
    -    public static StartupClusterConnectivityChecker create(int 
targetPercent, int timeoutSecs)
    +    public static StartupClusterConnectivityChecker create(long 
timeoutSecs, boolean blockForRemoteDcs)
         {
    -        timeoutSecs = Math.max(1, timeoutSecs);
    +        if (timeoutSecs < 0)
    +            logger.warn("skipping block-for-peers due to negative timeout. 
You may encounter errors or timeouts on" +
    +                        " the first user query");
             if (timeoutSecs > 100)
                 logger.warn("setting the block-for-peers timeout (in seconds) 
to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
             long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
     
    -        return new StartupClusterConnectivityChecker(targetPercent, 
timeoutNanos);
    +        return new StartupClusterConnectivityChecker(timeoutNanos, 
blockForRemoteDcs);
         }
     
         @VisibleForTesting
    -    StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
    +    StartupClusterConnectivityChecker(long timeoutNanos, boolean 
blockForRemoteDcs)
         {
    -        this.targetPercent = Math.min(100, Math.max(0, targetPercent));
    +        this.blockForRemoteDcs = blockForRemoteDcs;
             this.timeoutNanos = timeoutNanos;
         }
     
         /**
          * @param peers The currently known peers in the cluster; argument is 
not modified.
    +     * @param getDatacenterSource A function for mapping peers to their 
datacenter.
          * @return true if the requested percentage of peers are marked ALIVE 
in gossip and have their connections opened;
          * else false.
          */
    -    public boolean execute(Set<InetAddressAndPort> peers)
    +    public boolean execute(Set<InetAddressAndPort> peers, 
Function<InetAddressAndPort, String> getDatacenterSource)
         {
    -        if (targetPercent == 0 || peers == null)
    +        if (peers == null || this.timeoutNanos < 0)
                 return true;
     
             // make a copy of the set, to avoid mucking with the input (in 
case it's a sensitive collection)
             peers = new HashSet<>(peers);
    -        peers.remove(FBUtilities.getBroadcastAddressAndPort());
    +        InetAddressAndPort localAddress = 
FBUtilities.getBroadcastAddressAndPort();
    +        String localDc = getDatacenterSource.apply(localAddress);
     
    +        peers.remove(localAddress);
             if (peers.isEmpty())
                 return true;
     
    -        logger.info("choosing to block until {}% of the {} known peers are 
marked alive and connections are established; max time to wait = {} seconds",
    -                    targetPercent, peers.size(), 
TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
    +        // make a copy of the datacenter mapping (in case gossip updates 
happen during this method or some such)
    +        Map<InetAddressAndPort, String> datacenterMap = peers.stream()
    +                                                             
.collect(Collectors.toMap(k -> k, getDatacenterSource));
    +        Function<InetAddressAndPort, String> getDatacenter = 
datacenterMap::get;
     
    -        long startNanos = System.nanoTime();
    +        Map<String, Set<InetAddressAndPort>> peersByDc = peers.stream()
    +                                                              
.collect(Collectors.groupingBy(getDatacenter,
    +                                                                           
                  Collectors.toSet()));
    +
    +        if (!blockForRemoteDcs)
    +        {
    +            peersByDc.keySet().retainAll(Collections.singleton(localDc));
    +            logger.info("Blocking coordination until only a single peer is 
DOWN in the local datacenter, timeout={}s",
    +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
    +        }
    +        else
    +        {
    +            logger.info("Blocking coordination until only a single peer is 
DOWN in each datacenter, timeout={}s",
    +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
    +        }
     
             AckMap acks = new AckMap(3);
    -        int target = (int) ((targetPercent / 100.0) * peers.size());
    -        CountDownLatch latch = new CountDownLatch(target);
    +        Map<String, CountDownLatch> latchMap = new 
HashMap<>(peersByDc.size());
    +        for (Map.Entry<String, Set<InetAddressAndPort>> entry: 
peersByDc.entrySet())
    +        {
    +            latchMap.put(entry.getKey(), new 
CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));
    +        }
    +
    +        long startNanos = System.nanoTime();
     
             // set up a listener to react to new nodes becoming alive (in 
gossip), and account for all the nodes that are already alive
    -        Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new 
ConcurrentHashMap<>());
    -        AliveListener listener = new AliveListener(alivePeers, latch, 
acks);
    +        Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
    +        AliveListener listener = new AliveListener(alivePeers, latchMap, 
acks, getDatacenter);
             Gossiper.instance.register(listener);
     
             // send out a ping message to open up the non-gossip connections
    -        sendPingMessages(peers, latch, acks);
    +        sendPingMessages(peers, latchMap, acks, getDatacenter);
    --- End diff --
    
    Done.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to