Remove CPRR/CPIF. Patch by brandonwilliams for CASSANDRA-7570
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7fa93a2c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7fa93a2c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7fa93a2c Branch: refs/heads/trunk Commit: 7fa93a2ca7febbff593aafef0265daa8799a9fb3 Parents: f83909e Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Aug 6 09:21:14 2014 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Aug 6 09:21:14 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 5 +- .../hadoop/cql3/CqlPagingInputFormat.java | 85 -- .../hadoop/cql3/CqlPagingRecordReader.java | 800 ------------------- 4 files changed, 5 insertions(+), 886 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dcc5bf8..49cb6a1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.10 + * Remove CqlPagingRecordReader/CqlPagingInputFormat (CASSANDRA-7570) * Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229) * Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635) * Update java driver (for hadoop) (CASSANDRA-7618) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 93fe0b1..0491384 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -17,7 +17,10 @@ using the provided 'sstableupgrade' tool. ==== New features ------------ - - If you are using Leveled Compaction, you can now disable doing size-tiered + - CqlPaginRecordReader and CqlPagingInputFormat have both been removed. + Use CqlInputFormat instead. + - If you are using Leveled Compaction, you can now disable doing + size-tiered compaction in L0 by starting Cassandra with -Dcassandra.disable_stcs_in_l0 (see CASSANDRA-6621 for details). - Shuffle and taketoken have been removed. For clusters that choose to http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java deleted file mode 100644 index 96f2f94..0000000 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop.cql3; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.cassandra.hadoop.HadoopCompat; -import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat; -import org.apache.cassandra.hadoop.ReporterWrapper; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; - -/** - * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. - * - * At minimum, you need to set the KS and CF in your Hadoop job Configuration. - * The ConfigHelper class is provided to make this - * simple: - * ConfigHelper.setInputColumnFamily - * - * You can also configure the number of rows per InputSplit with - * ConfigHelper.setInputSplitSize. The default split size is 64k rows. - * the number of CQL rows per page - * - * the number of CQL rows per page - * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You - * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL - * query, so you need set it big enough to minimize the network overhead, and also - * not too big to avoid out of memory issue. - * - * the column names of the select CQL query. The default is all columns - * CQLConfigHelper.setInputColumns - * - * the user defined the where clause - * CQLConfigHelper.setInputWhereClauses. The default is no user defined where clause - */ -public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<String, ByteBuffer>, Map<String, ByteBuffer>> -{ - public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) - throws IOException - { - TaskAttemptContext tac = HadoopCompat.newMapContext( - jobConf, - TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)), - null, - null, - null, - new ReporterWrapper(reporter), - null); - - CqlPagingRecordReader recordReader = new CqlPagingRecordReader(); - recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); - return recordReader; - } - - @Override - public org.apache.hadoop.mapreduce.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> createRecordReader( - org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException, - InterruptedException - { - return new CqlPagingRecordReader(); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7fa93a2c/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java deleted file mode 100644 index 03d9ae9..0000000 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java +++ /dev/null @@ -1,800 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop.cql3; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.util.*; - -import com.google.common.base.Optional; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterables; - -import org.apache.cassandra.hadoop.HadoopCompat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.marshal.LongType; -import org.apache.cassandra.db.marshal.ReversedType; -import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.hadoop.ColumnFamilySplit; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransport; - -/** - * Hadoop RecordReader read the values return from the CQL query - * It use CQL key range query to page through the wide rows. - * <p/> - * Return List<IColumn> as keys columns - * <p/> - * Map<ByteBuffer, IColumn> as column name to columns mappings - */ -public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> - implements org.apache.hadoop.mapred.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> -{ - private static final Logger logger = LoggerFactory.getLogger(CqlPagingRecordReader.class); - - public static final int DEFAULT_CQL_PAGE_LIMIT = 1000; // TODO: find the number large enough but not OOM - - private ColumnFamilySplit split; - private RowIterator rowIterator; - - private Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> currentRow; - private int totalRowCount; // total number of rows to fetch - private String keyspace; - private String cfName; - private Cassandra.Client client; - private ConsistencyLevel consistencyLevel; - - // partition keys -- key aliases - private List<BoundColumn> partitionBoundColumns = new ArrayList<BoundColumn>(); - - // cluster keys -- column aliases - private List<BoundColumn> clusterColumns = new ArrayList<BoundColumn>(); - - // map prepared query type to item id - private Map<Integer, Integer> preparedQueryIds = new HashMap<Integer, Integer>(); - - // cql query select columns - private String columns; - - // the number of cql rows per page - private int pageRowSize; - - // user defined where clauses - private String userDefinedWhereClauses; - - private IPartitioner partitioner; - - private AbstractType<?> keyValidator; - - public CqlPagingRecordReader() - { - super(); - } - - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException - { - this.split = (ColumnFamilySplit) split; - Configuration conf = HadoopCompat.getConfiguration(context); - totalRowCount = (this.split.getLength() < Long.MAX_VALUE) - ? (int) this.split.getLength() - : ConfigHelper.getInputSplitSize(conf); - cfName = ConfigHelper.getInputColumnFamily(conf); - consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf)); - keyspace = ConfigHelper.getInputKeyspace(conf); - columns = CqlConfigHelper.getInputcolumns(conf); - userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf); - - Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf); - try - { - pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT; - } - catch(NumberFormatException e) - { - pageRowSize = DEFAULT_CQL_PAGE_LIMIT; - } - - partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context)); - - try - { - if (client != null) - return; - - // create connection using thrift - String[] locations = split.getLocations(); - Exception lastException = null; - for (String location : locations) - { - int port = ConfigHelper.getInputRpcPort(conf); - try - { - client = CqlPagingInputFormat.createAuthenticatedClient(location, port, conf); - break; - } - catch (Exception e) - { - lastException = e; - logger.warn("Failed to create authenticated client to {}:{}", location , port); - } - } - if (client == null && lastException != null) - throw lastException; - - // retrieve partition keys and cluster keys from system.schema_columnfamilies table - retrieveKeys(); - - client.set_keyspace(keyspace); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - rowIterator = new RowIterator(); - - logger.debug("created {}", rowIterator); - } - - public void close() - { - if (client != null) - { - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); - client = null; - } - } - - public Map<String, ByteBuffer> getCurrentKey() - { - return currentRow.left; - } - - public Map<String, ByteBuffer> getCurrentValue() - { - return currentRow.right; - } - - public float getProgress() - { - if (!rowIterator.hasNext()) - return 1.0F; - - // the progress is likely to be reported slightly off the actual but close enough - float progress = ((float) rowIterator.totalRead / totalRowCount); - return progress > 1.0F ? 1.0F : progress; - } - - public boolean nextKeyValue() throws IOException - { - if (!rowIterator.hasNext()) - { - logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount); - return false; - } - - try - { - currentRow = rowIterator.next(); - } - catch (Exception e) - { - // throw it as IOException, so client can catch it and handle it at client side - IOException ioe = new IOException(e.getMessage()); - ioe.initCause(ioe.getCause()); - throw ioe; - } - return true; - } - - // we don't use endpointsnitch since we are trying to support hadoop nodes that are - // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least. - private String[] getLocations() - { - Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses(); - - for (InetAddress address : localAddresses) - { - for (String location : split.getLocations()) - { - InetAddress locationAddress; - try - { - locationAddress = InetAddress.getByName(location); - } - catch (UnknownHostException e) - { - throw new AssertionError(e); - } - if (address.equals(locationAddress)) - { - return new String[] { location }; - } - } - } - return split.getLocations(); - } - - // Because the old Hadoop API wants us to write to the key and value - // and the new asks for them, we need to copy the output of the new API - // to the old. Thus, expect a small performance hit. - // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat - // and ColumnFamilyRecordReader don't support them, it should be fine for now. - public boolean next(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> value) throws IOException - { - if (nextKeyValue()) - { - value.clear(); - value.putAll(getCurrentValue()); - - keys.clear(); - keys.putAll(getCurrentKey()); - - return true; - } - return false; - } - - public long getPos() throws IOException - { - return (long) rowIterator.totalRead; - } - - public Map<String, ByteBuffer> createKey() - { - return new LinkedHashMap<String, ByteBuffer>(); - } - - public Map<String, ByteBuffer> createValue() - { - return new LinkedHashMap<String, ByteBuffer>(); - } - - /** CQL row iterator */ - private class RowIterator extends AbstractIterator<Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>>> - { - protected int totalRead = 0; // total number of cf rows read - protected Iterator<CqlRow> rows; - private int pageRows = 0; // the number of cql rows read of this page - private String previousRowKey = null; // previous CF row key - private String partitionKeyString; // keys in <key1>, <key2>, <key3> string format - private String partitionKeyMarkers; // question marks in ? , ? , ? format which matches the number of keys - - public RowIterator() - { - // initial page - executeQuery(); - } - - protected Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> computeNext() - { - if (rows == null) - return endOfData(); - - int index = -2; - //check there are more page to read - while (!rows.hasNext()) - { - // no more data - if (index == -1 || emptyPartitionKeyValues()) - { - logger.debug("no more data"); - return endOfData(); - } - - index = setTailNull(clusterColumns); - logger.debug("set tail to null, index: {}", index); - executeQuery(); - pageRows = 0; - - if (rows == null || !rows.hasNext() && index < 0) - { - logger.debug("no more data"); - return endOfData(); - } - } - - Map<String, ByteBuffer> valueColumns = createValue(); - Map<String, ByteBuffer> keyColumns = createKey(); - int i = 0; - CqlRow row = rows.next(); - for (Column column : row.columns) - { - String columnName = stringValue(ByteBuffer.wrap(column.getName())); - logger.debug("column: {}", columnName); - - if (i < partitionBoundColumns.size() + clusterColumns.size()) - keyColumns.put(stringValue(column.name), column.value); - else - valueColumns.put(stringValue(column.name), column.value); - - i++; - } - - // increase total CQL row read for this page - pageRows++; - - // increase total CF row read - if (newRow(keyColumns, previousRowKey)) - totalRead++; - - // read full page - if (pageRows >= pageRowSize || !rows.hasNext()) - { - Iterator<String> newKeys = keyColumns.keySet().iterator(); - for (BoundColumn column : partitionBoundColumns) - column.value = keyColumns.get(newKeys.next()); - - for (BoundColumn column : clusterColumns) - column.value = keyColumns.get(newKeys.next()); - - executeQuery(); - pageRows = 0; - } - - return Pair.create(keyColumns, valueColumns); - } - - /** check whether start to read a new CF row by comparing the partition keys */ - private boolean newRow(Map<String, ByteBuffer> keyColumns, String previousRowKey) - { - if (keyColumns.isEmpty()) - return false; - - String rowKey = ""; - if (keyColumns.size() == 1) - { - rowKey = partitionBoundColumns.get(0).validator.getString(keyColumns.get(partitionBoundColumns.get(0).name)); - } - else - { - Iterator<ByteBuffer> iter = keyColumns.values().iterator(); - for (BoundColumn column : partitionBoundColumns) - rowKey = rowKey + column.validator.getString(ByteBufferUtil.clone(iter.next())) + ":"; - } - - logger.debug("previous RowKey: {}, new row key: {}", previousRowKey, rowKey); - if (previousRowKey == null) - { - this.previousRowKey = rowKey; - return true; - } - - if (rowKey.equals(previousRowKey)) - return false; - - this.previousRowKey = rowKey; - return true; - } - - /** set the last non-null key value to null, and return the previous index */ - private int setTailNull(List<BoundColumn> values) - { - if (values.isEmpty()) - return -1; - - Iterator<BoundColumn> iterator = values.iterator(); - int previousIndex = -1; - BoundColumn current; - while (iterator.hasNext()) - { - current = iterator.next(); - if (current.value == null) - { - int index = previousIndex > 0 ? previousIndex : 0; - BoundColumn column = values.get(index); - logger.debug("set key {} value to null", column.name); - column.value = null; - return previousIndex - 1; - } - - previousIndex++; - } - - BoundColumn column = values.get(previousIndex); - logger.debug("set key {} value to null", column.name); - column.value = null; - return previousIndex - 1; - } - - /** serialize the prepared query, pair.left is query id, pair.right is query */ - private Pair<Integer, String> composeQuery(String columns) - { - Pair<Integer, String> clause = whereClause(); - if (columns == null) - { - columns = "*"; - } - else - { - // add keys in the front in order - String partitionKey = keyString(partitionBoundColumns); - String clusterKey = keyString(clusterColumns); - - columns = withoutKeyColumns(columns); - columns = (clusterKey == null || "".equals(clusterKey)) - ? partitionKey + (columns != null ? ("," + columns) : "") - : partitionKey + "," + clusterKey + (columns != null ? ("," + columns) : ""); - } - - String whereStr = userDefinedWhereClauses == null ? "" : " AND " + userDefinedWhereClauses; - return Pair.create(clause.left, - String.format("SELECT %s FROM %s%s%s LIMIT %d ALLOW FILTERING", - columns, quote(cfName), clause.right, whereStr, pageRowSize)); - } - - - /** remove key columns from the column string */ - private String withoutKeyColumns(String columnString) - { - Set<String> keyNames = new HashSet<String>(); - for (BoundColumn column : Iterables.concat(partitionBoundColumns, clusterColumns)) - keyNames.add(column.name); - - String[] columns = columnString.split(","); - String result = null; - for (String column : columns) - { - String trimmed = column.trim(); - if (keyNames.contains(trimmed)) - continue; - - String quoted = quote(trimmed); - result = result == null ? quoted : result + "," + quoted; - } - return result; - } - - /** serialize the where clause */ - private Pair<Integer, String> whereClause() - { - if (partitionKeyString == null) - partitionKeyString = keyString(partitionBoundColumns); - - if (partitionKeyMarkers == null) - partitionKeyMarkers = partitionKeyMarkers(); - // initial query token(k) >= start_token and token(k) <= end_token - if (emptyPartitionKeyValues()) - return Pair.create(0, String.format(" WHERE token(%s) > ? AND token(%s) <= ?", partitionKeyString, partitionKeyString)); - - // query token(k) > token(pre_partition_key) and token(k) <= end_token - if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null) - return Pair.create(1, - String.format(" WHERE token(%s) > token(%s) AND token(%s) <= ?", - partitionKeyString, partitionKeyMarkers, partitionKeyString)); - - // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n - Pair<Integer, String> clause = whereClause(clusterColumns, 0); - return Pair.create(clause.left, - String.format(" WHERE token(%s) = token(%s) %s", partitionKeyString, partitionKeyMarkers, clause.right)); - } - - /** recursively serialize the where clause */ - private Pair<Integer, String> whereClause(List<BoundColumn> column, int position) - { - if (position == column.size() - 1 || column.get(position + 1).value == null) - return Pair.create(position + 2, String.format(" AND %s %s ? ", quote(column.get(position).name), column.get(position).reversed ? " < " : " >")); - - Pair<Integer, String> clause = whereClause(column, position + 1); - return Pair.create(clause.left, String.format(" AND %s = ? %s", quote(column.get(position).name), clause.right)); - } - - /** check whether all key values are null */ - private boolean emptyPartitionKeyValues() - { - for (BoundColumn column : partitionBoundColumns) - { - if (column.value != null) - return false; - } - return true; - } - - /** serialize the partition key string in format of <key1>, <key2>, <key3> */ - private String keyString(List<BoundColumn> columns) - { - String result = null; - for (BoundColumn column : columns) - result = result == null ? quote(column.name) : result + "," + quote(column.name); - - return result == null ? "" : result; - } - - /** serialize the question marks for partition key string in format of ?, ? , ? */ - private String partitionKeyMarkers() - { - String result = null; - for (BoundColumn column : partitionBoundColumns) - result = result == null ? "?" : result + ",?"; - - return result; - } - - /** serialize the query binding variables, pair.left is query id, pair.right is the binding variables */ - private Pair<Integer, List<ByteBuffer>> preparedQueryBindValues() - { - List<ByteBuffer> values = new LinkedList<ByteBuffer>(); - - // initial query token(k) >= start_token and token(k) <= end_token - if (emptyPartitionKeyValues()) - { - values.add(partitioner.getTokenValidator().fromString(split.getStartToken())); - values.add(partitioner.getTokenValidator().fromString(split.getEndToken())); - return Pair.create(0, values); - } - else - { - for (BoundColumn partitionBoundColumn1 : partitionBoundColumns) - values.add(partitionBoundColumn1.value); - - if (clusterColumns.size() == 0 || clusterColumns.get(0).value == null) - { - // query token(k) > token(pre_partition_key) and token(k) <= end_token - values.add(partitioner.getTokenValidator().fromString(split.getEndToken())); - return Pair.create(1, values); - } - else - { - // query token(k) = token(pre_partition_key) and m = pre_cluster_key_m and n > pre_cluster_key_n - int type = preparedQueryBindValues(clusterColumns, 0, values); - return Pair.create(type, values); - } - } - } - - /** recursively serialize the query binding variables */ - private int preparedQueryBindValues(List<BoundColumn> column, int position, List<ByteBuffer> bindValues) - { - if (position == column.size() - 1 || column.get(position + 1).value == null) - { - bindValues.add(column.get(position).value); - return position + 2; - } - else - { - bindValues.add(column.get(position).value); - return preparedQueryBindValues(column, position + 1, bindValues); - } - } - - /** get the prepared query item Id */ - private int prepareQuery(int type) throws InvalidRequestException, TException - { - Integer itemId = preparedQueryIds.get(type); - if (itemId != null) - return itemId; - - Pair<Integer, String> query = null; - query = composeQuery(columns); - logger.debug("type: {}, query: {}", query.left, query.right); - CqlPreparedResult cqlPreparedResult = client.prepare_cql3_query(ByteBufferUtil.bytes(query.right), Compression.NONE); - preparedQueryIds.put(query.left, cqlPreparedResult.itemId); - return cqlPreparedResult.itemId; - } - - /** Quoting for working with uppercase */ - private String quote(String identifier) - { - return "\"" + identifier.replaceAll("\"", "\"\"") + "\""; - } - - /** execute the prepared query */ - private void executeQuery() - { - Pair<Integer, List<ByteBuffer>> bindValues = preparedQueryBindValues(); - logger.debug("query type: {}", bindValues.left); - - // check whether it reach end of range for type 1 query CASSANDRA-5573 - if (bindValues.left == 1 && reachEndRange()) - { - rows = null; - return; - } - - int retries = 0; - // only try three times for TimedOutException and UnavailableException - while (retries < 3) - { - try - { - CqlResult cqlResult = client.execute_prepared_cql3_query(prepareQuery(bindValues.left), bindValues.right, consistencyLevel); - if (cqlResult != null && cqlResult.rows != null) - rows = cqlResult.rows.iterator(); - return; - } - catch (TimedOutException e) - { - retries++; - if (retries >= 3) - { - rows = null; - RuntimeException rte = new RuntimeException(e.getMessage()); - rte.initCause(e); - throw rte; - } - } - catch (UnavailableException e) - { - retries++; - if (retries >= 3) - { - rows = null; - RuntimeException rte = new RuntimeException(e.getMessage()); - rte.initCause(e); - throw rte; - } - } - catch (Exception e) - { - rows = null; - RuntimeException rte = new RuntimeException(e.getMessage()); - rte.initCause(e); - throw rte; - } - } - } - } - - /** retrieve the partition keys and cluster keys from system.schema_columnfamilies table */ - private void retrieveKeys() throws Exception - { - String query = "select key_aliases," + - "column_aliases, " + - "key_validator, " + - "comparator " + - "from system.schema_columnfamilies " + - "where keyspace_name='%s' and columnfamily_name='%s'"; - String formatted = String.format(query, keyspace, cfName); - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE); - - CqlRow cqlRow = result.rows.get(0); - String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - logger.debug("partition keys: {}", keyString); - List<String> keys = FBUtilities.fromJsonList(keyString); - - for (String key : keys) - partitionBoundColumns.add(new BoundColumn(key)); - - keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); - logger.debug("cluster columns: {}", keyString); - keys = FBUtilities.fromJsonList(keyString); - - for (String key : keys) - clusterColumns.add(new BoundColumn(key)); - - Column rawKeyValidator = cqlRow.columns.get(2); - String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue())); - logger.debug("row key validator: {}", validator); - keyValidator = parseType(validator); - - if (keyValidator instanceof CompositeType) - { - List<AbstractType<?>> types = ((CompositeType) keyValidator).types; - for (int i = 0; i < partitionBoundColumns.size(); i++) - partitionBoundColumns.get(i).validator = types.get(i); - } - else - { - partitionBoundColumns.get(0).validator = keyValidator; - } - - Column rawComparator = cqlRow.columns.get(3); - String comparator = ByteBufferUtil.string(ByteBuffer.wrap(rawComparator.getValue())); - logger.debug("comparator: {}", comparator); - AbstractType comparatorValidator = parseType(comparator); - if (comparatorValidator instanceof CompositeType) - { - for (int i = 0; i < clusterColumns.size(); i++) - clusterColumns.get(i).reversed = (((CompositeType) comparatorValidator).types.get(i) instanceof ReversedType); - } - else if (comparatorValidator instanceof ReversedType) - { - clusterColumns.get(0).reversed = true; - } - } - - /** check whether current row is at the end of range */ - private boolean reachEndRange() - { - // current row key - ByteBuffer rowKey; - if (keyValidator instanceof CompositeType) - { - ByteBuffer[] keys = new ByteBuffer[partitionBoundColumns.size()]; - for (int i = 0; i < partitionBoundColumns.size(); i++) - keys[i] = partitionBoundColumns.get(i).value.duplicate(); - - rowKey = CompositeType.build(keys); - } - else - { - rowKey = partitionBoundColumns.get(0).value; - } - - String endToken = split.getEndToken(); - String currentToken = partitioner.getToken(rowKey).toString(); - logger.debug("End token: {}, current token: {}", endToken, currentToken); - - return endToken.equals(currentToken); - } - - private static AbstractType<?> parseType(String type) throws IOException - { - try - { - // always treat counters like longs, specifically CCT.serialize is not what we need - if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType")) - return LongType.instance; - return TypeParser.parse(type); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - catch (SyntaxException e) - { - throw new IOException(e); - } - } - - private static class BoundColumn - { - final String name; - ByteBuffer value; - AbstractType<?> validator; - boolean reversed = false; - - public BoundColumn(String name) - { - this.name = name; - } - } - - /** get string from a ByteBuffer, catch the exception and throw it as runtime exception*/ - private static String stringValue(ByteBuffer value) - { - try - { - return ByteBufferUtil.string(value); - } - catch (CharacterCodingException e) - { - throw new RuntimeException(e); - } - } -}