Split out outgoing stream throughput within a DC and inter-DC; patch by Vijay and Benedict for CASSANDRA-6596
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4112a7fa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4112a7fa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4112a7fa Branch: refs/heads/trunk Commit: 4112a7fa279b340fb5f0f28f68ec0bf43f1479bf Parents: 6396a35 Author: vparthasarathy <vijay2...@gmail.com> Authored: Sat Feb 15 12:25:14 2014 -0800 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Jun 30 20:47:58 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 6 +++ .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 10 ++++ .../cassandra/streaming/StreamManager.java | 53 +++++++++++++++----- .../cassandra/streaming/StreamWriter.java | 5 +- 6 files changed, 62 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 00fcce7..ff8a7a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ * Make sure high level sstables get compacted (CASSANDRA-7414) * Fix AssertionError when using empty clustering columns and static columns (CASSANDRA-7455) + * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596) 2.0.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index ea4d955..f067635 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -480,6 +480,12 @@ compaction_preheat_key_cache: true # When unset, the default is 200 Mbps or 25 MB/s. 
# stream_throughput_outbound_megabits_per_sec: 200 +# Throttles all streaming file transfer between the datacenters, +# this setting allows users to throttle inter dc stream throughput in addition +# to throttling all network stream traffic as configured with +# stream_throughput_outbound_megabits_per_sec +# inter_dc_stream_throughput_outbound_megabits_per_sec: + # How long the coordinator should wait for read operations to complete read_request_timeout_in_ms: 5000 # How long the coordinator should wait for seq or index scans to complete http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 7a3185a..aab5025 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -129,6 +129,7 @@ public class Config public Integer max_streaming_retries = 3; public volatile Integer stream_throughput_outbound_megabits_per_sec = 200; + public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0; public String[] data_file_directories; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3905eba..badd975 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -914,6 +914,16 @@ public class DatabaseDescriptor conf.stream_throughput_outbound_megabits_per_sec = value; } + public static int getInterDCStreamThroughputOutboundMegabitsPerSec() + { + return 
conf.inter_dc_stream_throughput_outbound_megabits_per_sec; + } + + public static void setInterDCStreamThroughputOutboundMegabitsPerSec(int value) + { + conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value; + } + public static String[] getAllDataFileLocations() { return conf.data_file_directories; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/streaming/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index 3fe6179..366f3ff 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.streaming; +import java.net.InetAddress; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -32,8 +33,8 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.RateLimiter; -import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.streaming.management.StreamEventJMXNotifier; import org.apache.cassandra.streaming.management.StreamStateCompositeData; @@ -47,25 +48,53 @@ public class StreamManager implements StreamManagerMBean { public static final StreamManager instance = new StreamManager(); - private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE); - /** * Gets streaming rate limiter. * When stream_throughput_outbound_megabits_per_sec is 0, this returns rate limiter * with the rate of Double.MAX_VALUE bytes per second. * Rate unit is bytes per sec. 
* - * @return RateLimiter with rate limit set + * @return StreamRateLimiter with rate limit set based on peer location. */ - public static RateLimiter getRateLimiter() + public static StreamRateLimiter getRateLimiter(InetAddress peer) { - double currentThroughput = (((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024 ) / 8; - // if throughput is set to 0, throttling is disabled - if (currentThroughput == 0) - currentThroughput = Double.MAX_VALUE; - if (limiter.getRate() != currentThroughput) - limiter.setRate(currentThroughput); - return limiter; + return new StreamRateLimiter(peer); + } + + public static class StreamRateLimiter + { + private static final double ONE_MEGA_BIT = (1024 * 1024) / 8; // bytes in one megabit: limiter rates are in bytes per second, so divide by 8 (was "* 8", which inflated the limit 64x) + private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE); + private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE); + private final boolean isLocalDC; + + public StreamRateLimiter(InetAddress peer) + { + double throughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BIT; + mayUpdateThroughput(throughput, limiter); + + double interDCThroughput = ((double) DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec()) * ONE_MEGA_BIT; + mayUpdateThroughput(interDCThroughput, interDCLimiter); + + isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals( + DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer)); + } + + private void mayUpdateThroughput(double limit, RateLimiter rateLimiter) + { + // if throughput is set to 0, throttling is disabled + if (limit == 0) + limit = Double.MAX_VALUE; + if (rateLimiter.getRate() != limit) + rateLimiter.setRate(limit); + } + + public void acquire(int toTransfer) + { + limiter.acquire(toTransfer); + if (!isLocalDC) + interDCLimiter.acquire(toTransfer); + } } private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java index 5609f20..5a5163f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java @@ -24,7 +24,6 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.Collection; -import com.google.common.util.concurrent.RateLimiter; import com.ning.compress.lzf.LZFOutputStream; import org.apache.cassandra.io.sstable.Component; @@ -33,6 +32,7 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import org.apache.cassandra.utils.Pair; /** @@ -44,7 +44,7 @@ public class StreamWriter protected final SSTableReader sstable; protected final Collection<Pair<Long, Long>> sections; - protected final RateLimiter limiter = StreamManager.getRateLimiter(); + protected final StreamRateLimiter limiter; protected final StreamSession session; private OutputStream compressedOutput; @@ -57,6 +57,7 @@ public class StreamWriter this.session = session; this.sstable = sstable; this.sections = sections; + this.limiter = StreamManager.getRateLimiter(session.peer); } /**