Github user jolynch commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r225361545
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
    @@ -154,20 +228,101 @@ private void registerMBean()
     
         public void close()
         {
    -        updateSchedular.cancel(false);
    -        resetSchedular.cancel(false);
    +        if (updateScheduler != null)
    +            updateScheduler.cancel(false);
    +        if (latencyProbeScheduler != null)
    +            latencyProbeScheduler.cancel(false);
     
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             try
             {
    -            mbs.unregisterMBean(new ObjectName(mbeanName));
    +            if (mbeanRegistered)
    +                mbs.unregisterMBean(new ObjectName(mbeanName));
             }
             catch (Exception e)
             {
                 throw new RuntimeException(e);
             }
         }
     
    +    /**
    +     * Determines if latency probes need to be sent, and potentially sends a single latency probe per invocation
    +     */
    +    protected void maybeSendLatencyProbe()
    +    {
    +        long currentTimeNS = System.nanoTime();
    +        markNextAllowedProbeGenerationTime(currentTimeNS);
    +
    +        Optional<InetAddressAndPort> needsProbe = latencyProbeNeeded(currentTimeNS);
    +        needsProbe.ifPresent(this::sendPingMessageToPeer);
    +    }
    +
    +    /**
    +     * Determines which peers need latency at a particular time. Note that this takes currentTimeNS for testability
    +     * of this code path.
    +     * @param currentTimeNS The current time to evaluate. Used mostly for testing.
    +     * @return An Optional that if present contains a host to probe.
    +     */
    +    @VisibleForTesting
    +    Optional<InetAddressAndPort> latencyProbeNeeded(long currentTimeNS) {
    +        if (currentProbePosition >= latencyProbeSequence.size() && (currentTimeNS > nextProbeGenerationTime))
    +        {
    +            nextProbeGenerationTime = nextAllowedProbeGenerationTime;
    +            latencyProbeSequence.clear();
    +
    +            // Delegate to the subclass to actually figure out what the probe sequence should be
    +            updateLatencyProbeSequence(latencyProbeSequence);
    +
    +            if (latencyProbeSequence.size() > 0)
    +                Collections.shuffle(latencyProbeSequence);
    +
    +            currentProbePosition = 0;
    +        }
    +
    +        if (currentProbePosition < latencyProbeSequence.size())
    +        {
    +            try
    +            {
    +                return Optional.of(latencyProbeSequence.get(currentProbePosition++));
    +            }
    +            catch (IndexOutOfBoundsException ignored) {}
    +        }
    +
    +        return Optional.empty();
    +    }
    +
    +    private void sendPingMessageToPeer(InetAddressAndPort to)
    +    {
    +        logger.trace("Sending a small and large PingMessage to {}", to);
    +        IAsyncCallback latencyProbeHandler = new IAsyncCallback()
    --- End diff --
    
    Hm, ok I'm not entirely sure I follow but I will take a whack at it and get back to you!
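
For readers following the thread, the probe-selection logic in the quoted diff can be sketched in isolation roughly as below. This is a simplified illustration, not the PR's code. The class name `ProbeSequenceSketch`, the `peers` list, and `probeIntervalNS` are hypothetical stand-ins, peers are plain strings rather than `InetAddressAndPort`, and the next generation time is assumed to be "now plus an interval" because `markNextAllowedProbeGenerationTime` is not shown in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Standalone sketch of the probe-selection loop; not the Cassandra class. */
class ProbeSequenceSketch
{
    private final List<String> peers;       // hypothetical: replaces updateLatencyProbeSequence()
    private final long probeIntervalNS;     // hypothetical: minimum gap between regenerations
    private final List<String> latencyProbeSequence = new ArrayList<>();
    private int currentProbePosition = 0;
    private long nextProbeGenerationTime = 0;

    ProbeSequenceSketch(List<String> peers, long probeIntervalNS)
    {
        this.peers = new ArrayList<>(peers);
        this.probeIntervalNS = probeIntervalNS;
    }

    /** Returns at most one peer per call; the clock is a parameter so tests can control time. */
    Optional<String> latencyProbeNeeded(long currentTimeNS)
    {
        // Regenerate only once the old sequence is exhausted and the interval has elapsed.
        if (currentProbePosition >= latencyProbeSequence.size() && currentTimeNS > nextProbeGenerationTime)
        {
            nextProbeGenerationTime = currentTimeNS + probeIntervalNS; // assumption, see lead-in
            latencyProbeSequence.clear();
            latencyProbeSequence.addAll(peers);
            Collections.shuffle(latencyProbeSequence);
            currentProbePosition = 0;
        }

        // Hand out the next peer in the current sequence, if any remain.
        if (currentProbePosition < latencyProbeSequence.size())
            return Optional.of(latencyProbeSequence.get(currentProbePosition++));

        return Optional.empty();
    }
}
```

Passing the timestamp in, as the diff does with `currentTimeNS`, lets a test step through a full sequence and the following idle period without sleeping.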


---
