http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java new file mode 100644 index 0000000..da4c282 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java @@ -0,0 +1,176 @@ +package org.apache.cassandra.stress.settings; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.stress.generate.DistributionFactory; +import org.apache.cassandra.stress.generate.PartitionGenerator; + +public class SettingsPopulation implements Serializable +{ + + public final DistributionFactory distribution; + public final DistributionFactory readlookback; + public final PartitionGenerator.Order order; + public final boolean wrap; + public final long[] sequence; + + public static enum GenerateOrder + { + ARBITRARY, SHUFFLED, SORTED + } + + private SettingsPopulation(GenerateOptions options, DistributionOptions dist, SequentialOptions pop) + { + this.order = !options.contents.setByUser() ? PartitionGenerator.Order.ARBITRARY : PartitionGenerator.Order.valueOf(options.contents.value().toUpperCase()); + if (dist != null) + { + this.distribution = dist.seed.get(); + this.sequence = null; + this.readlookback = null; + this.wrap = false; + } + else + { + this.distribution = null; + String[] bounds = pop.populate.value().split("\\.\\.+"); + this.sequence = new long[] { OptionDistribution.parseLong(bounds[0]), OptionDistribution.parseLong(bounds[1]) }; + this.readlookback = pop.lookback.get(); + this.wrap = !pop.nowrap.setByUser(); + } + } + + public SettingsPopulation(DistributionOptions options) + { + this(options, options, null); + } + + public SettingsPopulation(SequentialOptions options) + { + this(options, null, options); + } + + // Option Declarations + + private static class GenerateOptions extends GroupedOptions + { + final OptionSimple contents = new OptionSimple("contents=", "(sorted|shuffled)", null, "SORTED or SHUFFLED (intra-)partition order; if not specified, will be consistent but arbitrary order", false); + + @Override + public List<? extends Option> options() + { + return Arrays.asList(contents); + } + } + + private static final class DistributionOptions extends GenerateOptions + { + final OptionDistribution seed; + + public DistributionOptions(String defaultLimit) + { + seed = new OptionDistribution("dist=", "gaussian(1.." + defaultLimit + ")", "Seeds are selected from this distribution"); + } + + @Override + public List<? extends Option> options() + { + return ImmutableList.<Option>builder().add(seed).addAll(super.options()).build(); + } + } + + private static final class SequentialOptions extends GenerateOptions + { + final OptionSimple populate; + final OptionDistribution lookback = new OptionDistribution("read-lookback=", "fixed(1)", "Select read seeds from the recently visited write seeds"); + final OptionSimple nowrap = new OptionSimple("no-wrap", "", null, "Terminate the stress test once all seeds in the range have been visited", false); + + public SequentialOptions(String defaultLimit) + { + populate = new OptionSimple("seq=", "[0-9]+\\.\\.+[0-9]+[MBK]?", + "1.." + defaultLimit, + "Generate all seeds in sequence", true); + } + + @Override + public List<? extends Option> options() + { + return ImmutableList.<Option>builder().add(populate, nowrap, lookback).addAll(super.options()).build(); + } + } + + // CLI Utility Methods + + public static SettingsPopulation get(Map<String, String[]> clArgs, SettingsCommand command) + { + // set default size to number of commands requested, unless set to err convergence, then use 1M + String defaultLimit = command.count <= 0 ? "1000000" : Long.toString(command.count); + + String[] params = clArgs.remove("-pop"); + if (params == null) + { + // return defaults: + switch(command.type) + { + case WRITE: + case COUNTER_WRITE: + return new SettingsPopulation(new SequentialOptions(defaultLimit)); + default: + return new SettingsPopulation(new DistributionOptions(defaultLimit)); + } + } + GroupedOptions options = GroupedOptions.select(params, new SequentialOptions(defaultLimit), new DistributionOptions(defaultLimit)); + if (options == null) + { + printHelp(); + System.out.println("Invalid -pop options provided, see output for valid options"); + System.exit(1); + } + return options instanceof SequentialOptions ? + new SettingsPopulation((SequentialOptions) options) : + new SettingsPopulation((DistributionOptions) options); + } + + public static void printHelp() + { + GroupedOptions.printOptions(System.out, "-pop", new SequentialOptions("N"), new DistributionOptions("N")); + } + + public static Runnable helpPrinter() + { + return new Runnable() + { + @Override + public void run() + { + printHelp(); + } + }; + } +} +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java index 5fb2bb2..6e3a02e 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java @@ -44,12 +44,7 @@ public class SettingsSchema implements Serializable public SettingsSchema(Options options, SettingsCommand command) { if (command instanceof SettingsCommandUser) - { - if (options.compaction.setByUser() || options.keyspace.setByUser() || options.compression.setByUser() || options.replication.setByUser()) - throw new IllegalArgumentException("Cannot provide command line schema settings if a user profile is provided"); - keyspace = ((SettingsCommandUser) command).profile.keyspaceName; - } else keyspace = options.keyspace.value(); @@ -62,14 +57,7 @@ public class SettingsSchema implements Serializable public void createKeySpaces(StressSettings settings) { - if (!(settings.command instanceof SettingsCommandUser)) - { - createKeySpacesThrift(settings); - } - else - { - ((SettingsCommandUser) settings.command).profile.maybeCreateSchema(settings); - } + createKeySpacesThrift(settings); } @@ -189,6 +177,9 @@ public class SettingsSchema implements Serializable if (params == null) return new SettingsSchema(new Options(), command); + if (command instanceof SettingsCommandUser) + throw new IllegalArgumentException("-schema can only be provided with predefined operations insert, read, etc.; the 'user' command requires a schema yaml instead"); + GroupedOptions options = GroupedOptions.select(params, new Options()); if (options == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java index ab57289..bdd10e5 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java @@ -40,8 +40,10 @@ public class StressSettings implements Serializable { public final SettingsCommand command; public final SettingsRate rate; - public final SettingsKey keys; + public final SettingsPopulation generate; + public final SettingsInsert insert; public final SettingsColumn columns; + public final SettingsErrors errors; public final SettingsLog log; public final SettingsMode mode; public final SettingsNode node; @@ -50,12 +52,14 @@ public class StressSettings implements Serializable public final SettingsPort port; public final String sendToDaemon; - public StressSettings(SettingsCommand command, SettingsRate rate, SettingsKey keys, SettingsColumn columns, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon) + public StressSettings(SettingsCommand command, SettingsRate rate, SettingsPopulation generate, SettingsInsert insert, SettingsColumn columns, SettingsErrors errors, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon) { this.command = command; this.rate = rate; - this.keys = keys; + this.insert = insert; + this.generate = generate; this.columns = columns; + this.errors = errors; this.log = log; this.mode = mode; this.node = node; @@ -129,7 +133,7 @@ public class StressSettings implements Serializable } catch (Exception e) { - throw new RuntimeException(e.getMessage()); + throw new RuntimeException(e); } return client; @@ -189,9 +193,10 @@ public class StressSettings implements Serializable public void maybeCreateKeyspaces() { - if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE || command.type == Command.USER) + if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE) schema.createKeySpaces(this); - + else if (command.type == Command.USER) + ((SettingsCommandUser) command).profile.maybeCreateSchema(this); } public static StressSettings parse(String[] args) @@ -221,8 +226,10 @@ public class StressSettings implements Serializable String sendToDaemon = SettingsMisc.getSendToDaemon(clArgs); SettingsPort port = SettingsPort.get(clArgs); SettingsRate rate = SettingsRate.get(clArgs, command); - SettingsKey keys = SettingsKey.get(clArgs, command); + SettingsPopulation generate = SettingsPopulation.get(clArgs, command); + SettingsInsert insert = SettingsInsert.get(clArgs); SettingsColumn columns = SettingsColumn.get(clArgs); + SettingsErrors errors = SettingsErrors.get(clArgs); SettingsLog log = SettingsLog.get(clArgs); SettingsMode mode = SettingsMode.get(clArgs); SettingsNode node = SettingsNode.get(clArgs); @@ -244,7 +251,7 @@ public class StressSettings implements Serializable } System.exit(1); } - return new StressSettings(command, rate, keys, columns, log, mode, node, schema, transport, port, sendToDaemon); + return new StressSettings(command, rate, generate, insert, columns, errors, log, mode, node, schema, transport, port, sendToDaemon); } private static Map<String, String[]> parseMap(String[] args) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java new file mode 100644 index 0000000..2a38e7d --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java @@ -0,0 +1,259 @@ +package org.apache.cassandra.stress.util; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.cassandra.stress.generate.FasterRandom; + +// simple thread-unsafe skiplist that permits indexing/removal by position, insertion at the end +// (though easily extended to insertion at any position, not necessary here) +// we use it for sampling items by position for visiting writes in the pool of pending writes +public class DynamicList<E> +{ + + // represents a value and an index simultaneously; each node maintains a list + // of next pointers for each height in the skip-list this node participates in + // (a contiguous range from [0..height)) + public static class Node<E> + { + // stores the size of each descendant + private final int[] size; + // TODO: alternate links to save space + private final Node<E>[] links; + private final E value; + + private Node(int height, E value) + { + this.value = value; + links = new Node[height * 2]; + size = new int[height]; + Arrays.fill(size, 1); + } + + private int height() + { + return size.length; + } + + private Node<E> next(int i) + { + return links[i * 2]; + } + + private Node<E> prev(int i) + { + return links[1 + i * 2]; + } + + private void setNext(int i, Node<E> next) + { + links[i * 2] = next; + } + + private void setPrev(int i, Node<E> prev) + { + links[1 + i * 2] = prev; + } + + private Node parent(int parentHeight) + { + Node prev = this; + while (true) + { + int height = prev.height(); + if (parentHeight < height) + return prev; + prev = prev.prev(height - 1); + } + } + } + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final int maxHeight; + private final Node<E> head; + private int size; + + public DynamicList(int maxExpectedSize) + { + this.maxHeight = 3 + (int) Math.ceil(Math.log(maxExpectedSize) / Math.log(2)); + head = new Node<>(maxHeight, null); + } + + private int randomLevel() + { + return 1 + Integer.bitCount(ThreadLocalRandom.current().nextInt() & ((1 << (maxHeight - 1)) - 1)); + } + + // add the value to the end of the list, and return the associated Node that permits efficient removal + // regardless of its future position in the list from other modifications + public Node<E> append(E value) + { + Node<E> newTail = new Node<>(randomLevel(), value); + + lock.writeLock().lock(); + try + { + size++; + + Node<E> tail = head; + for (int i = maxHeight - 1 ; i >= newTail.height() ; i--) + { + Node<E> next; + while ((next = tail.next(i)) != null) + tail = next; + tail.size[i]++; + } + + for (int i = newTail.height() - 1 ; i >= 0 ; i--) + { + Node<E> next; + while ((next = tail.next(i)) != null) + tail = next; + tail.setNext(i, newTail); + newTail.setPrev(i, tail); + } + + return newTail; + } + finally + { + lock.writeLock().unlock(); + } + } + + // remove the provided node and its associated value from the list + public void remove(Node<E> node) + { + lock.writeLock().lock(); + try + { + size--; + + // go up through each level in the skip list, unlinking this node; this entails + // simply linking each neighbour to each other, and appending the size of the + // current level owned by this node's index to the preceding neighbour (since + // ownership is defined as any node that you must visit through the index, + // removal of ourselves from a level means the preceding index entry is the + // entry point to all of the removed node's descendants) + for (int i = 0 ; i < node.height() ; i++) + { + Node<E> prev = node.prev(i); + Node<E> next = node.next(i); + assert prev != null; + prev.setNext(i, next); + if (next != null) + next.setPrev(i, prev); + prev.size[i] += node.size[i] - 1; + } + + // then go up the levels, removing 1 from the size at each height above ours + for (int i = node.height() ; i < maxHeight ; i++) + { + // if we're at our height limit, we backtrack at our top level until we + // hit a neighbour with a greater height + while (i == node.height()) + node = node.prev(i - 1); + node.size[i]--; + } + } + finally + { + lock.writeLock().unlock(); + } + } + + // retrieve the item at the provided index, or return null if the index is past the end of the list + public E get(int index) + { + lock.readLock().lock(); + try + { + if (index >= size) + return null; + + index++; + int c = 0; + Node<E> finger = head; + for (int i = maxHeight - 1 ; i >= 0 ; i--) + { + while (c + finger.size[i] <= index) + { + c += finger.size[i]; + finger = finger.next(i); + } + } + + assert c == index; + return finger.value; + } + finally + { + lock.readLock().unlock(); + } + } + + // some quick and dirty tests to confirm the skiplist works as intended + // don't create a separate unit test - tools tree doesn't currently warrant them + + private boolean isWellFormed() + { + for (int i = 0 ; i < maxHeight ; i++) + { + int c = 0; + for (Node node = head ; node != null ; node = node.next(i)) + { + if (node.prev(i) != null && node.prev(i).next(i) != node) + return false; + if (node.next(i) != null && node.next(i).prev(i) != node) + return false; + c += node.size[i]; + if (i + 1 < maxHeight && node.parent(i + 1).next(i + 1) == node.next(i)) + { + if (node.parent(i + 1).size[i + 1] != c) + return false; + c = 0; + } + } + if (i == maxHeight - 1 && c != size + 1) + return false; + } + return true; + } + + public static void main(String[] args) + { + DynamicList<Integer> list = new DynamicList<>(20); + TreeSet<Integer> canon = new TreeSet<>(); + HashMap<Integer, Node> nodes = new HashMap<>(); + int c = 0; + for (int i = 0 ; i < 100000 ; i++) + { + nodes.put(c, list.append(c)); + canon.add(c); + c++; + } + FasterRandom rand = new FasterRandom(); + assert list.isWellFormed(); + for (int loop = 0 ; loop < 100 ; loop++) + { + System.out.println(loop); + for (int i = 0 ; i < 100000 ; i++) + { + int index = rand.nextInt(100000); + Integer seed = list.get(index); +// assert canon.headSet(seed, false).size() == index; + list.remove(nodes.remove(seed)); + canon.remove(seed); + nodes.put(c, list.append(c)); + canon.add(c); + c++; + } + assert list.isWellFormed(); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java index 45e1ba7..4e2b0a3 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java @@ -30,7 +30,7 @@ import java.util.concurrent.CountDownLatch; public final class Timer { - private static final int SAMPLE_SIZE_SHIFT = 10; + private static final int SAMPLE_SIZE_SHIFT = 14; private static final int SAMPLE_SIZE_MASK = (1 << SAMPLE_SIZE_SHIFT) - 1; private final Random rnd = new Random(); @@ -66,6 +66,11 @@ public final class Timer return 1 + (index >>> SAMPLE_SIZE_SHIFT); } + public boolean running() + { + return finalReport == null; + } + public void stop(long partitionCount, long rowCount) { maybeReport(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java index 2bdca82..b6d4e52 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java @@ -40,6 +40,7 @@ public class Timing private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>(); private volatile TimingInterval history; private final Random rnd = new Random(); + private boolean done; // TIMING @@ -57,11 +58,16 @@ public class Timing if (!ready.await(2L, TimeUnit.MINUTES)) throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck"); + boolean done = true; // reports have been filled in by timer threadCount, so merge List<TimingInterval> intervals = new ArrayList<>(); for (Timer timer : timers) + { intervals.add(timer.report); + done &= !timer.running(); + } + this.done = done; return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos()); } @@ -78,10 +84,15 @@ public class Timing history = new TimingInterval(System.nanoTime()); } + public boolean done() + { + return done; + } + public TimingInterval snapInterval() throws InterruptedException { final TimingInterval interval = snapInterval(rnd); - history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 50000, history.startNanos()); + history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 200000, history.startNanos()); return interval; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java index db3fef1..50ab608 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java @@ -97,14 +97,14 @@ public final class TimingInterval } - public double realOpRate() + public double opRate() { return operationCount / ((end - start) * 0.000000001d); } - public double adjustedOpRate() + public double adjustedRowRate() { - return operationCount / ((end - (start + pauseLength)) * 0.000000001d); + return rowCount / ((end - (start + pauseLength)) * 0.000000001d); } public double partitionRate()