Introduce intra-cluster message coalescing patch by ariel; reviewed by benedict for CASSANDRA-8692
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/82849649 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/82849649 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/82849649 Branch: refs/heads/trunk Commit: 828496492c51d7437b690999205ecc941f41a0a9 Parents: db1a741 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Wed Mar 18 10:37:28 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Mar 18 10:38:04 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 22 + .../cassandra/config/DatabaseDescriptor.java | 10 + .../cassandra/net/IncomingTcpConnection.java | 5 +- .../cassandra/net/OutboundTcpConnection.java | 117 +++- .../cassandra/utils/CoalescingStrategies.java | 544 +++++++++++++++++++ .../utils/NanoTimeToCurrentTimeMillis.java | 88 +++ .../utils/CoalescingStrategiesTest.java | 445 +++++++++++++++ .../utils/NanoTimeToCurrentTimeMillisTest.java | 52 ++ 9 files changed, 1254 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 56a7164..68df77e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.4 + * Introduce intra-cluster message coalescing (CASSANDRA-8692) * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839) * Don't check if an sstable is live for offline compactions (CASSANDRA-8841) * Don't set clientMode in SSTableLoader (CASSANDRA-8238) http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index fbbd1dd..378a1ad 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -39,6 +39,12 @@ import org.apache.cassandra.utils.FBUtilities; */ public class Config { + /* + * Prefix for Java properties for internal Cassandra configuration options + */ + public static final String PROPERTY_PREFIX = "cassandra."; + + public String cluster_name = "Test Cluster"; public String authenticator; public String authorizer; @@ -223,6 +229,22 @@ public class Config private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE) .surroundingSpacesNeedQuotes(true).build(); + /* + * Strategy to use for coalescing messages in OutboundTcpConnection. + * Can be fixed, movingaverage, timehorizon, disabled. Setting is case and leading/trailing + * whitespace insensitive. You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name. + */ + public String otc_coalescing_strategy = "DISABLED"; + + /* + * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first + * messgae is received before it will be sent with any accompanying messages. For moving average this is the + * maximum amount of time that will be waited as well as the interval at which messages must arrive on average + * for coalescing to be enabled. 
+ */ + public static final int otc_coalescing_window_us_default = 200; + public int otc_coalescing_window_us = otc_coalescing_window_us_default; + public static boolean getOutboundBindAny() { return outboundBindAny; http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 65cec9c..d0db9f4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1682,4 +1682,14 @@ public class DatabaseDescriptor String arch = System.getProperty("os.arch"); return arch.contains("64") || arch.contains("sparcv9"); } + + public static String getOtcCoalescingStrategy() + { + return conf.otc_coalescing_strategy; + } + + public static int getOtcCoalescingWindow() + { + return conf.otc_coalescing_window_us; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index ee44493..e7d434b 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -32,6 +32,7 @@ import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; import org.xerial.snappy.SnappyInputStream; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.gms.Gossiper; @@ -40,6 +41,8 @@ public class IncomingTcpConnection extends Thread { private static final Logger 
logger = LoggerFactory.getLogger(IncomingTcpConnection.class); + private static final int BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "itc_buffer_size", 1024 * 4); /* PROPERTY_PREFIX already ends with '.', so the key resolves to cassandra.itc_buffer_size */ + private final int version; private final boolean compressed; private final Socket socket; @@ -132,7 +135,7 @@ public class IncomingTcpConnection extends Thread } else { - in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); + in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE)); } if (version > MessagingService.current_version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 4586667..2d6d4fe 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -25,10 +25,7 @@ import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -44,14 +41,18 @@ import net.jpountz.lz4.LZ4BlockOutputStream; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; + import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.CoalescingStrategies; +import org.apache.cassandra.utils.CoalescingStrategies.Coalescable; 
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; import org.apache.cassandra.utils.UUIDGen; import org.xerial.snappy.SnappyOutputStream; - import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; @@ -61,6 +62,54 @@ public class OutboundTcpConnection extends Thread { private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class); + private static final String PREFIX = Config.PROPERTY_PREFIX; + + /* + * Enabled/disable TCP_NODELAY for intradc connections. Defaults to enabled. + */ + private static final String INTRADC_TCP_NODELAY_PROPERTY = PREFIX + "otc_intradc_tcp_nodelay"; + private static final boolean INTRADC_TCP_NODELAY = Boolean.valueOf(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true")); + + /* + * Size of buffer in output stream + */ + private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size"; + private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64); + + private static CoalescingStrategy newCoalescingStrategy(String displayName) + { + return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(), + DatabaseDescriptor.getOtcCoalescingWindow(), + logger, + displayName); + } + + static + { + String strategy = DatabaseDescriptor.getOtcCoalescingStrategy(); + switch (strategy) + { + case "TIMEHORIZON": + break; + case "MOVINGAVERAGE": + case "FIXED": + case "DISABLED": + logger.info("OutboundTcpConnection using coalescing strategy " + strategy); + break; + default: + //Check that it can be loaded + newCoalescingStrategy("dummy"); + } + + int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow(); + if (coalescingWindow != Config.otc_coalescing_window_us_default) + logger.info("OutboundTcpConnection coalescing window set to " + coalescingWindow + "μs"); + + if (coalescingWindow 
< 0) + throw new ExceptionInInitializerError( + "Value provided for coalescing window must be greater than or equal to 0: " + coalescingWindow); + } + private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE); private volatile boolean isStopped = false; @@ -74,6 +123,7 @@ public class OutboundTcpConnection extends Thread private final OutboundTcpConnectionPool poolReference; + private final CoalescingStrategy cs; private DataOutputStreamPlus out; private Socket socket; private volatile long completed; @@ -85,6 +135,7 @@ public class OutboundTcpConnection extends Thread { super("WRITE-" + pool.endPoint()); this.poolReference = pool; + cs = newCoalescingStrategy(pool.endPoint().getHostAddress()); } private static boolean isLocalDC(InetAddress targetHost) @@ -127,26 +178,27 @@ public class OutboundTcpConnection extends Thread public void run() { + final int drainedMessageSize = 128; // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize) - final List<QueuedMessage> drainedMessages = new ArrayList<>(128); + final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize); + outer: while (true) { - if (backlog.drainTo(drainedMessages, drainedMessages.size()) == 0) + try { - try - { - drainedMessages.add(backlog.take()); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - + cs.coalesce(backlog, drainedMessages, drainedMessageSize); } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + currentMsgBufferCount = drainedMessages.size(); int count = drainedMessages.size(); + //The timestamp of the first message has already been provided to the coalescing strategy + //so skip logging it.
for (QueuedMessage qm : drainedMessages) { try @@ -159,10 +211,11 @@ public class OutboundTcpConnection extends Thread break outer; continue; } - if (qm.isTimedOut(m.getTimeout())) + + if (qm.isTimedOut(TimeUnit.MILLISECONDS.toNanos(m.getTimeout()), System.nanoTime())) dropped.incrementAndGet(); else if (socket != null || connect()) - writeConnected(qm, count == 1 && backlog.size() == 0); + writeConnected(qm, count == 1 && backlog.isEmpty()); else // clear out the queue, else gossip messages back up. backlog.clear(); @@ -225,7 +278,8 @@ public class OutboundTcpConnection extends Thread } } - writeInternal(qm.message, qm.id, qm.timestamp); + long timestampMillis = NanoTimeToCurrentTimeMillis.convert(qm.timestampNanos); + writeInternal(qm.message, qm.id, timestampMillis); completed++; if (flush) @@ -325,7 +379,7 @@ public class OutboundTcpConnection extends Thread socket.setKeepAlive(true); if (isLocalDC(poolReference.endPoint())) { - socket.setTcpNoDelay(true); + socket.setTcpNoDelay(INTRADC_TCP_NODELAY); } else { @@ -342,7 +396,7 @@ public class OutboundTcpConnection extends Thread logger.warn("Failed to set send buffer size on internode socket.", se); } } - out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), 4096)); + out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE)); out.writeInt(MessagingService.PROTOCOL_MAGIC); writeHeader(out, targetVersion, shouldCompressConnection()); @@ -416,7 +470,7 @@ public class OutboundTcpConnection extends Thread } return false; } - + private int handshakeVersion(final DataInputStream inputStream) { final AtomicInteger version = new AtomicInteger(NO_VERSION); @@ -431,7 +485,7 @@ public class OutboundTcpConnection extends Thread logger.info("Handshaking version with {}", poolReference.endPoint()); version.set(inputStream.readInt()); } - catch (IOException ex) + catch (IOException ex) { final String msg = "Cannot handshake version with " + 
poolReference.endPoint(); if (logger.isTraceEnabled()) @@ -464,7 +518,7 @@ public class OutboundTcpConnection extends Thread while (iter.hasNext()) { QueuedMessage qm = iter.next(); - if (qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout()) + if (qm.timestampNanos >= System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(qm.message.getTimeout())) /* getTimeout() is in milliseconds; timestampNanos comes from System.nanoTime() */ return; iter.remove(); dropped.incrementAndGet(); @@ -472,31 +526,36 @@ public class OutboundTcpConnection extends Thread } /** messages that have not been retried yet */ - private static class QueuedMessage + private static class QueuedMessage implements Coalescable { final MessageOut<?> message; final int id; - final long timestamp; + final long timestampNanos; final boolean droppable; QueuedMessage(MessageOut<?> message, int id) { this.message = message; this.id = id; - this.timestamp = System.currentTimeMillis(); + this.timestampNanos = System.nanoTime(); this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb); } /** don't drop a non-droppable message just because it's timestamp is expired */ - boolean isTimedOut(long maxTime) + boolean isTimedOut(long maxTimeNanos, long nowNanos) { - return droppable && timestamp < System.currentTimeMillis() - maxTime; + return droppable && timestampNanos < nowNanos - maxTimeNanos; } boolean shouldRetry() { return !droppable; } + + public long timestampNanos() + { + return timestampNanos; + } } private static class RetriedQueuedMessage extends QueuedMessage http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/utils/CoalescingStrategies.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java new file mode 100644 index 0000000..ca1399b --- /dev/null +++ 
b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF)
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.io.util.FileUtils; +import org.slf4j.Logger; + +import java.io.File; +import java.io.RandomAccessFile; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel.MapMode; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +public class CoalescingStrategies +{ + + /* + * Log debug information at info level about what the average is and when coalescing is enabled/disabled + */ + private static final String DEBUG_COALESCING_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug"; + private static final boolean DEBUG_COALESCING = Boolean.getBoolean(DEBUG_COALESCING_PROPERTY); + + private static final String DEBUG_COALESCING_PATH_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug_path"; + private static final String DEBUG_COALESCING_PATH = System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, 
"/tmp/coleascing_debug"); + + static { + if (DEBUG_COALESCING) + { + File directory = new File(DEBUG_COALESCING_PATH); + + if (directory.exists()) + FileUtils.deleteRecursive(directory); + + if (!directory.mkdirs()) + throw new ExceptionInInitializerError("Couldn't create log dir"); + } + } + + @VisibleForTesting + interface Clock + { + long nanoTime(); + } + + @VisibleForTesting + static Clock CLOCK = new Clock() + { + public long nanoTime() + { + return System.nanoTime(); + } + }; + + public static interface Coalescable { + long timestampNanos(); + } + + @VisibleForTesting + static void parkLoop(long nanos) + { + long now = System.nanoTime(); + final long timer = now + nanos; + do + { + LockSupport.parkNanos(timer - now); + } + while (timer - (now = System.nanoTime()) > nanos / 16); + } + + private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker) + { + // only sleep if we can expect to double the number of messages we're sending in the time interval + long sleep = messages * averageGap; + if (sleep > maxCoalesceWindow) + return false; + + // assume we receive as many messages as we expect; apply the same logic to the future batch: + // expect twice as many messages to consider sleeping for "another" interval; this basically translates + // to doubling our sleep period until we exceed our max sleep window + while (sleep * 2 < maxCoalesceWindow) + sleep *= 2; + parker.park(sleep); + return true; + } + + public static abstract class CoalescingStrategy + { + protected final Parker parker; + protected final Logger logger; + protected volatile boolean shouldLogAverage = false; + protected final ByteBuffer logBuffer; + private RandomAccessFile ras; + private final String displayName; + + protected CoalescingStrategy(Parker parker, Logger logger, String displayName) + { + this.parker = parker; + this.logger = logger; + this.displayName = displayName; + if (DEBUG_COALESCING) + { + new Thread(displayName + " debug thread") { + 
@Override + public void run() { + while (true) { + try + { + Thread.sleep(5000); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + shouldLogAverage = true; + } + } + }.start(); + } + RandomAccessFile rasTemp = null; + ByteBuffer logBufferTemp = null; + if (DEBUG_COALESCING) + { + try + { + File outFile = File.createTempFile("coalescing_" + this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH)); + rasTemp = new RandomAccessFile(outFile, "rw"); + logBufferTemp = rasTemp.getChannel().map(MapMode.READ_WRITE, 0, Integer.MAX_VALUE); /* map via the local; the ras field is only assigned after this block */ + logBufferTemp.putLong(0); + } + catch (Exception e) + { + logger.error("Unable to create output file for debugging coalescing", e); + } + } + ras = rasTemp; + logBuffer = logBufferTemp; + } + + /* + * If debugging is enabled log to the logger the current average gap calculation result. + */ + final protected void debugGap(long averageGap) + { + if (DEBUG_COALESCING && shouldLogAverage) + { + shouldLogAverage = false; + logger.info(toString() + " gap " + TimeUnit.NANOSECONDS.toMicros(averageGap) + "μs"); + } + } + + /* + * If debugging is enabled log the provided nanotime timestamp to a file. + */ + final protected void debugTimestamp(long timestamp) + { + if(DEBUG_COALESCING && logBuffer != null) + { + logBuffer.putLong(0, logBuffer.getLong(0) + 1); + logBuffer.putLong(timestamp); + } + } + + /* + * If debugging is enabled log the timestamps of all the items in the provided collection + * to a file. + */ + final protected <C extends Coalescable> void debugTimestamps(Collection<C> coalescables) { + if (DEBUG_COALESCING) { + for (C coalescable : coalescables) { + debugTimestamp(coalescable.timestampNanos()); + } + } + } + + /** + * Drain from the input blocking queue to the output list up to maxItems elements. + * + * The coalescing strategy may choose to park the current thread if it thinks it will + * be able to produce an output list with more elements.
+ * + * @param input Blocking queue to retrieve elements from + * @param out Output list to place retrieved elements in. Must be empty. + * @param maxItems Maximum number of elements to place in the output list + */ + public <C extends Coalescable> void coalesce(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + { + Preconditions.checkArgument(out.isEmpty(), "out list should be empty"); + coalesceInternal(input, out, maxItems); + } + + protected abstract <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException; + + } + + @VisibleForTesting + interface Parker + { + void park(long nanos); + } + + private static final Parker PARKER = new Parker() + { + @Override + public void park(long nanos) + { + parkLoop(nanos); + } + }; + + @VisibleForTesting + static class TimeHorizonMovingAverageCoalescingStrategy extends CoalescingStrategy + { + // for now we'll just use 64ms per bucket; this can be made configurable, but results in ~1s for 16 samples + private static final int INDEX_SHIFT = 26; + private static final long BUCKET_INTERVAL = 1L << 26; + private static final int BUCKET_COUNT = 16; + private static final long INTERVAL = BUCKET_INTERVAL * BUCKET_COUNT; + private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * (BUCKET_COUNT - 1); + + // the minimum timestamp we will now accept updates for; only moves forwards, never backwards + private long epoch = CLOCK.nanoTime(); + // the buckets, each following on from epoch; the measurements run from ix(epoch) to ix(epoch - 1) + // ix(epoch-1) is a partial result, that is never actually part of the calculation, and most updates + // are expected to hit this bucket + private final int samples[] = new int[BUCKET_COUNT]; + private long sum = 0; + private final long maxCoalesceWindow; + + public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName) + { + 
super(parker, logger, displayName); + this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow); + sum = 0; + } + + private void logSample(long nanos) + { + debugTimestamp(nanos); + long epoch = this.epoch; + long delta = nanos - epoch; + if (delta < 0) + // have to simply ignore, but would be a bit crazy to get such reordering + return; + + if (delta > INTERVAL) + epoch = rollepoch(delta, epoch, nanos); + + int ix = ix(nanos); + samples[ix]++; + + // if we've updated an old bucket, we need to update the sum to match + if (ix != ix(epoch - 1)) + sum++; + } + + private long averageGap() + { + if (sum == 0) + return Integer.MAX_VALUE; + return MEASURED_INTERVAL / sum; + } + + // this sample extends past the end of the range we cover, so rollover + private long rollepoch(long delta, long epoch, long nanos) + { + if (delta > 2 * INTERVAL) + { + // this sample is more than twice our interval ahead, so just clear our counters completely + epoch = epoch(nanos); + sum = 0; + Arrays.fill(samples, 0); + } + else + { + // ix(epoch - 1) => last index; this is our partial result bucket, so we add this to the sum + sum += samples[ix(epoch - 1)]; + // then we roll forwards, clearing buckets, until our interval covers the new sample time + while (epoch + INTERVAL < nanos) + { + int index = ix(epoch); + sum -= samples[index]; + samples[index] = 0; + epoch += BUCKET_INTERVAL; + } + } + // store the new epoch + this.epoch = epoch; + return epoch; + } + + private long epoch(long latestNanos) + { + return (latestNanos - MEASURED_INTERVAL) & ~(BUCKET_INTERVAL - 1); + } + + private int ix(long nanos) + { + return (int) ((nanos >>> INDEX_SHIFT) & 15); + } + + @Override + protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + { + if (input.drainTo(out, maxItems) == 0) + { + out.add(input.take()); + input.drainTo(out, maxItems - 1); + } + + for (Coalescable qm : out) + 
logSample(qm.timestampNanos()); + + long averageGap = averageGap(); + debugGap(averageGap); + + int count = out.size(); + if (maybeSleep(count, averageGap, maxCoalesceWindow, parker)) + { + input.drainTo(out, maxItems - out.size()); + int prevCount = count; + count = out.size(); + for (int i = prevCount; i < count; i++) + logSample(out.get(i).timestampNanos()); + } + } + + @Override + public String toString() { + return "Time horizon moving average"; + } + } + + /* + * Start coalescing by sleeping if the moving average is < the requested window. + * The actual time spent waiting to coalesce will be the min( window, moving average * 2) + * The actual amount of time spent waiting can be greater then the window. For instance + * observed time spent coalescing was 400 microseconds with the window set to 200 in one benchmark. + */ + @VisibleForTesting + static class MovingAverageCoalescingStrategy extends CoalescingStrategy + { + private final int samples[] = new int[16]; + private long lastSample = 0; + private int index = 0; + private long sum = 0; + + private final long maxCoalesceWindow; + + public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName) + { + super(parker, logger, displayName); + this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow); + for (int ii = 0; ii < samples.length; ii++) + samples[ii] = Integer.MAX_VALUE; + sum = Integer.MAX_VALUE * (long)samples.length; + } + + private long logSample(int value) + { + sum -= samples[index]; + sum += value; + samples[index] = value; + index++; + index = index & ((1 << 4) - 1); + return sum / 16; + } + + private long notifyOfSample(long sample) + { + debugTimestamp(sample); + if (sample > lastSample) + { + final int delta = (int)(Math.min(Integer.MAX_VALUE, sample - lastSample)); + lastSample = sample; + return logSample(delta); + } + else + { + return logSample(1); + } + } + + @Override + protected <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + { + if (input.drainTo(out, maxItems) == 0) + { + out.add(input.take()); + } + + long average = notifyOfSample(out.get(0).timestampNanos()); + + debugGap(average); + + maybeSleep(out.size(), average, maxCoalesceWindow, parker); + + input.drainTo(out, maxItems - out.size()); + for (int ii = 1; ii < out.size(); ii++) + notifyOfSample(out.get(ii).timestampNanos()); + } + + @Override + public String toString() { + return "Moving average"; + } + } + + /* + * A fixed strategy as a backup in case MovingAverage or TimeHorizongMovingAverage fails in some scenario + */ + @VisibleForTesting + static class FixedCoalescingStrategy extends CoalescingStrategy + { + private final long coalesceWindow; + + public FixedCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName) + { + super(parker, logger, displayName); + coalesceWindow = TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros); + } + + @Override + protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + { + if (input.drainTo(out, maxItems) == 0) + { + out.add(input.take()); + parker.park(coalesceWindow); + input.drainTo(out, maxItems - 1); + } + debugTimestamps(out); + } + + @Override + public String toString() { + return "Fixed"; + } + } + + /* + * A coalesscing strategy that just returns all currently available elements + */ + @VisibleForTesting + static class DisabledCoalescingStrategy extends CoalescingStrategy + { + + public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName) + { + super(parker, logger, displayName); + } + + @Override + protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + { + if (input.drainTo(out, maxItems) == 0) + { + out.add(input.take()); + 
input.drainTo(out, maxItems - 1); + } + debugTimestamps(out); + } + + @Override + public String toString() { + return "Disabled"; + } + } + + @VisibleForTesting + static CoalescingStrategy newCoalescingStrategy(String strategy, + int coalesceWindow, + Parker parker, + Logger logger, + String displayName) + { + String classname = null; + String strategyCleaned = strategy.trim().toUpperCase(); + switch(strategyCleaned) + { + case "MOVINGAVERAGE": + classname = MovingAverageCoalescingStrategy.class.getName(); + break; + case "FIXED": + classname = FixedCoalescingStrategy.class.getName(); + break; + case "TIMEHORIZON": + classname = TimeHorizonMovingAverageCoalescingStrategy.class.getName(); + break; + case "DISABLED": + classname = DisabledCoalescingStrategy.class.getName(); + break; + default: + classname = strategy; + } + + try + { + Class<?> clazz = Class.forName(classname); + + if (!CoalescingStrategy.class.isAssignableFrom(clazz)) + { + throw new RuntimeException(classname + " is not an instance of CoalescingStrategy"); + } + + Constructor<?> constructor = clazz.getConstructor(int.class, Parker.class, Logger.class, String.class); + + return (CoalescingStrategy)constructor.newInstance(coalesceWindow, parker, logger, displayName); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public static CoalescingStrategy newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName) + { + return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, displayName); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java new file mode 100644 index 0000000..a6c5d28 --- /dev/null +++ 
b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.config.Config; + +import com.google.common.annotations.VisibleForTesting; + +/* + * Convert from nanotime to non-monotonic current time millis. Beware of weaker ordering guarantees. + */ +public class NanoTimeToCurrentTimeMillis +{ + /* + * How often to pull a new timestamp from the system. + */ + private static final String TIMESTAMP_UPDATE_INTERVAL_PROPERTY = Config.PROPERTY_PREFIX + "NANOTIMETOMILLIS_TIMESTAMP_UPDATE_INTERVAL"; + private static final long TIMESTAMP_UPDATE_INTERVAL = Long.getLong(TIMESTAMP_UPDATE_INTERVAL_PROPERTY, 10000); + + private static volatile long TIMESTAMP_BASE[] = new long[] { System.currentTimeMillis(), System.nanoTime() }; + + @VisibleForTesting + public static final Object TIMESTAMP_UPDATE = new Object(); + + /* + * System.currentTimeMillis() is 25 nanoseconds. This is 2 nanoseconds (maybe) according to JMH. + * Faster than calling both currentTimeMillis() and nanoTime(). 
+ * + * There is also the issue of how scalable nanoTime() and currentTimeMillis() are which is a moving target. + * + * These timestamps don't order with System.currentTimeMillis() because currentTimeMillis() can tick over + * before this one does. I have seen it behind by as much as 2 milliseconds. + */ + public static final long convert(long nanoTime) + { + final long timestampBase[] = TIMESTAMP_BASE; + return timestampBase[0] + TimeUnit.NANOSECONDS.toMillis(nanoTime - timestampBase[1]); + } + + static + { + //Pick up updates from NTP periodically + Thread t = new Thread("NanoTimeToCurrentTimeMillis updater") + { + @Override + public void run() + { + while (true) + { + try + { + synchronized (TIMESTAMP_UPDATE) + { + TIMESTAMP_UPDATE.wait(TIMESTAMP_UPDATE_INTERVAL); + } + } + catch (InterruptedException e) + { + return; + } + + TIMESTAMP_BASE = new long[] { + Math.max(TIMESTAMP_BASE[0], System.currentTimeMillis()), + Math.max(TIMESTAMP_BASE[1], System.nanoTime()) }; + } + } + }; + t.setDaemon(true); + t.start(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java new file mode 100644 index 0000000..97d15fe --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import org.apache.cassandra.utils.CoalescingStrategies.Clock; +import org.apache.cassandra.utils.CoalescingStrategies.Coalescable; +import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy; +import org.apache.cassandra.utils.CoalescingStrategies.Parker; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.Assert.*; + +public class CoalescingStrategiesTest +{ + + static final ExecutorService ex = Executors.newSingleThreadExecutor(); + + private static final Logger logger = LoggerFactory.getLogger(CoalescingStrategiesTest.class); + + static class MockParker implements Parker + { + Queue<Long> parks = new ArrayDeque<Long>(); + Semaphore permits = new Semaphore(0); + + Semaphore parked = new Semaphore(0); + + public void park(long nanos) + { + parks.offer(nanos); + parked.release(); + try + { + permits.acquire(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + static class SimpleCoalescable implements Coalescable + { + final 
long timestampNanos; + + SimpleCoalescable(long timestampNanos) + { + this.timestampNanos = timestampNanos; + } + + public long timestampNanos() + { + return timestampNanos; + } + } + + + static long toNanos(long micros) + { + return TimeUnit.MICROSECONDS.toNanos(micros); + } + + MockParker parker; + + BlockingQueue<SimpleCoalescable> input; + List<SimpleCoalescable> output; + + CoalescingStrategy cs; + + Semaphore queueParked = new Semaphore(0); + Semaphore queueRelease = new Semaphore(0); + + @SuppressWarnings({ "serial" }) + @Before + public void setUp() throws Exception + { + cs = null; + CoalescingStrategies.CLOCK = new Clock() + { + @Override + public long nanoTime() + { + return 0; + } + }; + + parker = new MockParker(); + input = new LinkedBlockingQueue<SimpleCoalescable>() + { + @Override + public SimpleCoalescable take() throws InterruptedException + { + queueParked.release(); + queueRelease.acquire(); + return super.take(); + } + }; + output = new ArrayList<>(128); + + clear(); + } + + CoalescingStrategy newStrategy(String name, int window) + { + return CoalescingStrategies.newCoalescingStrategy(name, window, parker, logger, "Stupendopotamus"); + } + + void add(long whenMicros) + { + input.offer(new SimpleCoalescable(toNanos(whenMicros))); + } + + void clear() + { + output.clear(); + input.clear(); + parker.parks.clear(); + parker.parked.drainPermits(); + parker.permits.drainPermits(); + queueParked.drainPermits(); + queueRelease.drainPermits(); + } + + void release() throws Exception + { + queueRelease.release(); + parker.permits.release(); + fut.get(); + } + + Future<?> fut; + void runBlocker(Semaphore waitFor) throws Exception + { + fut = ex.submit(new Runnable() + { + @Override + public void run() + { + try + { + cs.coalesce(input, output, 128); + } + catch (Exception ex) + { + ex.printStackTrace(); + throw new RuntimeException(ex); + } + } + }); + waitFor.acquire(); + } + + @Test + public void testFixedCoalescingStrategy() throws Exception + { + cs 
= newStrategy("FIXED", 200); + + //Test that when a stream of messages continues arriving it keeps sending until all are drained + //It does this because it is already awake and sending messages + add(42); + add(42); + cs.coalesce(input, output, 128); + assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + runBlocker(queueParked); + add(42); + add(42); + add(42); + release(); + assertEquals( 3, output.size()); + assertEquals(toNanos(200), parker.parks.poll().longValue()); + + } + + @Test + public void testDisabledCoalescingStrateg() throws Exception + { + cs = newStrategy("DISABLED", 200); + + add(42); + add(42); + cs.coalesce(input, output, 128); + assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + runBlocker(queueParked); + add(42); + add(42); + release(); + assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + } + + @Test + public void parkLoop() throws Exception + { + final Thread current = Thread.currentThread(); + final Semaphore helperReady = new Semaphore(0); + final Semaphore helperGo = new Semaphore(0); + + new Thread() + { + @Override + public void run() + { + try + { + helperReady.release(); + helperGo.acquire(); + Thread.sleep(50); + LockSupport.unpark(current); + } + catch (Exception e) + { + e.printStackTrace(); + logger.error("Error", e); + System.exit(-1); + } + } + }.start(); + + long start = System.nanoTime(); + helperGo.release(); + + long parkNanos = TimeUnit.MILLISECONDS.toNanos(500); + + CoalescingStrategies.parkLoop(parkNanos); + long delta = System.nanoTime() - start; + + assertTrue (delta >= (parkNanos - (parkNanos / 16))); + } + + @Test + public void testMovingAverageCoalescingStrategy() throws Exception + { + cs = newStrategy("org.apache.cassandra.utils.CoalescingStrategies$MovingAverageCoalescingStrategy", 200); + + + //Test that things can be pulled out of the queue if it is non-empty + add(201); + add(401); + cs.coalesce(input, output, 128); + 
assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + + //Test that blocking on the queue results in everything drained + clear(); + + runBlocker(queueParked); + add(601); + add(801); + release(); + assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + //Test that out of order samples still flow + runBlocker(queueParked); + add(0); + release(); + assertEquals( 1, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + add(0); + cs.coalesce(input, output, 128); + assertEquals( 1, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + //Test that too high an average doesn't coalesce + for (long ii = 0; ii < 128; ii++) + add(ii * 1000); + cs.coalesce(input, output, 128); + assertEquals(output.size(), 128); + assertTrue(parker.parks.isEmpty()); + + clear(); + + runBlocker(queueParked); + add(129 * 1000); + release(); + assertTrue(parker.parks.isEmpty()); + + clear(); + + //Test that a low enough average coalesces + cs = newStrategy("MOVINGAVERAGE", 200); + for (long ii = 0; ii < 128; ii++) + add(ii * 99); + cs.coalesce(input, output, 128); + assertEquals(output.size(), 128); + assertTrue(parker.parks.isEmpty()); + + clear(); + + runBlocker(queueParked); + add(128 * 99); + add(129 * 99); + release(); + assertEquals(2, output.size()); + assertEquals(toNanos(198), parker.parks.poll().longValue()); + } + + @Test + public void testTimeHorizonStrategy() throws Exception + { + cs = newStrategy("TIMEHORIZON", 200); + + //Test that things can be pulled out of the queue if it is non-empty + add(201); + add(401); + cs.coalesce(input, output, 128); + assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + + //Test that blocking on the queue results in everything drained + clear(); + + runBlocker(queueParked); + add(601); + add(801); + release(); + assertEquals( 2, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + //Test that out of order samples still flow + 
runBlocker(queueParked); + add(0); + release(); + assertEquals( 1, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + add(0); + cs.coalesce(input, output, 128); + assertEquals( 1, output.size()); + assertNull(parker.parks.poll()); + + clear(); + + //Test that too high an average doesn't coalesce + for (long ii = 0; ii < 128; ii++) + add(ii * 1000); + cs.coalesce(input, output, 128); + assertEquals(output.size(), 128); + assertTrue(parker.parks.isEmpty()); + + clear(); + + runBlocker(queueParked); + add(129 * 1000); + release(); + assertTrue(parker.parks.isEmpty()); + + clear(); + + //Test that a low enough average coalesces + cs = newStrategy("TIMEHORIZON", 200); + primeTimeHorizonAverage(99); + + clear(); + + runBlocker(queueParked); + add(100000 * 99); + queueRelease.release(); + parker.parked.acquire(); + add(100001 * 99); + parker.permits.release(); + fut.get(); + assertEquals(2, output.size()); + assertEquals(toNanos(198), parker.parks.poll().longValue()); + + clear(); + + //Test far future + add(Integer.MAX_VALUE); + cs.coalesce(input, output, 128); + assertEquals(1, output.size()); + assertTrue(parker.parks.isEmpty()); + + clear(); + + //Distant past + add(0); + cs.coalesce(input, output, 128); + assertEquals(1, output.size()); + assertTrue(parker.parks.isEmpty()); + } + + void primeTimeHorizonAverage(long micros) throws Exception + { + for (long ii = 0; ii < 100000; ii++) + { + add(ii * micros); + if (ii % 128 == 0) + { + cs.coalesce(input, output, 128); + output.clear(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java b/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java new file mode 100644 index 0000000..5c025cf --- /dev/null +++ 
b/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class NanoTimeToCurrentTimeMillisTest +{ + @Test + public void testTimestampOrdering() throws Exception + { + long nowNanos = System.nanoTime(); + long now = System.currentTimeMillis(); + long lastConverted = 0; + for (long ii = 0; ii < 10000000; ii++) + { + now = Math.max(now, System.currentTimeMillis()); + if (ii % 10000 == 0) + { + synchronized (NanoTimeToCurrentTimeMillis.TIMESTAMP_UPDATE) + { + NanoTimeToCurrentTimeMillis.TIMESTAMP_UPDATE.notify(); + } + Thread.sleep(1); + } + nowNanos = Math.max(now, System.nanoTime()); + long convertedNow = NanoTimeToCurrentTimeMillis.convert(nowNanos); + assertTrue("convertedNow = " + convertedNow + " lastConverted = " + lastConverted + " in iteration " + ii, convertedNow >= (lastConverted - 1)); + lastConverted = convertedNow; + //Seems to be off by as much as two milliseconds sadly + assertTrue("now = " + now + " convertedNow = " + convertedNow + " in iteration " + ii, (now - 2) <= convertedNow); + + } + } +}