Fix CRR (CqlRecordReader), add pig test for it. Patch by Mike Adamson; reviewed by brandonwilliams for CASSANDRA-7726.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7049ee0e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7049ee0e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7049ee0e Branch: refs/heads/cassandra-2.1.0 Commit: 7049ee0e2bdb37a0dc82fa849462ffd375a20e85 Parents: f7e8803 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Aug 12 12:33:46 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Aug 12 12:34:14 2014 -0500 ---------------------------------------------------------------------- .../cassandra/hadoop/cql3/CqlRecordReader.java | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7049ee0e/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 74310cf..fa8dec9 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -89,7 +89,6 @@ public class CqlRecordReader extends RecordReader<Long, Row> private int pageRowSize; private List<String> partitionKeys = new ArrayList<>(); - private List<String> clusteringKeys = new ArrayList<>(); // partition keys -- key aliases private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap(); @@ -106,8 +105,8 @@ public class CqlRecordReader extends RecordReader<Long, Row> totalRowCount = (this.split.getLength() < Long.MAX_VALUE) ? 
(int) this.split.getLength() : ConfigHelper.getInputSplitSize(conf); - cfName = quote(ConfigHelper.getInputColumnFamily(conf)); - keyspace = quote(ConfigHelper.getInputKeyspace(conf)); + cfName = ConfigHelper.getInputColumnFamily(conf); + keyspace = ConfigHelper.getInputKeyspace(conf); partitioner = ConfigHelper.getInputPartitioner(conf); inputColumns = CqlConfigHelper.getInputcolumns(conf); userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf); @@ -161,6 +160,14 @@ public class CqlRecordReader extends RecordReader<Long, Row> // whereClauses // pageRowSize cqlQuery = CqlConfigHelper.getInputCql(conf); + // validate that the user hasn't tried to give us a custom query along with input columns + // and where clauses + if (StringUtils.isNotEmpty(cqlQuery) && (StringUtils.isNotEmpty(inputColumns) || + StringUtils.isNotEmpty(userDefinedWhereClauses))) + { + throw new AssertionError("Cannot define a custom query with input columns and / or where clauses"); + } + if (StringUtils.isEmpty(cqlQuery)) cqlQuery = buildQuery(); logger.debug("cqlQuery {}", cqlQuery); @@ -266,7 +273,7 @@ public class CqlRecordReader extends RecordReader<Long, Row> { AbstractType type = partitioner.getTokenValidator(); ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) ); - for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey()) + for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(quote(keyspace)).getTable(quote(cfName)).getPartitionKey()) partitionBoundColumns.put(meta.getName(), Boolean.TRUE); rows = rs.iterator(); } @@ -534,7 +541,8 @@ public class CqlRecordReader extends RecordReader<Long, Row> { fetchKeys(); - String selectColumnList = makeColumnList(getSelectColumns()); + List<String> columns = getSelectColumns(); + String selectColumnList = columns.size() == 0 ? 
"*" : makeColumnList(columns); String partitionKeyList = makeColumnList(partitionKeys); return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(), @@ -556,9 +564,7 @@ public class CqlRecordReader extends RecordReader<Long, Row> { List<String> selectColumns = new ArrayList<>(); - if (StringUtils.isEmpty(inputColumns)) - selectColumns.add("*"); - else + if (StringUtils.isNotEmpty(inputColumns)) { // We must select all the partition keys plus any other columns the user wants selectColumns.addAll(partitionKeys); @@ -605,16 +611,10 @@ public class CqlRecordReader extends RecordReader<Long, Row> int componentIndex = row.isNull(1) ? 0 : row.getInt(1); partitionKeyArray[componentIndex] = column; } - else if (type.equals("clustering_key")) - { - clusteringKeys.add(column); - } } partitionKeys.addAll(Arrays.asList(partitionKeyArray)); } - - private String quote(String identifier) { return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";