Improve stress workload realism

patch by benedict; reviewed by tjake for CASSANDRA-7519


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0580fb2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0580fb2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0580fb2b

Branch: refs/heads/cassandra-2.1.0
Commit: 0580fb2b7707beaa69019a73a6c53d86fe088a0a
Parents: c6a2c65
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Sun Sep 7 21:18:53 2014 +0700
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Sun Sep 7 21:19:58 2014 +0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 tools/cqlstress-counter-example.yaml            |  20 +-
 tools/cqlstress-example.yaml                    |  25 +-
 tools/cqlstress-insanity-example.yaml           |  20 +-
 .../org/apache/cassandra/stress/Operation.java  |   9 +-
 .../apache/cassandra/stress/StressAction.java   | 169 +++-------
 .../apache/cassandra/stress/StressMetrics.java  |  26 +-
 .../apache/cassandra/stress/StressProfile.java  |  75 ++++-
 .../org/apache/cassandra/stress/StressYaml.java |  12 +-
 .../stress/generate/DistributionInverted.java   |   7 +
 .../stress/generate/DistributionQuantized.java  |  90 +++++
 .../cassandra/stress/generate/FasterRandom.java | 116 +++++++
 .../cassandra/stress/generate/Partition.java    | 327 +++++++++++++++----
 .../stress/generate/PartitionGenerator.java     |  28 +-
 .../stress/generate/RatioDistribution.java      |   5 +
 .../apache/cassandra/stress/generate/Seed.java  |  67 ++++
 .../stress/generate/SeedGenerator.java          |  29 --
 .../cassandra/stress/generate/SeedManager.java  | 249 ++++++++++++++
 .../stress/generate/SeedRandomGenerator.java    |  54 ---
 .../stress/generate/SeedSeriesGenerator.java    |  42 ---
 .../stress/generate/values/Booleans.java        |   2 +-
 .../cassandra/stress/generate/values/Bytes.java |   9 +-
 .../cassandra/stress/generate/values/Dates.java |   3 +-
 .../stress/generate/values/Doubles.java         |   2 +-
 .../stress/generate/values/Floats.java          |   2 +-
 .../stress/generate/values/Generator.java       |   4 +-
 .../stress/generate/values/HexBytes.java        |   2 +-
 .../stress/generate/values/HexStrings.java      |   4 +-
 .../cassandra/stress/generate/values/Inets.java |   2 +-
 .../stress/generate/values/Integers.java        |   2 +-
 .../cassandra/stress/generate/values/Lists.java |   2 +-
 .../cassandra/stress/generate/values/Longs.java |   2 +-
 .../cassandra/stress/generate/values/Sets.java  |   2 +-
 .../stress/generate/values/Strings.java         |  12 +-
 .../stress/generate/values/TimeUUIDs.java       |   2 +-
 .../cassandra/stress/generate/values/UUIDs.java |   2 +-
 .../operations/predefined/CqlCounterAdder.java  |   5 +
 .../operations/predefined/CqlInserter.java      |   5 +
 .../predefined/PredefinedOperation.java         |   2 +-
 .../predefined/ThriftCounterAdder.java          |   5 +
 .../operations/predefined/ThriftInserter.java   |   5 +
 .../operations/userdefined/SchemaInsert.java    |  80 +++--
 .../operations/userdefined/SchemaQuery.java     |  87 ++++-
 .../operations/userdefined/SchemaStatement.java |  53 +--
 .../cassandra/stress/settings/CliOption.java    |   3 +-
 .../stress/settings/OptionDistribution.java     |  72 +++-
 .../settings/OptionRatioDistribution.java       |  40 ++-
 .../stress/settings/SettingsCommand.java        |  14 +-
 .../settings/SettingsCommandPreDefined.java     |  13 +-
 .../SettingsCommandPreDefinedMixed.java         |   4 +-
 .../stress/settings/SettingsCommandUser.java    |  16 +-
 .../stress/settings/SettingsErrors.java         |  92 ++++++
 .../stress/settings/SettingsInsert.java         | 103 ++++++
 .../cassandra/stress/settings/SettingsKey.java  | 153 ---------
 .../stress/settings/SettingsPopulation.java     | 176 ++++++++++
 .../stress/settings/SettingsSchema.java         |  17 +-
 .../stress/settings/StressSettings.java         |  23 +-
 .../cassandra/stress/util/DynamicList.java      | 259 +++++++++++++++
 .../org/apache/cassandra/stress/util/Timer.java |   7 +-
 .../apache/cassandra/stress/util/Timing.java    |  13 +-
 .../cassandra/stress/util/TimingInterval.java   |   6 +-
 61 files changed, 1955 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 46836bf..e42d9c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,7 +5,7 @@
  * cqlsh: DESCRIBE support for frozen UDTs, tuples (CASSANDRA-7863)
  * Avoid exposing internal classes over JMX (CASSANDRA-7879)
  * Add null check for keys when freezing collection (CASSANDRA-7869)
-
+ * Improve stress workload realism (CASSANDRA-7519)
 
 2.1.0-rc7
  * Add frozen keyword and require UDT to be frozen (CASSANDRA-7857)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-counter-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-counter-example.yaml 
b/tools/cqlstress-counter-example.yaml
index cff14b6..f8f70ea 100644
--- a/tools/cqlstress-counter-example.yaml
+++ b/tools/cqlstress-counter-example.yaml
@@ -62,19 +62,17 @@ columnspec:
     population: fixed(1)
 
 insert:
-  partitions: fixed(1)            # number of unique partitions to update in a 
single operation
-                                  # if perbatch < 1, multiple batches will be 
used but all partitions will
-                                  # occur in all batches (unless already 
finished); only the row counts will vary
-  pervisit: fixed(1)/1            # ratio of rows each partition should update 
in a single visit to the partition,
-                                  # as a proportion of the total possible for 
the partition
-  perbatch: fixed(1)/1            # number of rows each partition should 
update in a single batch statement,
-                                  # as a proportion of the proportion we are 
inserting this visit
-                                  # (i.e. compounds with (and capped by) 
pervisit)
-  batchtype: UNLOGGED             # type of batch to use
+  partitions: fixed(1)             # number of unique partitions to update in 
a single operation
+                                  # if batchcount > 1, multiple batches will 
be used but all partitions will
+                                  # occur in all batches (unless they finish 
early); only the row counts will vary
+  batchtype: LOGGED               # type of batch to use
+  select: fixed(1)/1              # uniform chance any single generated CQL 
row will be visited in a partition;
+                                  # generated for each partition 
independently, each time we visit it
 
 #
 # A list of queries you wish to run against the schema
 #
 queries:
-   simple1: select * from counttest where name = ?
-
+   simple1:
+      cql: select * from counttest where name = ?
+      fields: samerow             # samerow or multirow (select arguments from 
the same row, or randomly from all rows in the partition)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
index d5c90a2..4dd5e4a 100644
--- a/tools/cqlstress-example.yaml
+++ b/tools/cqlstress-example.yaml
@@ -69,25 +69,26 @@ columnspec:
     size: uniform(1..10)
     population: uniform(1..1M)     # the range of unique values to select for 
the field (default is 100Billion)
   - name: date
-    cluster: uniform(1..4)
+    cluster: uniform(20..40)
   - name: lval
     population: gaussian(1..1000)
     cluster: uniform(1..4)
 
 insert:
