Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r225361545 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,20 +228,101 @@ private void registerMBean() public void close() { - updateSchedular.cancel(false); - resetSchedular.cancel(false); + if (updateScheduler != null) + updateScheduler.cancel(false); + if (latencyProbeScheduler != null) + latencyProbeScheduler.cancel(false); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { - mbs.unregisterMBean(new ObjectName(mbeanName)); + if (mbeanRegistered) + mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } + /** + * Determines if latency probes need to be sent, and potentially sends a single latency probe per invocation + */ + protected void maybeSendLatencyProbe() + { + long currentTimeNS = System.nanoTime(); + markNextAllowedProbeGenerationTime(currentTimeNS); + + Optional<InetAddressAndPort> needsProbe = latencyProbeNeeded(currentTimeNS); + needsProbe.ifPresent(this::sendPingMessageToPeer); + } + + /** + * Determines which peers need latency at a particular time. Note that this takes currentTimeNS for testability + * of this code path. + * @param currentTimeNS The current time to evaluate. Used mostly for testing. + * @return An Optional that if present contains a host to probe. 
+ */ + @VisibleForTesting + Optional<InetAddressAndPort> latencyProbeNeeded(long currentTimeNS) { + if (currentProbePosition >= latencyProbeSequence.size() && (currentTimeNS > nextProbeGenerationTime)) + { + nextProbeGenerationTime = nextAllowedProbeGenerationTime; + latencyProbeSequence.clear(); + + // Delegate to the subclass to actually figure out what the probe sequence should be + updateLatencyProbeSequence(latencyProbeSequence); + + if (latencyProbeSequence.size() > 0) + Collections.shuffle(latencyProbeSequence); + + currentProbePosition = 0; + } + + if (currentProbePosition < latencyProbeSequence.size()) + { + try + { + return Optional.of(latencyProbeSequence.get(currentProbePosition++)); + } + catch (IndexOutOfBoundsException ignored) {} + } + + return Optional.empty(); + } + + private void sendPingMessageToPeer(InetAddressAndPort to) + { + logger.trace("Sending a small and large PingMessage to {}", to); + IAsyncCallback latencyProbeHandler = new IAsyncCallback() --- End diff -- Hm, ok I'm not entirely sure I follow but I will take a whack at it and get back to you!
--- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org For additional commands, e-mail: pr-help@cassandra.apache.org