GitHub user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r225330113
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
    @@ -22,121 +22,194 @@
     import java.net.InetAddress;
     import java.net.UnknownHostException;
     import java.util.*;
    -import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.ScheduledFuture;
     import java.util.concurrent.TimeUnit;
     import java.util.stream.Collectors;
     
    -import com.codahale.metrics.ExponentiallyDecayingReservoir;
     import javax.management.MBeanServer;
     import javax.management.ObjectName;
     
    -import com.codahale.metrics.Snapshot;
    +import com.google.common.annotations.VisibleForTesting;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     import org.apache.cassandra.concurrent.ScheduledExecutors;
     import org.apache.cassandra.config.DatabaseDescriptor;
     import org.apache.cassandra.gms.ApplicationState;
     import org.apache.cassandra.gms.EndpointState;
     import org.apache.cassandra.gms.Gossiper;
     import org.apache.cassandra.gms.VersionedValue;
    +import org.apache.cassandra.net.IAsyncCallback;
    +import org.apache.cassandra.net.LatencyMeasurementType;
    +import org.apache.cassandra.net.MessageIn;
    +import org.apache.cassandra.net.MessageOut;
     import org.apache.cassandra.net.MessagingService;
    +import org.apache.cassandra.net.PingMessage;
     import org.apache.cassandra.service.StorageService;
     import org.apache.cassandra.utils.FBUtilities;
     
    +import static org.apache.cassandra.net.MessagingService.Verb.PING;
    +import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
    +import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
    +
    +
     /**
      * A dynamic snitch that sorts endpoints by latency with an adapted phi 
failure detector
    + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) 
are responsible for actually measuring
    + * latency and populating the {@link #scores} map.
      */
    -public class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
    +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
     {
    -    private static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    -
    -    private static final double ALPHA = 0.75; // set to 0.75 to make EDS 
more biased to towards the newer values
    -    private static final int WINDOW_SIZE = 100;
    -
    -    private volatile int dynamicUpdateInterval = 
DatabaseDescriptor.getDynamicUpdateInterval();
    -    private volatile int dynamicResetInterval = 
DatabaseDescriptor.getDynamicResetInterval();
    -    private volatile double dynamicBadnessThreshold = 
DatabaseDescriptor.getDynamicBadnessThreshold();
    +    private static final Logger logger = 
LoggerFactory.getLogger(DynamicEndpointSnitch.class);
    +
    +    // Subclass specific functionality
    +    protected static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    protected boolean registered = false;
    +    // The scores map is updated via copy in updateScores
    +    // We keep it in the base class for performance reasons (so it can be 
easily aliased)
    +    protected volatile Map<InetAddressAndPort, Double> scores = new 
HashMap<>();
    +
    +    // Rate limit how often we generate latency probes
    +    protected long nextAllowedProbeGenerationTime;
    +    protected long nextProbeGenerationTime;
    +    protected int currentProbePosition = 0;
    +    protected final List<InetAddressAndPort> latencyProbeSequence = new 
ArrayList<>();
    +
    +    // DES general functionality
    +    protected volatile int dynamicUpdateInterval = -1;
    +    protected volatile int dynamicLatencyProbeInterval = -1;
    +    protected volatile double dynamicBadnessThreshold = 0;
     
         // the score for a merged set of endpoints must be this much worse 
than the score for separate endpoints to
         // warrant not merging two ranges into a single range
         private static final double RANGE_MERGING_PREFERENCE = 1.5;
     
         private String mbeanName;
    -    private boolean registered = false;
    -
    -    private volatile HashMap<InetAddressAndPort, Double> scores = new 
HashMap<>();
    -    private final ConcurrentHashMap<InetAddressAndPort, 
ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
    +    private boolean mbeanRegistered = false;
     
         public final IEndpointSnitch subsnitch;
     
    -    private volatile ScheduledFuture<?> updateSchedular;
    -    private volatile ScheduledFuture<?> resetSchedular;
    +    private volatile ScheduledFuture<?> updateScheduler;
    +    private volatile ScheduledFuture<?> latencyProbeScheduler;
     
    -    private final Runnable update;
    -    private final Runnable reset;
    +    private final Runnable update = this::updateScores;
    +    private final Runnable latencyProbe = this::maybeSendLatencyProbe;
     
         public DynamicEndpointSnitch(IEndpointSnitch snitch)
         {
             this(snitch, null);
         }
     
    -    public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
    +    protected DynamicEndpointSnitch(IEndpointSnitch snitch, String 
instance)
         {
             mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
             if (instance != null)
                 mbeanName += ",instance=" + instance;
             subsnitch = snitch;
    -        update = new Runnable()
    -        {
    -            public void run()
    -            {
    -                updateScores();
    -            }
    -        };
    -        reset = new Runnable()
    -        {
    -            public void run()
    -            {
    -                // we do this so that a host considered bad has a chance 
to recover, otherwise would we never try
    -                // to read from it, which would cause its score to never 
change
    -                reset();
    -            }
    -        };
     
             if (DatabaseDescriptor.isDaemonInitialized())
             {
    -            updateSchedular = 
ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, 
dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
    -            resetSchedular = 
ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, 
dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
    +            applyConfigChanges();
                 registerMBean();
             }
    +        nextProbeGenerationTime = System.nanoTime();
    +    }
    +
    +    /**
    +     * Records a latency. This MUST be cheap as it is called in the fast 
path
    +     */
    +    abstract public void receiveTiming(InetAddressAndPort address, long 
latency, LatencyMeasurementType measurementType);
    +
    +    /**
    +     * Called periodically from {@link 
DynamicEndpointSnitch#updateScores()}. This method returns a new score map
    +     * that will be assigned to the {@link DynamicEndpointSnitch#scores} 
map.
    +     * This is generally more expensive than {@link 
DynamicEndpointSnitch#receiveTiming}
    +     */
    +    abstract public Map<InetAddressAndPort, Double> calculateScores();
    +
    +    /**
    +     * Signals that we actually tried to rank vs this host. This way any 
latency probing can just focus
    +     * on hosts that this peer would potentially talk to.
    +     * @param address
    +     */
    +    abstract protected void markRequested(InetAddressAndPort address);
    +
    +    /**
    +     * Dump the underlying metrics backing the DES's decisions for a given 
host
    +     */
    +    abstract public List<Double> dumpTimings(String hostname) throws 
UnknownHostException;
    +
    +    /**
    +     * Populates the provided probe sequence using the underlying metrics
    +     */
    +    abstract void updateLatencyProbeSequence(List<InetAddressAndPort> 
sequence);
    --- End diff --
    
    Again, this seems like a superclass concern.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to