-  partitions: uniform(1..50)      # number of unique partitions to update in a 
single operation
-                                  # if perbatch < 1, multiple batches will be 
used but all partitions will
-                                  # occur in all batches (unless already 
finished); only the row counts will vary
-  pervisit: uniform(1..10)/10     # ratio of rows each partition should update 
in a single visit to the partition,
-                                  # as a proportion of the total possible for 
the partition
-  perbatch: ~exp(1..3)/4          # number of rows each partition should 
update in a single batch statement,
-                                  # as a proportion of the proportion we are 
inserting this visit
-                                  # (i.e. compounds with (and capped by) 
pervisit)
-  batchtype: UNLOGGED             # type of batch to use
+  partitions: uniform(1..50)       # number of unique partitions to update in 
a single operation
+                                  # if batchcount > 1, multiple batches will 
be used but all partitions will
+                                  # occur in all batches (unless they finish 
early); only the row counts will vary
+  batchtype: LOGGED               # type of batch to use
+  select: uniform(1..10)/10       # uniform chance any single generated CQL 
row will be visited in a partition;
+                                  # generated for each partition 
independently, each time we visit it
 
 #
 # A list of queries you wish to run against the schema
 #
 queries:
-   simple1: select * from typestest where name = ? and choice = ? LIMIT 100
-   range1: select * from typestest where name = ? and choice = ? and date >= ? 
LIMIT 100
+   simple1:
+      cql: select * from typestest where name = ? and choice = ? LIMIT 100
+      fields: samerow             # samerow or multirow (select arguments from 
the same row, or randomly from all rows in the partition)
+   range1:
+      cql: select * from typestest where name = ? and choice = ? and date >= ? 
LIMIT 100
+      fields: multirow            # samerow or multirow (select arguments from 
the same row, or randomly from all rows in the partition)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-insanity-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-insanity-example.yaml 
b/tools/cqlstress-insanity-example.yaml
index ef1bb3a..ea4f97f 100644
--- a/tools/cqlstress-insanity-example.yaml
+++ b/tools/cqlstress-insanity-example.yaml
@@ -74,19 +74,17 @@ columnspec:
 
 
 insert:
-  partitions: fixed(1)            # number of unique partitions to update in a 
single operation
-                                  # if perbatch < 1, multiple batches will be 
used but all partitions will
-                                  # occur in all batches (unless already 
finished); only the row counts will vary
-  pervisit: uniform(1..10)/10     # ratio of rows each partition should update 
in a single visit to the partition,
-                                  # as a proportion of the total possible for 
the partition
-  perbatch: fixed(1)/1            # number of rows each partition should 
update in a single batch statement,
-                                  # as a proportion of the proportion we are 
inserting this visit
-                                  # (i.e. compounds with (and capped by) 
pervisit)
-  batchtype: UNLOGGED             # type of batch to use
+  partitions: fixed(1)             # number of unique partitions to update in 
a single operation
+                                  # if batchcount > 1, multiple batches will 
be used but all partitions will
+                                  # occur in all batches (unless they finish 
early); only the row counts will vary
+  batchtype: LOGGED               # type of batch to use
+  select: fixed(1)/1              # uniform chance any single generated CQL 
row will be visited in a partition;
+                                  # generated for each partition 
independently, each time we visit it
 
 #
 # A list of queries you wish to run against the schema
 #
 queries:
-   simple1: select * from insanitytest where name = ? and choice = ? LIMIT 100
-
+   simple1:
+      cql: select * from insanitytest where name = ? and choice = ? LIMIT 100
+      fields: samerow             # samerow or multirow (select arguments from 
the same row, or randomly from all rows in the partition)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java 
b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 7831074..5560240 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -61,6 +61,11 @@ public abstract class Operation
         this.partitions = partitions;
     }
 
