Repository: cassandra Updated Branches: refs/heads/trunk 6da5fb56c -> 2aeed037e
Support light-weight transactions in cassandra-stress patch by Jaydeepkumar Chovatia; reviewed by Dinesh Joshi and jasobrown for CASSANDRA-13529 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2aeed037 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2aeed037 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2aeed037 Branch: refs/heads/trunk Commit: 2aeed037e0f105e72366e15afa012257e910a25d Parents: 6da5fb5 Author: Jaydeepkumar Chovatia <chovatia.jayd...@gmail.com> Authored: Fri May 12 17:23:44 2017 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Wed Jun 13 05:46:20 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/source/tools/cassandra_stress.rst | 18 ++ doc/source/tools/stress-lwt-example.yaml | 70 ++++++ .../cql3/conditions/ColumnCondition.java | 7 +- .../cql3/statements/ModificationStatement.java | 11 + .../apache/cassandra/stress/StressProfile.java | 46 +++- .../stress/generate/PartitionGenerator.java | 10 + .../stress/operations/PartitionOperation.java | 13 +- .../stress/operations/userdefined/CASQuery.java | 227 +++++++++++++++++++ .../operations/userdefined/SchemaQuery.java | 11 +- .../operations/userdefined/SchemaStatement.java | 6 + 11 files changed, 403 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6819711..629df0c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529) * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509) * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467) * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/doc/source/tools/cassandra_stress.rst ---------------------------------------------------------------------- diff --git a/doc/source/tools/cassandra_stress.rst b/doc/source/tools/cassandra_stress.rst index 322a981..bcac54e 100644 --- a/doc/source/tools/cassandra_stress.rst +++ b/doc/source/tools/cassandra_stress.rst @@ -220,6 +220,24 @@ Running a user mode test with multiple yaml files:: This will run operations as specified in both the example.yaml and example2.yaml files. example.yaml and example2.yaml can reference the same table although care must be taken that the table definition is identical (data generation specs can be different). +Lightweight transaction support ++++++++++++++++++++++++++++++++ + +cassandra-stress supports lightweight transactions. In this it will first read current data from Cassandra and then uses read value(s) +to fulfill lightweight transaction condition(s). + +Lightweight transaction update query:: + + queries: + regularupdate: + cql: update blogposts set author = ? where domain = ? and published_date = ? + fields: samerow + updatewithlwt: + cql: update blogposts set author = ? where domain = ? and published_date = ? IF body = ? AND url = ? + fields: samerow + +The full example can be found here :download:`yaml <./stress-lwt-example.yaml>` + Graphing ^^^^^^^^ http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/doc/source/tools/stress-lwt-example.yaml ---------------------------------------------------------------------- diff --git a/doc/source/tools/stress-lwt-example.yaml b/doc/source/tools/stress-lwt-example.yaml new file mode 100644 index 0000000..fc5db08 --- /dev/null +++ b/doc/source/tools/stress-lwt-example.yaml @@ -0,0 +1,70 @@ +# Keyspace Name +keyspace: stresscql + +# The CQL for creating a keyspace (optional if it already exists) +# Would almost always be network topology unless running something locall +keyspace_definition: | + CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; + +# Table name +table: blogposts + +# The CQL for creating a table you wish to stress (optional if it already exists) +table_definition: | + CREATE TABLE blogposts ( + domain text, + published_date timeuuid, + url text, + author text, + title text, + body text, + PRIMARY KEY(domain, published_date) + ) WITH CLUSTERING ORDER BY (published_date DESC) + AND compaction = { 'class':'LeveledCompactionStrategy' } + AND comment='A table to hold blog posts' + +### Column Distribution Specifications ### + +columnspec: + - name: domain + size: gaussian(5..100) #domain names are relatively short + population: uniform(1..10M) #10M possible domains to pick from + + - name: published_date + cluster: fixed(1000) #under each domain we will have max 1000 posts + + - name: url + size: uniform(30..300) + + - name: title #titles shouldn't go beyond 200 chars + size: gaussian(10..200) + + - name: author + size: uniform(5..20) #author names should be short + + - name: body + size: gaussian(100..5000) #the body of the blog post can be long + +### Batch Ratio Distribution Specifications ### + +insert: + partitions: fixed(1) # Our partition key is the domain so only insert one per batch + + select: fixed(1)/1000 # We have 1000 posts per domain so 1/1000 will allow 1 post per batch + + batchtype: UNLOGGED # Unlogged batches + + +# +# A list of queries you wish to run against the schema +# +queries: + singlepost: + cql: select * from blogposts where domain = ? LIMIT 1 + fields: samerow + regularupdate: + cql: update blogposts set author = ? where domain = ? and published_date = ? + fields: samerow + updatewithlwt: + cql: update blogposts set author = ? where domain = ? and published_date = ? IF body = ? AND url = ? + fields: samerow http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java index c9d7fe8..aa5c10d 100644 --- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java +++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java @@ -849,7 +849,12 @@ public abstract class ColumnCondition throw invalidRequest("Slice conditions ( %s ) are not supported on durations", operator); } } - + + public Term.Raw getValue() + { + return value; + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index e02fd41..65e1e2d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -936,5 +937,15 @@ public abstract class ModificationStatement implements CQLStatement { return rawId.prepare(metadata); } + + public List<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> getConditions() + { + ImmutableList.Builder<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> builder = ImmutableList.builderWithExpectedSize(conditions.size()); + + for (Pair<Raw, ColumnCondition.Raw> condition : conditions) + builder.add(Pair.create(condition.left, condition.right)); + + return builder.build(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 2338873..cda9c58 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -42,14 +42,17 @@ import org.apache.cassandra.cql3.CQLFragmentParser; import org.apache.cassandra.cql3.CqlParser; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.stress.generate.*; import org.apache.cassandra.stress.generate.values.*; -import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery; +import org.apache.cassandra.stress.operations.userdefined.CASQuery; import org.apache.cassandra.stress.operations.userdefined.SchemaInsert; import org.apache.cassandra.stress.operations.userdefined.SchemaQuery; +import org.apache.cassandra.stress.operations.userdefined.SchemaStatement; +import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery; import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery; import org.apache.cassandra.stress.report.Timer; import org.apache.cassandra.stress.settings.*; @@ -87,7 +90,7 @@ public class StressProfile implements Serializable transient volatile PreparedStatement insertStatement; transient volatile List<ValidatingSchemaQuery.Factory> validationFactories; - transient volatile Map<String, SchemaQuery.ArgSelect> argSelects; + transient volatile Map<String, SchemaStatement.ArgSelect> argSelects; transient volatile Map<String, PreparedStatement> queryStatements; private static final Pattern lowercaseAlphanumeric = Pattern.compile("[a-z0-9_]+"); @@ -367,13 +370,13 @@ public class StressProfile implements Serializable JavaDriverClient jclient = settings.getJavaDriverClient(keyspaceName); Map<String, PreparedStatement> stmts = new HashMap<>(); - Map<String, SchemaQuery.ArgSelect> args = new HashMap<>(); + Map<String, SchemaStatement.ArgSelect> args = new HashMap<>(); for (Map.Entry<String, StressYaml.QueryDef> e : queries.entrySet()) { stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue().cql)); args.put(e.getKey().toLowerCase(), e.getValue().fields == null - ? SchemaQuery.ArgSelect.MULTIROW - : SchemaQuery.ArgSelect.valueOf(e.getValue().fields.toUpperCase())); + ? SchemaStatement.ArgSelect.MULTIROW + : SchemaStatement.ArgSelect.valueOf(e.getValue().fields.toUpperCase())); } queryStatements = stmts; argSelects = args; @@ -381,9 +384,42 @@ public class StressProfile implements Serializable } } + if (dynamicConditionExists(queryStatements.get(name))) + return new CASQuery(timer, settings, generator, seeds, queryStatements.get(name), settings.command.consistencyLevel, argSelects.get(name), tableName); + return new SchemaQuery(timer, settings, generator, seeds, queryStatements.get(name), settings.command.consistencyLevel, argSelects.get(name)); } + static boolean dynamicConditionExists(PreparedStatement statement) throws IllegalArgumentException + { + if (statement == null) + return false; + + if (!statement.getQueryString().toUpperCase().startsWith("UPDATE")) + return false; + + ModificationStatement.Parsed modificationStatement; + try + { + modificationStatement = CQLFragmentParser.parseAnyUnhandled(CqlParser::updateStatement, + statement.getQueryString()); + } + catch (RecognitionException e) + { + throw new IllegalArgumentException("could not parse update query:" + statement.getQueryString(), e); + } + + /* + * here we differentiate between static vs dynamic conditions: + * - static condition example: if col1 = NULL + * - dynamic condition example: if col1 = ? + * for static condition we don't have to replace value, no extra work involved. + * for dynamic condition we have to read existing db value and then + * use current db values during the update. + */ + return modificationStatement.getConditions().stream().anyMatch(condition -> condition.right.getValue().getText().equals("?")); + } + public Operation getBulkReadQueries(String name, Timer timer, StressSettings settings, TokenRangeIterator tokenRangeIterator, boolean isWarmup) { StressYaml.TokenRangeQueryDef def = tokenRangeQueries.get(name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java index 1230065..882b8b4 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java @@ -83,6 +83,16 @@ public class PartitionGenerator return !(index < 0 || index < clusteringComponents.size()); } + public List<Generator> getPartitionKey() + { + return Collections.unmodifiableList(partitionKey); + } + + public List<Generator> getClusteringComponents() + { + return Collections.unmodifiableList(clusteringComponents); + } + public int indexOf(String name) { Integer i = indexMap.get(name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java index bad0a94..55c6872 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java @@ -19,6 +19,7 @@ package org.apache.cassandra.stress.operations; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.cassandra.stress.Operation; @@ -75,6 +76,16 @@ public abstract class PartitionOperation extends Operation this.spec = spec; } + public DataSpec getDataSpecification() + { + return spec; + } + + public List<PartitionIterator> getPartitions() + { + return Collections.unmodifiableList(partitions); + } + public int ready(WorkManager permits) { int partitionCount = (int) spec.partitionCount.next(); @@ -86,7 +97,7 @@ public abstract class PartitionOperation extends Operation int i = 0; boolean success = true; - for (; i < partitionCount && success ; i++) + for (; i < partitionCount && success; i++) { if (i >= partitionCache.size()) partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java new file mode 100644 index 0000000..e7d0fe3 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java @@ -0,0 +1,227 @@ +package org.apache.cassandra.stress.operations.userdefined; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.LocalDate; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import org.antlr.runtime.RecognitionException; +import org.apache.cassandra.cql3.CQLFragmentParser; +import org.apache.cassandra.cql3.CqlParser; +import org.apache.cassandra.cql3.conditions.ColumnCondition; +import org.apache.cassandra.cql3.statements.ModificationStatement; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.stress.generate.DistributionFixed; +import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.Row; +import org.apache.cassandra.stress.generate.SeedManager; +import org.apache.cassandra.stress.generate.values.Generator; +import org.apache.cassandra.stress.report.Timer; +import org.apache.cassandra.stress.settings.StressSettings; +import org.apache.cassandra.stress.util.JavaDriverClient; +import org.apache.cassandra.utils.Pair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class CASQuery extends SchemaStatement +{ + private final ImmutableList<Integer> keysIndex; + private final ImmutableMap<Integer, Integer> casConditionArgFreqMap; + private final String readQuery; + + private PreparedStatement casReadConditionStatement; + + public CASQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect, final String tableName) + { + super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == SchemaStatement.ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, + statement.getVariables().asList().stream().map(ColumnDefinitions.Definition::getName).collect(Collectors.toList()), cl); + + if (argSelect != SchemaStatement.ArgSelect.SAMEROW) + throw new IllegalArgumentException("CAS is supported only for type 'samerow'"); + + ModificationStatement.Parsed modificationStatement; + try + { + modificationStatement = CQLFragmentParser.parseAnyUnhandled(CqlParser::updateStatement, + statement.getQueryString()); + } + catch (RecognitionException e) + { + throw new IllegalArgumentException("could not parse update query:" + statement.getQueryString(), e); + } + + final List<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> casConditionList = modificationStatement.getConditions(); + List<Integer> casConditionIndex = new ArrayList<>(); + + boolean first = true; + StringBuilder casReadConditionQuery = new StringBuilder(); + casReadConditionQuery.append("SELECT "); + for (final Pair<ColumnMetadata.Raw, ColumnCondition.Raw> condition : casConditionList) + { + if (!condition.right.getValue().getText().equals("?")) + { + //condition uses static value, ignore it + continue; + } + if (!first) + { + casReadConditionQuery.append(", "); + } + casReadConditionQuery.append(condition.left.rawText()); + casConditionIndex.add(getDataSpecification().partitionGenerator.indexOf(condition.left.rawText())); + first = false; + } + casReadConditionQuery.append(" FROM ").append(tableName).append(" WHERE "); + + first = true; + ImmutableList.Builder<Integer> keysBuilder = ImmutableList.builder(); + for (final Generator key : getDataSpecification().partitionGenerator.getPartitionKey()) + { + if (!first) + { + casReadConditionQuery.append(" AND "); + } + casReadConditionQuery.append(key.name).append(" = ? "); + keysBuilder.add(getDataSpecification().partitionGenerator.indexOf(key.name)); + first = false; + } + for (final Generator clusteringKey : getDataSpecification().partitionGenerator.getClusteringComponents()) + { + casReadConditionQuery.append(" AND ").append(clusteringKey.name).append(" = ? "); + keysBuilder.add(getDataSpecification().partitionGenerator.indexOf(clusteringKey.name)); + } + keysIndex = keysBuilder.build(); + readQuery = casReadConditionQuery.toString(); + + ImmutableMap.Builder<Integer, Integer> builder = ImmutableMap.builderWithExpectedSize(casConditionIndex.size()); + for (final Integer oneConditionIndex : casConditionIndex) + { + builder.put(oneConditionIndex, Math.toIntExact(Arrays.stream(argumentIndex).filter((x) -> x == oneConditionIndex).count())); + } + casConditionArgFreqMap = builder.build(); + } + + private class JavaDriverRun extends Runner + { + final JavaDriverClient client; + + private JavaDriverRun(JavaDriverClient client) + { + this.client = client; + casReadConditionStatement = client.prepare(readQuery); + } + + public boolean run() + { + ResultSet rs = client.getSession().execute(bind(client)); + rowCount = rs.all().size(); + partitionCount = Math.min(1, rowCount); + return true; + } + } + + @Override + public void run(JavaDriverClient client) throws IOException + { + timeWithRetry(new JavaDriverRun(client)); + } + + private BoundStatement bind(JavaDriverClient client) + { + final Object keys[] = new Object[keysIndex.size()]; + final Row row = getPartitions().get(0).next(); + + for (int i = 0; i < keysIndex.size(); i++) + { + keys[i] = row.get(keysIndex.get(i)); + } + + //get current db values for all the coluns which are part of dynamic conditions + ResultSet rs = client.getSession().execute(casReadConditionStatement.bind(keys)); + final Object casDbValues[] = new Object[casConditionArgFreqMap.size()]; + + final com.datastax.driver.core.Row casDbValue = rs.one(); + if (casDbValue != null) + { + for (int i = 0; i < casConditionArgFreqMap.size(); i++) + { + casDbValues[i] = casDbValue.getObject(i); + } + } + //now bind db values for dynamic conditions in actual CAS update operation + return prepare(row, casDbValues); + } + + private BoundStatement prepare(final Row row, final Object[] casDbValues) + { + final Map<Integer, Integer> localMapping = new HashMap<>(casConditionArgFreqMap); + int conditionIndexTracker = 0; + for (int i = 0; i < argumentIndex.length; i++) + { + boolean replace = false; + Integer count = localMapping.get(argumentIndex[i]); + if (count != null) + { + count--; + localMapping.put(argumentIndex[i], count); + if (count == 0) + { + replace = true; + } + } + + if (replace) + { + bindBuffer[i] = casDbValues[conditionIndexTracker++]; + } + else + { + Object value = row.get(argumentIndex[i]); + if (definitions.getType(i).getName() == DataType.date().getName()) + { + // the java driver only accepts com.datastax.driver.core.LocalDate for CQL type "DATE" + value = LocalDate.fromDaysSinceEpoch((Integer) value); + } + + bindBuffer[i] = value; + } + + if (bindBuffer[i] == null && !getDataSpecification().partitionGenerator.permitNulls(argumentIndex[i])) + { + throw new IllegalStateException(); + } + } + return statement.bind(bindBuffer); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java index f0b332c..cba9ce4 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java @@ -22,9 +22,6 @@ package org.apache.cassandra.stress.operations.userdefined; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Random; import java.util.stream.Collectors; @@ -39,19 +36,13 @@ import org.apache.cassandra.stress.util.JavaDriverClient; public class SchemaQuery extends SchemaStatement { - public static enum ArgSelect - { - MULTIROW, SAMEROW; - //TODO: FIRSTROW, LASTROW - } - final ArgSelect argSelect; final Object[][] randomBuffer; final Random random = new Random(); public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect) { - super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, + super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == SchemaStatement.ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, statement.getVariables().asList().stream().map(d -> d.getName()).collect(Collectors.toList()), cl); this.argSelect = argSelect; randomBuffer = new Object[argumentIndex.length][argumentIndex.length]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java index 6bd3fd5..334e6c5 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java @@ -38,6 +38,12 @@ import org.apache.cassandra.stress.util.JavaDriverClient; public abstract class SchemaStatement extends PartitionOperation { + public enum ArgSelect + { + MULTIROW, SAMEROW; + //TODO: FIRSTROW, LASTROW + } + final PreparedStatement statement; final ConsistencyLevel cl; final int[] argumentIndex; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org