Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r225330113 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -22,121 +22,194 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. 
{@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and populating the {@link #scores} map. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { - private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - - private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values - private static final int WINDOW_SIZE = 100; - - private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); - private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); - private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); + private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + + // Subclass specific functionality + protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); + protected boolean registered = false; + // The scores map is updated via copy in updateScores + // We keep it in the base class for performance reasons (so it can be easily aliased) + protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>(); + + // Rate limit how often we generate latency probes + protected long nextAllowedProbeGenerationTime; + protected long nextProbeGenerationTime; + protected int currentProbePosition = 0; + protected final List<InetAddressAndPort> latencyProbeSequence = new ArrayList<>(); + + // DES general functionality + protected volatile int dynamicUpdateInterval = -1; + protected volatile int dynamicLatencyProbeInterval = -1; + protected volatile double dynamicBadnessThreshold = 0; // the score for a merged set of endpoints must be this much 
worse than the score for separate endpoints to // warrant not merging two ranges into a single range private static final double RANGE_MERGING_PREFERENCE = 1.5; private String mbeanName; - private boolean registered = false; - - private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>(); - private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>(); + private boolean mbeanRegistered = false; public final IEndpointSnitch subsnitch; - private volatile ScheduledFuture<?> updateSchedular; - private volatile ScheduledFuture<?> resetSchedular; + private volatile ScheduledFuture<?> updateScheduler; + private volatile ScheduledFuture<?> latencyProbeScheduler; - private final Runnable update; - private final Runnable reset; + private final Runnable update = this::updateScores; + private final Runnable latencyProbe = this::maybeSendLatencyProbe; public DynamicEndpointSnitch(IEndpointSnitch snitch) { this(snitch, null); } - public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) + protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) { mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch"; if (instance != null) mbeanName += ",instance=" + instance; subsnitch = snitch; - update = new Runnable() - { - public void run() - { - updateScores(); - } - }; - reset = new Runnable() - { - public void run() - { - // we do this so that a host considered bad has a chance to recover, otherwise would we never try - // to read from it, which would cause its score to never change - reset(); - } - }; if (DatabaseDescriptor.isDaemonInitialized()) { - updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS); - resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS); + applyConfigChanges(); 
registerMBean(); } + nextProbeGenerationTime = System.nanoTime(); + } + + /** + * Records a latency. This MUST be cheap as it is called in the fast path + */ + abstract public void receiveTiming(InetAddressAndPort address, long latency, LatencyMeasurementType measurementType); + + /** + * Called periodically from {@link DynamicEndpointSnitch#updateScores()}. This method returns a new score map + * that will be assigned to the {@link DynamicEndpointSnitch#scores} map. + * This is generally more expensive than {@link DynamicEndpointSnitch#receiveTiming} + */ + abstract public Map<InetAddressAndPort, Double> calculateScores(); + + /** + * Signals that we actually tried to rank vs this host. This way any latency probing can just focus + * on hosts that this peer would potentially talk to. + * @param address + */ + abstract protected void markRequested(InetAddressAndPort address); + + /** + * Dump the underlying metrics backing the DES's decisions for a given host + */ + abstract public List<Double> dumpTimings(String hostname) throws UnknownHostException; + + /** + * Populates the provided probe sequence using the underlying metrics + */ + abstract void updateLatencyProbeSequence(List<InetAddressAndPort> sequence); --- End diff -- Again, this seems like a superclass concern.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org