+    public boolean isWrite()
+    {
+        return false;
+    }
+
     /**
      * Run operation
      * @param client Cassandra Thrift client connection
@@ -84,7 +89,7 @@ public abstract class Operation
         String exceptionMessage = null;
 
         int tries = 0;
-        for (; tries < settings.command.tries; tries++)
+        for (; tries < settings.errors.tries; tries++)
         {
             try
             {
@@ -144,7 +149,7 @@ public abstract class Operation
 
     protected void error(String message) throws IOException
     {
-        if (!settings.command.ignoreErrors)
+        if (!settings.errors.ignore)
             throw new IOException(message);
         else if (settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
             System.err.println(message);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java 
b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 2105a72..e58bfa1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -23,7 +23,6 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -32,7 +31,6 @@ import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.stress.generate.Partition;
-import org.apache.cassandra.stress.generate.SeedGenerator;
 import org.apache.cassandra.stress.operations.OpDistribution;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import org.apache.cassandra.stress.settings.*;
@@ -58,6 +56,7 @@ public class StressAction implements Runnable
         // creating keyspace and column families
         settings.maybeCreateKeyspaces();
 
+        // TODO: warmup should
         if (!settings.command.noWarmup)
             warmup(settings.command.getFactory(settings));
 
@@ -155,8 +154,8 @@ public class StressAction implements Runnable
         double improvement = 0;
         for (int i = results.size() - count ; i < results.size() ; i++)
         {
-            double prev = results.get(i - 
1).getTiming().getHistory().realOpRate();
-            double cur = results.get(i).getTiming().getHistory().realOpRate();
+            double prev = results.get(i - 1).getTiming().getHistory().opRate();
+            double cur = results.get(i).getTiming().getHistory().opRate();
             improvement += (cur - prev) / prev;
         }
         return improvement / count;
@@ -169,11 +168,11 @@ public class StressAction implements Runnable
                                      operations.desc(),
                                      threadCount,
                                      opCount > 0 ? " for " + opCount + " 
iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
-        final WorkQueue workQueue;
+        final WorkManager workManager;
         if (opCount < 0)
-            workQueue = new ContinuousWorkQueue(50);
+            workManager = new ContinuousWorkManager();
         else
-            workQueue = FixedWorkQueue.build(opCount);
+            workManager = new FixedWorkManager(opCount);
 
         RateLimiter rateLimiter = null;
         // TODO : move this to a new queue wrapper that gates progress based 
on a poisson (or configurable) distribution
@@ -185,7 +184,7 @@ public class StressAction implements Runnable
         final CountDownLatch done = new CountDownLatch(threadCount);
         final Consumer[] consumers = new Consumer[threadCount];
         for (int i = 0; i < threadCount; i++)
-            consumers[i] = new Consumer(operations, done, workQueue, metrics, 
rateLimiter);
+            consumers[i] = new Consumer(operations, done, workManager, 
metrics, rateLimiter);
 
         // starting worker threadCount
         for (int i = 0; i < threadCount; i++)
@@ -201,14 +200,15 @@ public class StressAction implements Runnable
                         settings.command.minimumUncertaintyMeasurements,
                         settings.command.maximumUncertaintyMeasurements);
             } catch (InterruptedException e) { }
-            workQueue.stop();
+            workManager.stop();
         }
 
         try
         {
             done.await();
             metrics.stop();
-        } catch (InterruptedException e) {}
+        }
+        catch (InterruptedException e) {}
 
         if (metrics.wasCancelled())
             return null;
@@ -231,20 +231,18 @@ public class StressAction implements Runnable
         private final OpDistribution operations;
         private final StressMetrics metrics;
         private final Timer timer;
-        private final SeedGenerator seedGenerator;
         private final RateLimiter rateLimiter;
         private volatile boolean success = true;
-        private final WorkQueue workQueue;
+        private final WorkManager workManager;
         private final CountDownLatch done;
 
-        public Consumer(OpDistributionFactory operations, CountDownLatch done, 
WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
+        public Consumer(OpDistributionFactory operations, CountDownLatch done, 
WorkManager workManager, StressMetrics metrics, RateLimiter rateLimiter)
         {
             this.done = done;
             this.rateLimiter = rateLimiter;
-            this.workQueue = workQueue;
+            this.workManager = workManager;
             this.metrics = metrics;
             this.timer = metrics.getTiming().newTimer();
-            this.seedGenerator = settings.keys.newSeedGenerator();
             this.operations = operations.get(timer);
         }
 
@@ -275,42 +273,33 @@ public class StressAction implements Runnable
                 }
 
                 int maxBatchSize = operations.maxBatchSize();
-                Work work = workQueue.poll();
                 Partition[] partitions = new Partition[maxBatchSize];
-                int workDone = 0;
-                while (work != null)
+                while (true)
                 {
 
+                    // TODO: Operation should be able to ecapsulate much of 
this behaviour
                     Operation op = operations.next();
                     op.generator.reset();
-                    int batchSize = Math.max(1, (int) 
op.partitionCount.next());
-                    int partitionCount = 0;
 
+                    int batchSize = workManager.takePermits(Math.max(1, (int) 
op.partitionCount.next()));
+                    if (batchSize < 0)
+                        break;
+
+                    if (rateLimiter != null)
+                        rateLimiter.acquire(batchSize);
+
+                    int partitionCount = 0;
                     while (partitionCount < batchSize)
                     {
-                        int count = Math.min((work.count - workDone), 
batchSize - partitionCount);
-                        for (int i = 0 ; i < count ; i++)
-                        {
-                            long seed = seedGenerator.next(work.offset + 
workDone + i);
-                            partitions[partitionCount + i] = 
op.generator.generate(seed);
-                        }
-                        workDone += count;
-                        partitionCount += count;
-                        if (workDone == work.count)
-                        {
-                            workDone = 0;
-                            work = workQueue.poll();
-                            if (work == null)
-                            {
-                                if (partitionCount == 0)
-                                    return;
-                                break;
-                            }
-                            if (rateLimiter != null)
-                                rateLimiter.acquire(work.count);
-                        }
+                        Partition p = op.generator.generate(op);
+                        if (p == null)
+                            break;
+                        partitions[partitionCount++] = p;
                     }
 
+                    if (partitionCount == 0)
+                        break;
+
                     op.setPartitions(Arrays.asList(partitions).subList(0, 
partitionCount));
 
                     try
@@ -340,7 +329,7 @@ public class StressAction implements Runnable
 
                         e.printStackTrace(output);
                         success = false;
-                        workQueue.stop();
+                        workManager.stop();
                         metrics.cancel();
                         return;
                     }
@@ -356,107 +345,58 @@ public class StressAction implements Runnable
 
     }
 
-    private interface WorkQueue
+    private interface WorkManager
     {
-        // null indicates consumer should terminate
-        Work poll();
+        // -1 indicates consumer should terminate
+        int takePermits(int count);
 
         // signal all consumers to terminate
         void stop();
     }
 
-    private static final class Work
-    {
-        // index of operations
-        final long offset;
-
-        // how many operations to perform
-        final int count;
-
-        public Work(long offset, int count)
-        {
-            this.offset = offset;
-            this.count = count;
-        }
-    }
-
-    private static final class FixedWorkQueue implements WorkQueue
+    private static final class FixedWorkManager implements WorkManager
     {
 
-        final ArrayBlockingQueue<Work> work;
-        volatile boolean stop = false;
+        final AtomicLong permits;
 
-        public FixedWorkQueue(ArrayBlockingQueue<Work> work)
+        public FixedWorkManager(long permits)
         {
-            this.work = work;
+            this.permits = new AtomicLong(permits);
         }
 
         @Override
-        public Work poll()
+        public int takePermits(int count)
         {
-            if (stop)
-                return null;
-            return work.poll();
+            while (true)
+            {
+                long cur = permits.get();
+                if (cur == 0)
+                    return -1;
+                count = (int) Math.min(count, cur);
+                long next = cur - count;
+                if (permits.compareAndSet(cur, next))
+                    return count;
+            }
         }
 
         @Override
         public void stop()
         {
-            stop = true;
+            permits.getAndSet(0);
         }
-
-        static FixedWorkQueue build(long operations)
-        {
-            // target splitting into around 50-500k items, with a minimum size 
of 20
-            if (operations > Integer.MAX_VALUE * (1L << 19))
-                throw new IllegalStateException("Cannot currently support more 
than approx 2^50 operations for one stress run. This is a LOT.");
-            int batchSize = (int) (operations / (1 << 19));
-            if (batchSize < 20)
-                batchSize = 20;
-            ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<>(
-                    (int) ((operations / batchSize)
-                  + (operations % batchSize == 0 ? 0 : 1))
-            );
-            long offset = 0;
-            while (offset < operations)
-            {
-                work.add(new Work(offset, (int) Math.min(batchSize, operations 
- offset)));
-                offset += batchSize;
-            }
-            return new FixedWorkQueue(work);
-        }
-
     }
 
-    private static final class ContinuousWorkQueue implements WorkQueue
+    private static final class ContinuousWorkManager implements WorkManager
     {
 
-        final AtomicLong offset = new AtomicLong();
-        final int batchSize;
         volatile boolean stop = false;
 
-        private ContinuousWorkQueue(int batchSize)
-        {
-            this.batchSize = batchSize;
-        }
-
         @Override
-        public Work poll()
+        public int takePermits(int count)
         {
             if (stop)
-                return null;
-            return new Work(nextOffset(), batchSize);
-        }
-
-        private long nextOffset()
-        {
-            final int inc = batchSize;
-            while (true)
-            {
-                final long cur = offset.get();
-                if (offset.compareAndSet(cur, cur + inc))
-                    return cur;
-            }
+                return -1;
+            return count;
         }
 
         @Override
@@ -466,5 +406,4 @@ public class StressAction implements Runnable
         }
 
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java 
b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 7e5c1b6..a9edfc6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -42,7 +42,7 @@ public class StressMetrics
     private final Thread thread;
     private volatile boolean stop = false;
     private volatile boolean cancelled = false;
-    private final Uncertainty opRateUncertainty = new Uncertainty();
+    private final Uncertainty rowRateUncertainty = new Uncertainty();
     private final CountDownLatch stopped = new CountDownLatch(1);
     private final Timing timing = new Timing();
 
@@ -68,6 +68,7 @@ public class StressMetrics
                                 Thread.sleep(logIntervalMillis);
                             else
                                 Thread.sleep(sleep);
+
                             update();
                         } catch (InterruptedException e)
                         {
@@ -86,6 +87,7 @@ public class StressMetrics
                 }
                 finally
                 {
+                    rowRateUncertainty.wakeAll();
                     stopped.countDown();
                 }
             }
@@ -99,7 +101,7 @@ public class StressMetrics
 
     public void waitUntilConverges(double targetUncertainty, int 
minMeasurements, int maxMeasurements) throws InterruptedException
     {
-        opRateUncertainty.await(targetUncertainty, minMeasurements, 
maxMeasurements);
+        rowRateUncertainty.await(targetUncertainty, minMeasurements, 
maxMeasurements);
     }
 
     public void cancel()
@@ -107,7 +109,7 @@ public class StressMetrics
         cancelled = true;
         stop = true;
         thread.interrupt();
-        opRateUncertainty.wakeAll();
+        rowRateUncertainty.wakeAll();
     }
 
     public void stop() throws InterruptedException
@@ -120,8 +122,11 @@ public class StressMetrics
     private void update() throws InterruptedException
     {
         TimingInterval interval = timing.snapInterval();
-        printRow("", interval, timing.getHistory(), opRateUncertainty, output);
-        opRateUncertainty.update(interval.adjustedOpRate());
+        if (interval.partitionCount != 0)
+            printRow("", interval, timing.getHistory(), rowRateUncertainty, 
output);
+        rowRateUncertainty.update(interval.adjustedRowRate());
+        if (timing.done())
+            stop = true;
     }
 
 
@@ -132,14 +137,15 @@ public class StressMetrics
 
     private static void printHeader(String prefix, PrintStream output)
     {
-        output.println(prefix + String.format(HEADFORMAT, "partitions","op/s", 
"pk/s", "row/s","mean","med",".95",".99",".999","max","time","stderr"));
+        output.println(prefix + String.format(HEADFORMAT, "total ops","adj 
row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr"));
     }
 
     private static void printRow(String prefix, TimingInterval interval, 
TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
     {
         output.println(prefix + String.format(ROWFORMAT,
-                total.partitionCount,
-                interval.realOpRate(),
+                total.operationCount,
+                interval.adjustedRowRate(),
+                interval.opRate(),
                 interval.partitionRate(),
                 interval.rowRate(),
                 interval.meanLatency(),
@@ -157,7 +163,7 @@ public class StressMetrics
         output.println("\n");
         output.println("Results:");
         TimingInterval history = timing.getHistory();
-        output.println(String.format("op rate                   : %.0f", 
history.realOpRate()));
+        output.println(String.format("op rate                   : %.0f", 
history.opRate()));
         output.println(String.format("partition rate            : %.0f", 
history.partitionRate()));
         output.println(String.format("row rate                  : %.0f", 
history.rowRate()));
         output.println(String.format("latency mean              : %.1f", 
history.meanLatency()));
@@ -181,7 +187,7 @@ public class StressMetrics
             printRow(String.format(formatstr, ids.get(i)),
                     summarise.get(i).timing.getHistory(),
                     summarise.get(i).timing.getHistory(),
-                    summarise.get(i).opRateUncertainty,
+                    summarise.get(i).rowRateUncertainty,
                     out
             );
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java 
b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 4e09775..de561f3 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -24,15 +24,18 @@ package org.apache.cassandra.stress;
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 
+import com.google.common.base.Function;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
 import org.apache.cassandra.exceptions.RequestValidationException;
 
+import org.apache.cassandra.stress.generate.Distribution;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.RatioDistributionFactory;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.generate.values.Booleans;
 import org.apache.cassandra.stress.generate.values.Bytes;
 import org.apache.cassandra.stress.generate.values.Generator;
@@ -88,7 +91,7 @@ public class StressProfile implements Serializable
     public String keyspaceName;
     public String tableName;
     private Map<String, GeneratorConfig> columnConfigs;
-    private Map<String, String> queries;
+    private Map<String, StressYaml.QueryDef> queries;
     private Map<String, String> insert;
 
     transient volatile TableMetadata tableMetaData;
@@ -97,11 +100,11 @@ public class StressProfile implements Serializable
 
     transient volatile BatchStatement.Type batchType;
     transient volatile DistributionFactory partitions;
-    transient volatile RatioDistributionFactory pervisit;
-    transient volatile RatioDistributionFactory perbatch;
+    transient volatile RatioDistributionFactory selectchance;
     transient volatile PreparedStatement insertStatement;
     transient volatile Integer thriftInsertId;
 
+    transient volatile Map<String, SchemaQuery.ArgSelect> argSelects;
     transient volatile Map<String, PreparedStatement> queryStatements;
     transient volatile Map<String, Integer> thriftQueryIds;
 
@@ -242,13 +245,18 @@ public class StressProfile implements Serializable
                         ThriftClient tclient = settings.getThriftClient();
                         Map<String, PreparedStatement> stmts = new HashMap<>();
                         Map<String, Integer> tids = new HashMap<>();
-                        for (Map.Entry<String, String> e : queries.entrySet())
+                        Map<String, SchemaQuery.ArgSelect> args = new 
HashMap<>();
+                        for (Map.Entry<String, StressYaml.QueryDef> e : 
queries.entrySet())
                         {
-                            stmts.put(e.getKey().toLowerCase(), 
jclient.prepare(e.getValue()));
-                            tids.put(e.getKey().toLowerCase(), 
tclient.prepare_cql3_query(e.getValue(), Compression.NONE));
+                            stmts.put(e.getKey().toLowerCase(), 
jclient.prepare(e.getValue().cql));
+                            tids.put(e.getKey().toLowerCase(), 
tclient.prepare_cql3_query(e.getValue().cql, Compression.NONE));
+                            args.put(e.getKey().toLowerCase(), 
e.getValue().fields == null
+                                                                     ? 
SchemaQuery.ArgSelect.MULTIROW
+                                                                     : 
SchemaQuery.ArgSelect.valueOf(e.getValue().fields.toUpperCase()));
                         }
                         thriftQueryIds = tids;
                         queryStatements = stmts;
+                        argSelects = args;
                     }
                     catch (TException e)
                     {
@@ -260,7 +268,9 @@ public class StressProfile implements Serializable
 
         // TODO validation
         name = name.toLowerCase();
-        return new SchemaQuery(timer, generator, settings, 
thriftQueryIds.get(name), queryStatements.get(name), 
ThriftConversion.fromThrift(settings.command.consistencyLevel), 
ValidationType.NOT_FAIL);
+        if (!queryStatements.containsKey(name))
+            throw new IllegalArgumentException("No query defined with name " + 
name);
+        return new SchemaQuery(timer, generator, settings, 
thriftQueryIds.get(name), queryStatements.get(name), 
ThriftConversion.fromThrift(settings.command.consistencyLevel), 
ValidationType.NOT_FAIL, argSelects.get(name));
     }
 
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, 
StressSettings settings)
@@ -328,18 +338,37 @@ public class StressProfile implements Serializable
                         insert = new HashMap<>();
                     lowerCase(insert);
 
-                    partitions = 
OptionDistribution.get(!insert.containsKey("partitions") ? "fixed(1)" : 
insert.remove("partitions"));
-                    pervisit = 
OptionRatioDistribution.get(!insert.containsKey("pervisit") ? "fixed(1)/1" : 
insert.remove("pervisit"));
-                    perbatch = 
OptionRatioDistribution.get(!insert.containsKey("perbatch") ? "fixed(1)/1" : 
insert.remove("perbatch"));
-                    batchType = !insert.containsKey("batchtype") ? 
BatchStatement.Type.LOGGED : 
BatchStatement.Type.valueOf(insert.remove("batchtype"));
+                    partitions = select(settings.insert.batchsize, 
"partitions", "fixed(1)", insert, OptionDistribution.BUILDER);
+                    selectchance = select(settings.insert.selectRatio, 
"select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+                    batchType = settings.insert.batchType != null
+                                ? settings.insert.batchType
+                                : !insert.containsKey("batchtype")
+                                  ? BatchStatement.Type.LOGGED
+                                  : 
BatchStatement.Type.valueOf(insert.remove("batchtype"));
                     if (!insert.isEmpty())
                         throw new IllegalArgumentException("Unrecognised 
insert option(s): " + insert);
 
+                    Distribution visits = settings.insert.visits.get();
+                    // these min/max are not absolutely accurate if 
selectchance < 1, but they're close enough to
+                    // guarantee the vast majority of actions occur in these 
bounds
+                    double minBatchSize = selectchance.get().min() * 
partitions.get().minValue() * generator.minRowCount * (1d / visits.maxValue());
+                    double maxBatchSize = selectchance.get().max() * 
partitions.get().maxValue() * generator.maxRowCount * (1d / visits.minValue());
+                    System.out.printf("Generating batches with [%d..%d] 
partitions and [%.0f..%.0f] rows (of [%.0f..%.0f] total rows in the 
partitions)\n",
+                                      partitions.get().minValue(), 
partitions.get().maxValue(),
+                                      minBatchSize, maxBatchSize,
+                                      partitions.get().minValue() * 
generator.minRowCount,
+                                      partitions.get().maxValue() * 
generator.maxRowCount);
                     if (generator.maxRowCount > 100 * 1000 * 1000)
                         System.err.printf("WARNING: You have defined a schema 
that permits very large partitions (%.0f max rows (>100M))\n", 
generator.maxRowCount);
-                    if (perbatch.get().max() * pervisit.get().max() * 
partitions.get().maxValue() * generator.maxRowCount > 100000)
+                    if (batchType == BatchStatement.Type.LOGGED && 
maxBatchSize > 65535)
+                    {
+                        System.err.printf("ERROR: You have defined a workload 
that generates batches with more than 65k rows (%.0f), but have required the 
use of LOGGED batches. There is a 65k row limit on a single batch.\n",
+                                          selectchance.get().max() * 
partitions.get().maxValue() * generator.maxRowCount);
+                        System.exit(1);
+                    }
+                    if (maxBatchSize > 100000)
                         System.err.printf("WARNING: You have defined a schema 
that permits very large batches (%.0f max rows (>100K)). This may OOM this 
stress client, or the server.\n",
-                                           perbatch.get().max() * 
pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount);
+                                          selectchance.get().max() * 
partitions.get().maxValue() * generator.maxRowCount);
 
                     JavaDriverClient client = settings.getJavaDriverClient();
                     String query = sb.toString();
@@ -356,10 +385,20 @@ public class StressProfile implements Serializable
             }
         }
 
-        return new SchemaInsert(timer, generator, settings, partitions.get(), 
pervisit.get(), perbatch.get(), thriftInsertId, insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+        return new SchemaInsert(timer, generator, settings, partitions.get(), 
selectchance.get(), thriftInsertId, insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+    }
+
+    private static <E> E select(E first, String key, String defValue, 
Map<String, String> map, Function<String, E> builder)
+    {
+        String val = map.remove(key);
+        if (first != null)
+            return first;
+        if (val != null)
+            return builder.apply(val);
+        return builder.apply(defValue);
     }
 
-    public PartitionGenerator newGenerator(StressSettings settings)
+    public PartitionGenerator newGenerator(StressSettings settings, 
SeedManager seeds)
     {
         if (generatorFactory == null)
         {
@@ -371,7 +410,7 @@ public class StressProfile implements Serializable
             }
         }
 
-        return generatorFactory.newGenerator();
+        return generatorFactory.newGenerator(settings, seeds);
     }
 
     private class GeneratorFactory
@@ -393,9 +432,9 @@ public class StressProfile implements Serializable
                     valueColumns.add(new ColumnInfo(metadata.getName(), 
metadata.getType(), columnConfigs.get(metadata.getName())));
         }
 
-        PartitionGenerator newGenerator()
+        PartitionGenerator newGenerator(StressSettings settings, SeedManager 
seeds)
         {
-            return new PartitionGenerator(get(partitionKeys), 
get(clusteringColumns), get(valueColumns));
+            return new PartitionGenerator(get(partitionKeys), 
get(clusteringColumns), get(valueColumns), settings.generate.order, seeds);
         }
 
         List<Generator> get(List<ColumnInfo> columnInfos)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java 
b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
index deea1fb..b6efc5e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -30,8 +30,14 @@ public class StressYaml
     public String table;
     public String table_definition;
 
-    public List<Map<String,Object>> columnspec;
-    public Map<String,String> queries;
-    public Map<String,String> insert;
+    public List<Map<String, Object>> columnspec;
+    public Map<String, QueryDef> queries;
+    public Map<String, String> insert;
+
+    public static class QueryDef
+    {
+        public String cql;
+        public String fields;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
 
b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
index 13fae0d..4062b58 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java
@@ -55,4 +55,11 @@ public class DistributionInverted extends Distribution
         wrapped.setSeed(seed);
     }
 
+    public static Distribution invert(Distribution distribution)
+    {
+        if (distribution instanceof DistributionInverted)
+            return ((DistributionInverted) distribution).wrapped;
+        return new DistributionInverted(distribution);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
 
b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
new file mode 100644
index 0000000..9771134
--- /dev/null
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java
@@ -0,0 +1,90 @@
+package org.apache.cassandra.stress.generate;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.cassandra.stress.Stress;
+
+public class DistributionQuantized extends Distribution
+{
+
+    final Distribution delegate;
+    final long[] bounds;
+    final Random random = new Random();
+
+    public DistributionQuantized(Distribution delegate, int quantas)
+    {
+        this.delegate = delegate;
+        this.bounds = new long[quantas + 1];
+        bounds[0] = delegate.minValue();
+        bounds[quantas] = delegate.maxValue() + 1;
+        for (int i = 1 ; i < quantas ; i++)
+            bounds[i] = delegate.inverseCumProb(i / (double) quantas);
+    }
+
+    @Override
+    public long next()
+    {
+        int quanta = quanta(delegate.next());
+        return bounds[quanta] + (long) (random.nextDouble() * ((bounds[quanta 
+ 1] - bounds[quanta])));
+    }
+
+    public double nextDouble()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        long val = delegate.inverseCumProb(cumProb);
+        int quanta = quanta(val);
+        if (quanta < 0)
+            return bounds[0];
+        if (quanta >= bounds.length - 1)
+            return bounds[bounds.length - 1] - 1;
+        cumProb -= (quanta / ((double) bounds.length - 1));
+        cumProb *= (double) bounds.length - 1;
+        return bounds[quanta] + (long) (cumProb * (bounds[quanta + 1] - 
bounds[quanta]));
+    }
+
+    int quanta(long val)
+    {
+        int i = Arrays.binarySearch(bounds, val);
+        if (i < 0)
+            return -2 -i;
+        return i - 1;
+    }
+
+    public void setSeed(long seed)
+    {
+        delegate.setSeed(seed);
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        Stress.main(new String[] { "print", "dist=qextreme(1..1M,2,2)"});
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
new file mode 100644
index 0000000..455fec4
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
@@ -0,0 +1,116 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.stress.generate;
+
+import java.util.Random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+// based on http://en.wikipedia.org/wiki/Xorshift, but periodically we reseed 
with our stronger random generator
+// note it is also non-atomically updated, so expects to be used by a single 
thread
+public class FasterRandom implements RandomGenerator
+{
+    final Random random = new Random();
+
+    private long seed;
+    private int reseed;
+
+    public void setSeed(int seed)
+    {
+        setSeed((long) seed);
+    }
+
+    public void setSeed(int[] ints)
+    {
+        if (ints.length > 1)
+            setSeed (((long) ints[0] << 32) | ints[1]);
+        else
+            setSeed(ints[0]);
+    }
+
+    public void setSeed(long seed)
+    {
+        this.seed = seed;
+        rollover();
+    }
+
+    private void rollover()
+    {
+        this.reseed = 0;
+        random.setSeed(seed);
+        seed = random.nextLong();
+    }
+
+    public void nextBytes(byte[] bytes)
+    {
+        int i = 0;
+        while (i < bytes.length)
+        {
+            long next = nextLong();
+            while (i < bytes.length)
+            {
+                bytes[i++] = (byte) (next & 0xFF);
+                next >>>= 8;
+            }
+        }
+    }
+
+    public int nextInt()
+    {
+        return (int) nextLong();
+    }
+
+    public int nextInt(int i)
+    {
+        return Math.abs((int) nextLong() % i);
+    }
+
+    public long nextLong()
+    {
+        if (++this.reseed == 32)
+            rollover();
+
+        long seed = this.seed;
+        seed ^= seed >> 12;
+        seed ^= seed << 25;
+        seed ^= seed >> 27;
+        this.seed = seed;
+        return seed * 2685821657736338717L;
+    }
+
+    public boolean nextBoolean()
+    {
+        return ((int) nextLong() & 1) == 1;
+    }
+
+    public float nextFloat()
+    {
+        return Float.intBitsToFloat((int) nextLong());
+    }
+
+    public double nextDouble()
+    {
+        return Double.longBitsToDouble(nextLong());
+    }
+
+    public double nextGaussian()
+    {
+        return random.nextGaussian();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
index f05e95b..18f5732 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
@@ -23,24 +23,34 @@ package org.apache.cassandra.stress.generate;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.stress.generate.values.Generator;
 
 // a partition is re-used to reduce garbage generation, as is its internal 
RowIterator
+// TODO: we should batch the generation of clustering components so we can 
bound the time and size necessary to
+// generate huge partitions with only a small number of clustering components; 
i.e. we should generate seeds for batches
+// of a single component, and then generate the values within those batches as 
necessary. this will be difficult with
+// generating sorted partitions, and may require generator support (e.g. we 
may need to support generating prefixes
+// that are extended/suffixed to generate each batch, so that we can sort the 
prefixes)
 public class Partition
 {
 
     private long idseed;
+    private Seed seed;
     private final Object[] partitionKey;
     private final PartitionGenerator generator;
     private final RowIterator iterator;
@@ -55,31 +65,32 @@ public class Partition
             iterator = new SingleRowIterator();
     }
 
-    void setSeed(long seed)
+    void setSeed(Seed seed)
     {
         long idseed = 0;
         for (int i = 0 ; i < partitionKey.length ; i++)
         {
             Generator generator = this.generator.partitionKey.get(i);
             // set the partition key seed based on the current work item we're 
processing
-            generator.setSeed(seed);
+            generator.setSeed(seed.seed);
             Object key = generator.generate();
             partitionKey[i] = key;
             // then contribute this value to the data seed
             idseed = seed(key, generator.type, idseed);
         }
+        this.seed = seed;
         this.idseed = idseed;
     }
 
-    public RowIterator iterator(double useChance)
+    public RowIterator iterator(double useChance, boolean isWrite)
     {
-        iterator.reset(useChance, 0);
+        iterator.reset(useChance, 0, 1, isWrite);
         return iterator;
     }
 
-    public RowIterator iterator(int targetCount)
+    public RowIterator iterator(int targetCount, boolean isWrite)
     {
-        iterator.reset(Double.NaN, targetCount);
+        iterator.reset(Double.NaN, targetCount, 1, isWrite);
         return iterator;
     }
 
@@ -87,12 +98,12 @@ public class Partition
     {
         boolean done;
 
-        void reset(double useChance, int targetCount)
+        void reset(double useChance, int targetCount, int batches, boolean 
isWrite)
         {
             done = false;
         }
 
-        public Iterable<Row> batch(double ratio)
+        public Iterable<Row> next()
         {
             if (done)
                 return Collections.emptyList();
@@ -110,6 +121,12 @@ public class Partition
         {
             return done;
         }
+
+        public void markWriteFinished()
+        {
+            assert done;
+            generator.seeds.markFinished(seed);
+        }
     }
 
     public abstract class RowIterator
@@ -117,10 +134,10 @@ public class Partition
         // we reuse the row object to save garbage
         final Row row = new Row(partitionKey, new 
Object[generator.clusteringComponents.size() + 
generator.valueComponents.size()]);
 
-        public abstract Iterable<Row> batch(double ratio);
-        abstract void reset(double useChance, int targetCount);
-
+        public abstract Iterable<Row> next();
         public abstract boolean done();
+        public abstract void markWriteFinished();
+        abstract void reset(double useChance, int targetCount, int batches, 
boolean isWrite);
 
         public Partition partition()
         {
@@ -128,31 +145,40 @@ public class Partition
         }
     }
 
-    // permits iterating a random subset of the procedurally generated rows in 
this partition;  this is the only mechanism for visiting rows
+    // permits iterating a random subset of the procedurally generated rows in 
this partition. this is the only mechanism for visiting rows.
     // we maintain a stack of clustering components and their seeds; for each 
clustering component we visit, we generate all values it takes at that level,
     // and then, using the average (total) number of children it takes we 
randomly choose whether or not we visit its children;
-    // if we do, we generate all possible values the children can take, and 
repeat the process. So at any one time we are using space proportional
+    // if we do, we generate all possible values the immediate children can 
take, and repeat the process. So at any one time we are using space proportional
     // to C.N, where N is the average number of values each clustering 
component takes, as opposed to N^C total values in the partition.
+    // TODO : guarantee at least one row is always returned
+    // TODO : support first/last row, and constraining reads to rows we know 
are populated
     class MultiRowIterator extends RowIterator
     {
 
         // probability any single row will be generated in this iteration
         double useChance;
-        double expectedRowCount;
 
-        // the current seed in use at any given level; used to save 
recalculating it for each row, so we only need to recalc
-        // from prior row
+        // the seed used to generate the current values for the clustering 
components at each depth;
+        // used to save recalculating it for each row, so we only need to 
recalc from prior row.
         final long[] clusteringSeeds = new 
long[generator.clusteringComponents.size()];
         // the components remaining to be visited for each level of the 
current stack
-        final Queue<Object>[] clusteringComponents = new 
ArrayDeque[generator.clusteringComponents.size()];
+        final Deque<Object>[] clusteringComponents = new 
ArrayDeque[generator.clusteringComponents.size()];
 
         // we want our chance of selection to be applied uniformly, so we 
compound the roll we make at each level
         // so that we know with what chance we reached there, and we adjust 
our roll at that level by that amount
-        double[] chancemodifier = new 
double[generator.clusteringComponents.size()];
-        double[] rollmodifier = new 
double[generator.clusteringComponents.size()];
+        final double[] chancemodifier = new 
double[generator.clusteringComponents.size()];
+        final double[] rollmodifier = new 
double[generator.clusteringComponents.size()];
+
+        // track where in the partition we are, and where we are limited to
+        final int[] position = new int[generator.clusteringComponents.size()];
+        final int[] limit = new int[position.length];
+        int batchSize;
+        boolean returnedOne;
+        boolean forceReturnOne;
 
-        // reusable set for generating unique clustering components
+        // reusable collections for generating unique and sorted clustering 
components
         final Set<Object> unique = new HashSet<>();
+        final List<Comparable> tosort = new ArrayList<>();
         final Random random = new Random();
 
         MultiRowIterator()
@@ -163,126 +189,262 @@ public class Partition
             chancemodifier[0] = generator.clusteringChildAverages[0];
         }
 
-        void reset(double useChance, int targetCount)
+        // if we're a write, the expected behaviour is that the requested 
batch count is compounded with the seed's visit
+        // count to decide how much we should return in one iteration
+        void reset(double useChance, int targetCount, int batches, boolean 
isWrite)
         {
+            if (this.useChance < 1d)
+            {
+                // we clear our prior roll-modifiers if the use chance was 
previously less-than zero
+                Arrays.fill(rollmodifier, 1d);
+                Arrays.fill(chancemodifier, 1d);
+            }
+
+            // set the seed for the first clustering component
             generator.clusteringComponents.get(0).setSeed(idseed);
+            int[] position = seed.position;
+
+            // calculate how many first clustering components we'll generate, 
and how many total rows this predicts
             int firstComponentCount = (int) 
generator.clusteringComponents.get(0).clusteringDistribution.next();
-            this.expectedRowCount = firstComponentCount * 
generator.clusteringChildAverages[0];
+            int expectedRowCount;
+
+            if (!isWrite && position != null)
+            {
+                expectedRowCount = 0;
+                for (int i = 0 ; i < position.length ; i++)
+                {
+                    expectedRowCount += position[i] * 
generator.clusteringChildAverages[i];
+                    limit[i] = position[i];
+                }
+            }
+            else
+            {
+                expectedRowCount = firstComponentCount * 
generator.clusteringChildAverages[0];
+                if (isWrite)
+                    batches *= seed.visits;
+                Arrays.fill(limit, Integer.MAX_VALUE);
+            }
+
+            batchSize = Math.max(1, expectedRowCount / batches);
             if (Double.isNaN(useChance))
-                useChance = Math.max(0d, Math.min(1d, targetCount / 
expectedRowCount));
+                useChance = Math.max(0d, Math.min(1d, targetCount / (double) 
expectedRowCount));
 
+            // clear any remnants of the last iteration, wire up our 
constants, and fill in the first clustering components
+            this.useChance = useChance;
+            this.returnedOne = false;
             for (Queue<?> q : clusteringComponents)
                 q.clear();
-
-            this.useChance = useChance;
             clusteringSeeds[0] = idseed;
-            clusteringComponents[0].add(this);
             fill(clusteringComponents[0], firstComponentCount, 
generator.clusteringComponents.get(0));
-            advance(0, 1f);
+
+            // seek to our start position
+            seek(isWrite ? position : null);
         }
 
-        void fill(int component)
+        // generate the clustering components for the provided depth; requires 
preceding components
+        // to have been generated and their seeds populated into 
clusteringSeeds
+        void fill(int depth)
         {
-            long seed = clusteringSeeds[component - 1];
-            Generator gen = generator.clusteringComponents.get(component);
+            long seed = clusteringSeeds[depth - 1];
+            Generator gen = generator.clusteringComponents.get(depth);
             gen.setSeed(seed);
-            clusteringSeeds[component] = seed(clusteringComponents[component - 
1].peek(), generator.clusteringComponents.get(component - 1).type, seed);
-            fill(clusteringComponents[component], (int) 
gen.clusteringDistribution.next(), gen);
+            clusteringSeeds[depth] = seed(clusteringComponents[depth - 
1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
+            fill(clusteringComponents[depth], (int) 
gen.clusteringDistribution.next(), gen);
         }
 
+        // generate the clustering components into the queue
         void fill(Queue<Object> queue, int count, Generator generator)
         {
             if (count == 1)
             {
                 queue.add(generator.generate());
+                return;
             }
-            else
+
+            switch (Partition.this.generator.order)
             {
-                unique.clear();
-                for (int i = 0 ; i < count ; i++)
-                {
-                    Object next = generator.generate();
-                    if (unique.add(next))
-                        queue.add(next);
-                }
+                case SORTED:
+                    if (Comparable.class.isAssignableFrom(generator.clazz))
+                    {
+                        tosort.clear();
+                        for (int i = 0 ; i < count ; i++)
+                            tosort.add((Comparable) generator.generate());
+                        Collections.sort(tosort);
+                        for (int i = 0 ; i < count ; i++)
+                            if (i == 0 || tosort.get(i - 1).compareTo(i) < 0)
+                                queue.add(tosort.get(i));
+                        break;
+                    }
+                case ARBITRARY:
+                    unique.clear();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            queue.add(next);
+                    }
+                    break;
+                case SHUFFLED:
+                    unique.clear();
+                    tosort.clear();
+                    for (int i = 0 ; i < count ; i++)
+                    {
+                        Object next = generator.generate();
+                        if (unique.add(next))
+                            tosort.add(new RandomOrder(next));
+                    }
+                    Collections.sort(tosort);
+                    for (Object o : tosort)
+                        queue.add(((RandomOrder)o).value);
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+
+        // seek to the provided position (or the first entry if null)
+        private void seek(int[] position)
+        {
+            if (position == null)
+            {
+                this.position[0] = -1;
+                clusteringComponents[0].addFirst(this);
+                advance(0);
+                return;
+            }
+
+            assert position.length == clusteringComponents.length;
+            for (int i = 0 ; i < position.length ; i++)
+            {
+                if (i != 0)
+                    fill(i);
+                for (int c = position[i] ; c > 0 ; c--)
+                    clusteringComponents[i].poll();
+                row.row[i] = clusteringComponents[i].peek();
             }
+            System.arraycopy(position, 0, this.position, 0, position.length);
         }
 
-        private boolean advance(double continueChance)
+        // normal method for moving the iterator forward; maintains the row 
object, and delegates to advance(int)
+        // to move the iterator to the next item
+        void advance()
         {
-            // we always start at the leaf level
+            // we are always at the leaf level when this method is invoked
+            // so we calculate the seed for generating the row by combining 
the seed that generated the clustering components
             int depth = clusteringComponents.length - 1;
-            // fill the row with the position we *were* at (unless pre-start)
+            long parentSeed = clusteringSeeds[depth];
+            long rowSeed = seed(clusteringComponents[depth].peek(), 
generator.clusteringComponents.get(depth).type, parentSeed);
+
+            // and then fill the row with the _non-clustering_ values for the 
position we _were_ at, as this is what we'll deliver
             for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
             {
                 Generator gen = generator.valueComponents.get(i - 
clusteringSeeds.length);
-                long seed = clusteringSeeds[depth];
-                seed = seed(clusteringComponents[depth].peek(), 
generator.clusteringComponents.get(depth).type, seed);
-                gen.setSeed(seed);
+                gen.setSeed(rowSeed);
                 row.row[i] = gen.generate();
             }
-            clusteringComponents[depth].poll();
+            returnedOne = true;
+            forceReturnOne = false;
 
-            return advance(depth, continueChance);
+            // then we advance the leaf level
+            advance(depth);
         }
 
-        private boolean advance(int depth, double continueChance)
+        private void advance(int depth)
         {
             // advance the leaf component
             clusteringComponents[depth].poll();
+            position[depth]++;
             while (true)
             {
                 if (clusteringComponents[depth].isEmpty())
                 {
+                    // if we've run out of clustering components at this 
level, ascend
                     if (depth == 0)
-                        return false;
+                        return;
                     depth--;
                     clusteringComponents[depth].poll();
+                    position[depth]++;
                     continue;
                 }
 
-                // the chance of descending is the uniform use chance, 
multiplied by the number of children
+                if (depth == 0 && !returnedOne && 
clusteringComponents[0].size() == 1)
+                    forceReturnOne = true;
+
+                // the chance of descending is the uniform usechance, 
multiplied by the number of children
                 // we would on average generate (so if we have a 0.1 use 
chance, but should generate 10 children
                 // then we will always descend), multiplied by 1/(compound 
roll), where (compound roll) is the
                 // chance with which we reached this depth, i.e. if we already 
beat 50/50 odds, we double our
                 // chance of beating this next roll
                 double thischance = useChance * chancemodifier[depth];
-                if (thischance > 0.999f || thischance >= random.nextDouble())
+                if (forceReturnOne || thischance > 0.999f || thischance >= 
random.nextDouble())
                 {
+                    // if we're descending, we fill in our clustering 
component and increase our depth
                     row.row[depth] = clusteringComponents[depth].peek();
                     depth++;
                     if (depth == clusteringComponents.length)
                         break;
-                    rollmodifier[depth] = rollmodifier[depth - 1] / 
Math.min(1d, thischance);
-                    chancemodifier[depth] = 
generator.clusteringChildAverages[depth] * rollmodifier[depth];
+                    // if we haven't reached the leaf, we update our 
probability statistics, fill in all of
+                    // this level's clustering components, and repeat
+                    if (useChance < 1d)
+                    {
+                        rollmodifier[depth] = rollmodifier[depth - 1] / 
Math.min(1d, thischance);
+                        chancemodifier[depth] = 
generator.clusteringChildAverages[depth] * rollmodifier[depth];
+                    }
+                    position[depth] = 0;
                     fill(depth);
                     continue;
                 }
 
+                // if we don't descend, we remove the clustering suffix we've 
skipped and continue
                 clusteringComponents[depth].poll();
+                position[depth]++;
             }
-
-            return continueChance >= 1.0d || continueChance >= 
random.nextDouble();
         }
 
-        public Iterable<Row> batch(final double ratio)
+        public Iterable<Row> next()
         {
-            final double continueChance = 1d - (Math.pow(ratio, 
expectedRowCount * useChance));
+            final int[] limit = position.clone();
+            int remainingSize = batchSize;
+            for (int i = 0 ; i < limit.length && remainingSize > 0 ; i++)
+            {
+                limit[i] += remainingSize / 
generator.clusteringChildAverages[i];
+                remainingSize %= generator.clusteringChildAverages[i];
+            }
+            assert remainingSize == 0;
+            for (int i = limit.length - 1 ; i > 0 ; i--)
+            {
+                if (limit[i] > generator.clusteringChildAverages[i])
+                {
+                    limit[i - 1] += limit[i] / 
generator.clusteringChildAverages[i];
+                    limit[i] %= generator.clusteringChildAverages[i];
+                }
+            }
+            for (int i = 0 ; i < limit.length ; i++)
+            {
+                if (limit[i] < this.limit[i])
+                    break;
+                limit[i] = Math.min(limit[i], this.limit[i]);
+            }
             return new Iterable<Row>()
             {
                 public Iterator<Row> iterator()
                 {
                     return new Iterator<Row>()
                     {
-                        boolean hasNext = true;
+
                         public boolean hasNext()
                         {
-                            return hasNext;
+                            if (done())
+                                return false;
+                            for (int i = 0 ; i < position.length ; i++)
+                                if (position[i] < limit[i])
+                                    return true;
+                            return false;
                         }
 
                         public Row next()
                         {
-                            hasNext = advance(continueChance);
+                            advance();
                             return row;
                         }
 
@@ -300,26 +462,37 @@ public class Partition
             return clusteringComponents[0].isEmpty();
         }
 
+        public void markWriteFinished()
+        {
+            if (done())
+                generator.seeds.markFinished(seed);
+            else
+                generator.seeds.markVisited(seed, position.clone());
+        }
+
         public Partition partition()
         {
             return Partition.this;
         }
     }
 
-    public String getKeyAsString()
+    private static class RandomOrder implements Comparable<RandomOrder>
     {
-        StringBuilder sb = new StringBuilder();
-        int i = 0;
-        for (Object key : partitionKey)
+        final int order = ThreadLocalRandom.current().nextInt();
+        final Object value;
+        private RandomOrder(Object value)
         {
-            if (i > 0)
-                sb.append("|");
-            AbstractType type = generator.partitionKey.get(i++).type;
-            sb.append(type.getString(type.decompose(key)));
+            this.value = value;
+        }
+
+        public int compareTo(RandomOrder that)
+        {
+            return Integer.compare(this.order, that.order);
         }
-        return sb.toString();
     }
 
+    // calculate a new seed based on the combination of a parent seed and the 
generated child, to generate
+    // any children of this child
     static long seed(Object object, AbstractType type, long seed)
     {
         if (object instanceof ByteBuffer)
@@ -355,6 +528,20 @@ public class Partition
         return partitionKey[i];
     }
 
+    public String getKeyAsString()
+    {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        for (Object key : partitionKey)
+        {
+            if (i > 0)
+                sb.append("|");
+            AbstractType type = generator.partitionKey.get(i++).type;
+            sb.append(type.getString(type.decompose(key)));
+        }
+        return sb.toString();
+    }
+
     // used for thrift smart routing - if it's a multi-part key we don't try 
to route correctly right now
     public ByteBuffer getToken()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index d05350d..128d2f5 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -30,18 +30,27 @@ import java.util.NoSuchElementException;
 
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.values.Generator;
 
 public class PartitionGenerator
 {
 
+    public static enum Order
+    {
+        ARBITRARY, SHUFFLED, SORTED
+    }
+
     public final double maxRowCount;
+    public final double minRowCount;
     final List<Generator> partitionKey;
     final List<Generator> clusteringComponents;
     final List<Generator> valueComponents;
     final int[] clusteringChildAverages;
 
     private final Map<String, Integer> indexMap;
+    final Order order;
+    final SeedManager seeds;
 
     final List<Partition> recyclable = new ArrayList<>();
     int partitionsInUse = 0;
@@ -51,18 +60,25 @@ public class PartitionGenerator
         partitionsInUse = 0;
     }
 
-    public PartitionGenerator(List<Generator> partitionKey, List<Generator> 
clusteringComponents, List<Generator> valueComponents)
+    public PartitionGenerator(List<Generator> partitionKey, List<Generator> 
clusteringComponents, List<Generator> valueComponents, Order order, SeedManager 
seeds)
     {
         this.partitionKey = partitionKey;
         this.clusteringComponents = clusteringComponents;
         this.valueComponents = valueComponents;
+        this.order = order;
+        this.seeds = seeds;
         this.clusteringChildAverages = new int[clusteringComponents.size()];
         for (int i = clusteringChildAverages.length - 1 ; i >= 0 ; i--)
             clusteringChildAverages[i] = (int) (i < 
(clusteringChildAverages.length - 1) ? clusteringComponents.get(i + 
1).clusteringDistribution.average() * clusteringChildAverages[i + 1] : 1);
         double maxRowCount = 1d;
+        double minRowCount = 1d;
         for (Generator component : clusteringComponents)
+        {
             maxRowCount *= component.clusteringDistribution.maxValue();
+            minRowCount *= component.clusteringDistribution.minValue();
+        }
         this.maxRowCount = maxRowCount;
+        this.minRowCount = minRowCount;
         this.indexMap = new HashMap<>();
         int i = 0;
         for (Generator generator : partitionKey)
@@ -72,6 +88,11 @@ public class PartitionGenerator
             indexMap.put(generator.name, i++);
     }
 
+    public boolean permitNulls(int index)
+    {
+        return !(index < 0 || index < clusteringComponents.size());
+    }
+
     public int indexOf(String name)
     {
         Integer i = indexMap.get(name);
@@ -80,11 +101,14 @@ public class PartitionGenerator
         return i;
     }
 
-    public Partition generate(long seed)
+    public Partition generate(Operation op)
     {
         if (recyclable.size() <= partitionsInUse || 
recyclable.get(partitionsInUse) == null)
             recyclable.add(new Partition(this));
 
+        Seed seed = seeds.next(op);
+        if (seed == null)
+            return null;
         Partition partition = recyclable.get(partitionsInUse++);
         partition.setSeed(seed);
         return partition;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
index 37ad4c5..c71945a 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java
@@ -39,6 +39,11 @@ public class RatioDistribution
         return Math.max(0f, Math.min(1f, distribution.nextDouble() / divisor));
     }
 
+    public double min()
+    {
+        return Math.min(1d, distribution.minValue() / divisor);
+    }
+
     public double max()
     {
         return Math.min(1d, distribution.maxValue() / divisor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
new file mode 100644
index 0000000..f427608
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.stress.generate;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.cassandra.stress.util.DynamicList;
+
+public class Seed implements Comparable<Seed>
+{
+
+    public final long seed;
+    final int visits;
+
+    DynamicList.Node poolNode;
+    volatile int[] position;
+    volatile State state = State.HELD;
+
+    private static final AtomicReferenceFieldUpdater<Seed, Seed.State> 
stateUpdater = AtomicReferenceFieldUpdater.newUpdater(Seed.class, State.class, 
"state");
+
+    public int compareTo(Seed that)
+    {
+        return Long.compare(this.seed, that.seed);
+    }
+
+    static enum State
+    {
+        HELD, AVAILABLE
+    }
+
+    Seed(long seed, int visits)
+    {
+        this.seed = seed;
+        this.visits = visits;
+    }
+
+    boolean take()
+    {
+        return stateUpdater.compareAndSet(this, State.AVAILABLE, State.HELD);
+    }
+
+    void yield()
+    {
+        state = State.AVAILABLE;
+    }
+
+    public int[] position()
+    {
+        return position;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java
deleted file mode 100644
index d579223..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.cassandra.stress.generate;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-public interface SeedGenerator
-{
-
-    long next(long workIndex);
-
-}

Reply via email to