Repository: cassandra Updated Branches: refs/heads/trunk a8f6a6945 -> 286f6a143
Remove compaction Severity from DynamicEndpointSnitch patch by jbellis; reviewed by Jeremiah Jordan for CASSANDRA-11738 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/286f6a14 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/286f6a14 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/286f6a14 Branch: refs/heads/trunk Commit: 286f6a143573de267c1595fe4dd83108ed5356fc Parents: a8f6a69 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Jul 22 17:20:29 2016 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Jul 22 17:24:22 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/db/compaction/CompactionInfo.java | 20 --- .../locator/DynamicEndpointSnitch.java | 25 ++- .../locator/DynamicEndpointSnitchMBean.java | 25 ++- .../cassandra/metrics/CompactionMetrics.java | 4 - .../cassandra/service/StorageService.java | 20 --- .../utils/BackgroundActivityMonitor.java | 171 ------------------- 7 files changed, 46 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4f71489..efda89e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,8 @@ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) * Add supplied username to authentication error messages (CASSANDRA-12076) * Remove pre-startup check for open JMX port (CASSANDRA-12074) + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738) + 3.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index fe81eac..535217f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -138,8 +138,6 @@ public final class CompactionInfo implements Serializable { private volatile boolean stopRequested = false; public abstract CompactionInfo getCompactionInfo(); - double load = StorageMetrics.load.getCount(); - double reportedSeverity = 0d; public void stop() { @@ -150,23 +148,5 @@ public final class CompactionInfo implements Serializable { return stopRequested; } - /** - * report event on the size of the compaction. - */ - public void started() - { - reportedSeverity = getCompactionInfo().getTotal() / load; - StorageService.instance.reportSeverity(reportedSeverity); - } - - /** - * remove the event complete - */ - public void finished() - { - if (reportedSeverity != 0d) - StorageService.instance.reportSeverity(-(reportedSeverity)); - reportedSeverity = 0d; - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 70aecb0..08f6aa6 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import com.codahale.metrics.ExponentiallyDecayingReservoir; @@ -31,9 +32,14 @@ import javax.management.ObjectName; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.ftpserver.command.impl.STOR; /** @@ -283,7 +289,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa // finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity. // "Severity" is basically a measure of compaction activity (CASSANDRA-3722). if (USE_SEVERITY) - score += StorageService.instance.getSeverity(entry.getKey()); + score += getSeverity(entry.getKey()); // lowest score (least amount of badness) wins. newScores.put(entry.getKey(), score); } @@ -333,12 +339,25 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public void setSeverity(double severity) { - StorageService.instance.reportManualSeverity(severity); + Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, StorageService.instance.valueFactory.severity(severity)); + } + + private double getSeverity(InetAddress endpoint) + { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + return 0.0; + + VersionedValue event = state.getApplicationState(ApplicationState.SEVERITY); + if (event == null) + return 0.0; + + return Double.parseDouble(event.value); } public double getSeverity() { - return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress()); + return getSeverity(FBUtilities.getBroadcastAddress()); } public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java index a413bc5..552a16d 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java @@ -29,11 +29,30 @@ public interface DynamicEndpointSnitchMBean { public double getBadnessThreshold(); public String getSubsnitchClassName(); public List<Double> dumpTimings(String hostname) throws UnknownHostException; + /** - * Use this if you want to specify a severity; it can be negative - * Example: Page cache is cold and you want data to be sent - * though it is not preferred one. + * Setting a Severity allows operators to inject preference information into the Dynamic Snitch + * replica selection. + * + * When choosing which replicas to participate in a read request, the DSnitch sorts replicas + * by response latency, and selects the fastest replicas. Latencies are normalized to a score + * from 0 to 1, with lower scores being faster. + * + * The Severity injected here will be added to the normalized score. + * + * Thus, adding a Severity greater than 1 will mean the replica will never be contacted + * (unless needed for ALL or if it is added later for rapid read protection). + * + * Conversely, adding a negative Severity means the replica will *always* be contacted. + * + * (The "Severity" term is historical and dates to when this was used to represent how + * badly background tasks like compaction were affecting a replica's performance. + * See CASSANDRA-3722 for when this was introduced and CASSANDRA-11738 for why it was removed.) */ public void setSeverity(double severity); + + /** + * @return the current manually injected Severity. + */ public double getSeverity(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/metrics/CompactionMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index 9d2863f..2cddfff 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -140,15 +140,11 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt public void beginCompaction(CompactionInfo.Holder ci) { - // notify - ci.started(); compactions.add(ci); } public void finishCompaction(CompactionInfo.Holder ci) { - // notify - ci.finished(); compactions.remove(ci); bytesCompacted.inc(ci.getCompactionInfo().getTotal()); totalCompactionsCompleted.mark(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2883e24..d64fc04 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -189,8 +189,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>(); - private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor(); - private final ObjectName jmxObjectName; private Collection<Token> bootstrapTokens = null; @@ -1468,24 +1466,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } /** - * Increment about the known Compaction severity of the events in this node - */ - public void reportSeverity(double incr) - { - bgMonitor.incrCompactionSeverity(incr); - } - - public void reportManualSeverity(double incr) - { - bgMonitor.incrManualSeverity(incr); - } - - public double getSeverity(InetAddress endpoint) - { - return bgMonitor.getSeverity(endpoint); - } - - /** * for a keyspace, return the ranges and corresponding listen addresses. * @param keyspace * @return the endpoint map http://git-wip-us.apache.org/repos/asf/cassandra/blob/286f6a14/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java deleted file mode 100644 index 1799d10..0000000 --- a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java +++ /dev/null @@ -1,171 +0,0 @@ -package org.apache.cassandra.utils; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.IOException; -import java.io.RandomAccessFile; -import java.lang.management.ManagementFactory; -import java.net.InetAddress; -import java.util.StringTokenizer; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.EndpointState; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.service.StorageService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.AtomicDouble; - -public class BackgroundActivityMonitor -{ - private static final Logger logger = LoggerFactory.getLogger(BackgroundActivityMonitor.class); - - public static final int USER_INDEX = 0; - public static final int NICE_INDEX = 1; - public static final int SYS_INDEX = 2; - public static final int IDLE_INDEX = 3; - public static final int IOWAIT_INDEX = 4; - public static final int IRQ_INDEX = 5; - public static final int SOFTIRQ_INDEX = 6; - - private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); - private static final String PROC_STAT_PATH = "/proc/stat"; - - private final AtomicDouble compaction_severity = new AtomicDouble(); - private final AtomicDouble manual_severity = new AtomicDouble(); - private final ScheduledExecutorService reportThread = new DebuggableScheduledThreadPoolExecutor("Background_Reporter"); - - private RandomAccessFile statsFile; - private long[] lastReading; - - public BackgroundActivityMonitor() - { - try - { - statsFile = new RandomAccessFile(PROC_STAT_PATH, "r"); - lastReading = readAndCompute(); - } - catch (IOException ex) - { - if (FBUtilities.hasProcFS()) - logger.warn("Couldn't open /proc/stats"); - statsFile = null; - } - reportThread.scheduleAtFixedRate(new BackgroundActivityReporter(), 1, 1, TimeUnit.SECONDS); - } - - private long[] readAndCompute() throws IOException - { - statsFile.seek(0); - StringTokenizer tokenizer = new StringTokenizer(statsFile.readLine()); - String name = tokenizer.nextToken(); - assert name.equalsIgnoreCase("cpu"); - long[] returned = new long[tokenizer.countTokens()]; - for (int i = 0; i < returned.length; i++) - returned[i] = Long.parseLong(tokenizer.nextToken()); - return returned; - } - - private float compareAtIndex(long[] reading1, long[] reading2, int index) - { - long total1 = 0, total2 = 0; - for (int i = 0; i <= SOFTIRQ_INDEX; i++) - { - total1 += reading1[i]; - total2 += reading2[i]; - } - float totalDiff = total2 - total1; - - long intrested1 = reading1[index], intrested2 = reading2[index]; - float diff = intrested2 - intrested1; - if (diff == 0) - return 0f; - return (diff / totalDiff) * 100; // yes it is hard coded to 100 [update - // unit?] - } - - public void incrCompactionSeverity(double sev) - { - compaction_severity.addAndGet(sev); - } - - public void incrManualSeverity(double sev) - { - manual_severity.addAndGet(sev); - } - - public double getIOWait() throws IOException - { - if (statsFile == null) - return -1d; - long[] newComp = readAndCompute(); - double value = compareAtIndex(lastReading, newComp, IOWAIT_INDEX); - lastReading = newComp; - return value; - } - - public double getNormalizedLoadAvg() - { - double avg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); - return avg / NUM_CPUS; - } - - public double getSeverity(InetAddress endpoint) - { - VersionedValue event; - EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null) - return Double.parseDouble(event.value); - return 0.0; - } - - public class BackgroundActivityReporter implements Runnable - { - public void run() - { - double report = -1; - try - { - report = getIOWait(); - } - catch (IOException e) - { - // ignore; - if (FBUtilities.hasProcFS()) - logger.warn("Couldn't read /proc/stats"); - } - if (report == -1d) - report = compaction_severity.get(); - - if (!Gossiper.instance.isEnabled()) - return; - report += manual_severity.get(); // add manual severity setting. - VersionedValue updated = StorageService.instance.valueFactory.severity(report); - Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated); - } - } -}