GitHub user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/212#discussion_r231693230
--- Diff:
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
{
private static final Logger logger =
LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
- private final int targetPercent;
+ private final boolean blockForRemoteDcs;
private final long timeoutNanos;
- public static StartupClusterConnectivityChecker create(int
targetPercent, int timeoutSecs)
+ public static StartupClusterConnectivityChecker create(long
timeoutSecs, boolean blockForRemoteDcs)
{
- timeoutSecs = Math.max(1, timeoutSecs);
+ if (timeoutSecs < 0)
+ logger.warn("skipping block-for-peers due to negative timeout.
You may encounter errors or timeouts on" +
+ " the first user query");
if (timeoutSecs > 100)
logger.warn("setting the block-for-peers timeout (in seconds)
to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
- return new StartupClusterConnectivityChecker(targetPercent,
timeoutNanos);
+ return new StartupClusterConnectivityChecker(timeoutNanos,
blockForRemoteDcs);
}
@VisibleForTesting
- StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
+ StartupClusterConnectivityChecker(long timeoutNanos, boolean
blockForRemoteDcs)
{
- this.targetPercent = Math.min(100, Math.max(0, targetPercent));
+ this.blockForRemoteDcs = blockForRemoteDcs;
this.timeoutNanos = timeoutNanos;
}
/**
* @param peers The currently known peers in the cluster; argument is
not modified.
+ * @param getDatacenterSource A function for mapping peers to their
datacenter.
* @return true if the requested percentage of peers are marked ALIVE
in gossip and have their connections opened;
* else false.
*/
- public boolean execute(Set<InetAddressAndPort> peers)
+ public boolean execute(Set<InetAddressAndPort> peers,
Function<InetAddressAndPort, String> getDatacenterSource)
{
- if (targetPercent == 0 || peers == null)
+ if (peers == null || this.timeoutNanos < 0)
return true;
// make a copy of the set, to avoid mucking with the input (in
case it's a sensitive collection)
peers = new HashSet<>(peers);
- peers.remove(FBUtilities.getBroadcastAddressAndPort());
+ InetAddressAndPort localAddress =
FBUtilities.getBroadcastAddressAndPort();
+ String localDc = getDatacenterSource.apply(localAddress);
+ peers.remove(localAddress);
if (peers.isEmpty())
return true;
- logger.info("choosing to block until {}% of the {} known peers are
marked alive and connections are established; max time to wait = {} seconds",
- targetPercent, peers.size(),
TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+ // make a copy of the datacenter mapping (in case gossip updates
happen during this method or some such)
+ Map<InetAddressAndPort, String> datacenterMap = peers.stream()
+
.collect(Collectors.toMap(k -> k, getDatacenterSource));
+ Function<InetAddressAndPort, String> getDatacenter =
datacenterMap::get;
- long startNanos = System.nanoTime();
+ Map<String, Set<InetAddressAndPort>> peersByDc = peers.stream()
+
.collect(Collectors.groupingBy(getDatacenter,
+
Collectors.toSet()));
+
+ if (!blockForRemoteDcs)
+ {
+ peersByDc.keySet().retainAll(Collections.singleton(localDc));
+ logger.info("Blocking coordination until only a single peer is
DOWN in the local datacenter, timeout={}s",
+ TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+ }
+ else
+ {
+ logger.info("Blocking coordination until only a single peer is
DOWN in each datacenter, timeout={}s",
+ TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+ }
AckMap acks = new AckMap(3);
- int target = (int) ((targetPercent / 100.0) * peers.size());
- CountDownLatch latch = new CountDownLatch(target);
+ Map<String, CountDownLatch> latchMap = new
HashMap<>(peersByDc.size());
+ for (Map.Entry<String, Set<InetAddressAndPort>> entry:
peersByDc.entrySet())
+ {
+ latchMap.put(entry.getKey(), new
CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));
+ }
+
+ long startNanos = System.nanoTime();
// set up a listener to react to new nodes becoming alive (in
gossip), and account for all the nodes that are already alive
- Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new
ConcurrentHashMap<>());
- AliveListener listener = new AliveListener(alivePeers, latch,
acks);
+ Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ AliveListener listener = new AliveListener(alivePeers, latchMap,
acks, getDatacenter);
Gossiper.instance.register(listener);
// send out a ping message to open up the non-gossip connections
- sendPingMessages(peers, latch, acks);
+ sendPingMessages(peers, latchMap, acks, getDatacenter);
for (InetAddressAndPort peer : peers)
+ {
if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) &&
acks.incrementAndCheck(peer))
- latch.countDown();
+ {
+ String datacenter = getDatacenter.apply(peer);
+ if (latchMap.containsKey(datacenter))
+ latchMap.get(datacenter).countDown();
+ }
+ }
+
+ boolean succeeded =
Uninterruptibles.awaitUninterruptibly(latchMap.get(localDc), timeoutNanos,
TimeUnit.NANOSECONDS);
+ for (String datacenter: latchMap.keySet())
+ {
+ if (datacenter.equals(localDc))
--- End diff --
Makes sense, removed
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]