http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/Operation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Operation.java b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java new file mode 100644 index 0000000..1857c56 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.sasi.plan; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.ColumnDefinition.Kind; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer; +import org.apache.cassandra.index.sasi.disk.Token; +import org.apache.cassandra.index.sasi.plan.Expression.Op; +import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.*; +import org.apache.cassandra.utils.FBUtilities; + +public class Operation extends RangeIterator<Long, Token> +{ + public enum OperationType + { + AND, OR; + + public boolean apply(boolean a, boolean b) + { + switch (this) + { + case OR: + return a | b; + + case AND: + return a & b; + + default: + throw new AssertionError(); + } + } + } + + private final QueryController controller; + + protected final OperationType op; + protected final ListMultimap<ColumnDefinition, Expression> expressions; + protected final RangeIterator<Long, Token> range; + + protected Operation left, right; + + private Operation(OperationType operation, + QueryController controller, + ListMultimap<ColumnDefinition, Expression> expressions, + RangeIterator<Long, Token> range, + Operation left, Operation right) + { + super(range); + + this.op = operation; + this.controller = controller; + this.expressions = expressions; + this.range = range; + + this.left = left; + this.right = right; + } + + /** + * Recursive "satisfies" checks based on operation + 
* and data from the lower level members using depth-first search + * and bubbling the results back to the top level caller. + * + * Most of the work here is done by {@link #localSatisfiedBy(Unfiltered, boolean)} + * see it's comment for details, if there are no local expressions + * assigned to Operation it will call satisfiedBy(Row) on it's children. + * + * Query: first_name = X AND (last_name = Y OR address = XYZ AND street = IL AND city = C) OR (state = 'CA' AND country = 'US') + * Row: key1: (first_name: X, last_name: Z, address: XYZ, street: IL, city: C, state: NY, country:US) + * + * #1 OR + * / \ + * #2 (first_name) AND AND (state, country) + * \ + * #3 (last_name) OR + * \ + * #4 AND (address, street, city) + * + * + * Evaluation of the key1 is top-down depth-first search: + * + * --- going down --- + * Level #1 is evaluated, OR expression has to pull results from it's children which are at level #2 and OR them together, + * Level #2 AND (state, country) could be be evaluated right away, AND (first_name) refers to it's "right" child from level #3 + * Level #3 OR (last_name) requests results from level #4 + * Level #4 AND (address, street, city) does logical AND between it's 3 fields, returns result back to level #3. + * --- bubbling up --- + * Level #3 computes OR between AND (address, street, city) result and it's "last_name" expression + * Level #2 computes AND between "first_name" and result of level #3, AND (state, country) which is already computed + * Level #1 does OR between results of AND (first_name) and AND (state, country) and returns final result. + * + * @param row The row to check. + * @return true if give Row satisfied all of the expressions in the tree, + * false otherwise. 
+ */ + public boolean satisfiedBy(Unfiltered row, boolean allowMissingColumns) + { + boolean sideL, sideR; + + if (expressions == null || expressions.isEmpty()) + { + sideL = left != null && left.satisfiedBy(row, allowMissingColumns); + sideR = right != null && right.satisfiedBy(row, allowMissingColumns); + + // one of the expressions was skipped + // because it had no indexes attached + if (left == null) + return sideR; + } + else + { + sideL = localSatisfiedBy(row, allowMissingColumns); + + // if there is no right it means that this expression + // is last in the sequence, we can just return result from local expressions + if (right == null) + return sideL; + + sideR = right.satisfiedBy(row, allowMissingColumns); + } + + + return op.apply(sideL, sideR); + } + + /** + * Check every expression in the analyzed list to figure out if the + * columns in the give row match all of the based on the operation + * set to the current operation node. + * + * The algorithm is as follows: for every given expression from analyzed + * list get corresponding column from the Row: + * - apply {@link Expression#contains(ByteBuffer)} + * method to figure out if it's satisfied; + * - apply logical operation between boolean accumulator and current boolean result; + * - if result == false and node's operation is AND return right away; + * + * After all of the expressions have been evaluated return resulting accumulator variable. 
+ * + * Example: + * + * Operation = (op: AND, columns: [first_name = p, 5 < age < 7, last_name: y]) + * Row = (first_name: pavel, last_name: y, age: 6, timestamp: 15) + * + * #1 get "first_name" = p (expressions) + * - row-get "first_name" => "pavel" + * - compare "pavel" against "p" => true (current) + * - set accumulator current => true (because this is expression #1) + * + * #2 get "last_name" = y (expressions) + * - row-get "last_name" => "y" + * - compare "y" against "y" => true (current) + * - set accumulator to accumulator & current => true + * + * #3 get 5 < "age" < 7 (expressions) + * - row-get "age" => "6" + * - compare 5 < 6 < 7 => true (current) + * - set accumulator to accumulator & current => true + * + * #4 return accumulator => true (row satisfied all of the conditions) + * + * @param row The row to check. + * @return true if give Row satisfied all of the analyzed expressions, + * false otherwise. + */ + private boolean localSatisfiedBy(Unfiltered row, boolean allowMissingColumns) + { + if (row == null || !row.isRow()) + return false; + + final int now = FBUtilities.nowInSeconds(); + boolean result = false; + int idx = 0; + + for (ColumnDefinition column : expressions.keySet()) + { + if (column.kind == Kind.PARTITION_KEY) + continue; + + ByteBuffer value = ColumnIndex.getValueOf(column, (Row) row, now); + boolean isMissingColumn = value == null; + + if (!allowMissingColumns && isMissingColumn) + throw new IllegalStateException("All indexed columns should be included into the column slice, missing: " + column); + + boolean isMatch = false; + // If there is a column with multiple expressions that effectively means an OR + // e.g. comment = 'x y z' could be split into 'comment' EQ 'x', 'comment' EQ 'y', 'comment' EQ 'z' + // by analyzer, in situation like that we only need to check if at least one of expressions matches, + // and there is no hit on the NOT_EQ (if any) which are always at the end of the filter list. 
+ // Loop always starts from the end of the list, which makes it possible to break after the last + // NOT_EQ condition on first EQ/RANGE condition satisfied, instead of checking every + // single expression in the column filter list. + List<Expression> filters = expressions.get(column); + for (int i = filters.size() - 1; i >= 0; i--) + { + Expression expression = filters.get(i); + isMatch = !isMissingColumn && expression.contains(value); + if (expression.getOp() == Op.NOT_EQ) + { + // since this is NOT_EQ operation we have to + // inverse match flag (to check against other expressions), + // and break in case of negative inverse because that means + // that it's a positive hit on the not-eq clause. + isMatch = !isMatch; + if (!isMatch) + break; + } // if it was a match on EQ/RANGE or column is missing + else if (isMatch || isMissingColumn) + break; + } + + if (idx++ == 0) + { + result = isMatch; + continue; + } + + result = op.apply(result, isMatch); + + // exit early because we already got a single false + if (op == OperationType.AND && !result) + return false; + } + + return idx == 0 || result; + } + + @VisibleForTesting + protected static ListMultimap<ColumnDefinition, Expression> analyzeGroup(QueryController controller, + OperationType op, + List<RowFilter.Expression> expressions) + { + ListMultimap<ColumnDefinition, Expression> analyzed = ArrayListMultimap.create(); + + // sort all of the expressions in the operation by name and priority of the logical operator + // this gives us an efficient way to handle inequality and combining into ranges without extra processing + // and converting expressions from one type to another. + Collections.sort(expressions, (a, b) -> { + int cmp = a.column().compareTo(b.column()); + return cmp == 0 ? 
-Integer.compare(getPriority(a.operator()), getPriority(b.operator())) : cmp; + }); + + for (final RowFilter.Expression e : expressions) + { + ColumnIndex columnIndex = controller.getIndex(e); + List<Expression> perColumn = analyzed.get(e.column()); + + if (columnIndex == null) + columnIndex = new ColumnIndex(controller.getKeyValidator(), e.column(), null); + + AbstractAnalyzer analyzer = columnIndex.getAnalyzer(); + analyzer.reset(e.getIndexValue()); + + // EQ/NOT_EQ can have multiple expressions e.g. text = "Hello World", + // becomes text = "Hello" OR text = "World" because "space" is always interpreted as a split point (by analyzer), + // NOT_EQ is made an independent expression only in case of pre-existing multiple EQ expressions, or + // if there is no EQ operations and NOT_EQ is met or a single NOT_EQ expression present, + // in such case we know exactly that there would be no more EQ/RANGE expressions for given column + // since NOT_EQ has the lowest priority. + if (e.operator() == Operator.EQ + || (e.operator() == Operator.NEQ + && (perColumn.size() == 0 || perColumn.size() > 1 + || (perColumn.size() == 1 && perColumn.get(0).getOp() == Op.NOT_EQ)))) + { + while (analyzer.hasNext()) + { + final ByteBuffer token = analyzer.next(); + perColumn.add(new Expression(controller, columnIndex).add(e.operator(), token)); + } + } + else + // "range" or not-equals operator, combines both bounds together into the single expression, + // iff operation of the group is AND, otherwise we are forced to create separate expressions, + // not-equals is combined with the range iff operator is AND. 
+ { + Expression range; + if (perColumn.size() == 0 || op != OperationType.AND) + perColumn.add((range = new Expression(controller, columnIndex))); + else + range = Iterables.getLast(perColumn); + + while (analyzer.hasNext()) + range.add(e.operator(), analyzer.next()); + } + } + + return analyzed; + } + + private static int getPriority(Operator op) + { + switch (op) + { + case EQ: + return 4; + + case GTE: + case GT: + return 3; + + case LTE: + case LT: + return 2; + + case NEQ: + return 1; + + default: + return 0; + } + } + + protected Token computeNext() + { + return range != null && range.hasNext() ? range.next() : endOfData(); + } + + protected void performSkipTo(Long nextToken) + { + if (range != null) + range.skipTo(nextToken); + } + + public void close() throws IOException + { + controller.releaseIndexes(this); + } + + public static class Builder + { + private final QueryController controller; + + protected final OperationType op; + protected final List<RowFilter.Expression> expressions; + + protected Builder left, right; + + public Builder(OperationType operation, QueryController controller, RowFilter.Expression... 
columns) + { + this.op = operation; + this.controller = controller; + this.expressions = new ArrayList<>(); + Collections.addAll(expressions, columns); + } + + public Builder setRight(Builder operation) + { + this.right = operation; + return this; + } + + public Builder setLeft(Builder operation) + { + this.left = operation; + return this; + } + + public void add(RowFilter.Expression e) + { + expressions.add(e); + } + + public void add(Collection<RowFilter.Expression> newExpressions) + { + if (expressions != null) + expressions.addAll(newExpressions); + } + + public Operation complete() + { + if (!expressions.isEmpty()) + { + ListMultimap<ColumnDefinition, Expression> analyzedExpressions = analyzeGroup(controller, op, expressions); + RangeIterator.Builder<Long, Token> range = controller.getIndexes(op, analyzedExpressions.values()); + + Operation rightOp = null; + if (right != null) + { + rightOp = right.complete(); + range.add(rightOp); + } + + return new Operation(op, controller, analyzedExpressions, range.build(), null, rightOp); + } + else + { + Operation leftOp = null, rightOp = null; + boolean leftIndexes = false, rightIndexes = false; + + if (left != null) + { + leftOp = left.complete(); + leftIndexes = leftOp != null && leftOp.range != null; + } + + if (right != null) + { + rightOp = right.complete(); + rightIndexes = rightOp != null && rightOp.range != null; + } + + RangeIterator<Long, Token> join; + /** + * Operation should allow one of it's sub-trees to wrap no indexes, that is related to the fact that we + * have to accept defined-but-not-indexed columns as well as key range as IndexExpressions. 
+ * + * Two cases are possible: + * + * only left child produced indexed iterators, that could happen when there are two columns + * or key range on the right: + * + * AND + * / \ + * OR \ + * / \ AND + * a b / \ + * key key + * + * only right child produced indexed iterators: + * + * AND + * / \ + * AND a + * / \ + * key key + */ + if (leftIndexes && !rightIndexes) + join = leftOp; + else if (!leftIndexes && rightIndexes) + join = rightOp; + else if (leftIndexes) + { + RangeIterator.Builder<Long, Token> builder = op == OperationType.OR + ? RangeUnionIterator.<Long, Token>builder() + : RangeIntersectionIterator.<Long, Token>builder(); + + join = builder.add(leftOp).add(rightOp).build(); + } + else + throw new AssertionError("both sub-trees have 0 indexes."); + + return new Operation(op, controller, null, join, leftOp, rightOp); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java new file mode 100644 index 0000000..8e10fd0 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.sasi.plan; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.sasi.SASIIndex; +import org.apache.cassandra.index.sasi.SSTableIndex; +import org.apache.cassandra.index.sasi.TermIterator; +import org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.conf.view.View; +import org.apache.cassandra.index.sasi.disk.Token; +import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException; +import org.apache.cassandra.index.sasi.plan.Operation.OperationType; +import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Pair; + +public class QueryController +{ + private final long executionQuota; + private final long executionStart; + + private final ColumnFamilyStore cfs; + private final PartitionRangeReadCommand command; + private final Map<Collection<Expression>, List<RangeIterator<Long, Token>>> resources = new HashMap<>(); + private final RefViewFragment scope; + private final Set<SSTableReader> sstables; + + public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand command, long timeQuotaMs) + { + this.cfs = cfs; + this.command = command; + this.executionQuota = 
TimeUnit.MILLISECONDS.toNanos(timeQuotaMs); + this.executionStart = System.nanoTime(); + this.scope = getSSTableScope(cfs, command); + this.sstables = new HashSet<>(scope.sstables); + } + + public boolean isForThrift() + { + return command.isForThrift(); + } + + public CFMetaData metadata() + { + return command.metadata(); + } + + public Collection<RowFilter.Expression> getExpressions() + { + return command.rowFilter().getExpressions(); + } + + public DataRange dataRange() + { + return command.dataRange(); + } + + public AbstractType<?> getKeyValidator() + { + return cfs.metadata.getKeyValidator(); + } + + public ColumnIndex getIndex(RowFilter.Expression expression) + { + Optional<Index> index = cfs.indexManager.getBestIndexFor(expression); + return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null; + } + + + public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController) + { + if (key == null) + throw new NullPointerException(); + try + { + SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(), + cfs.metadata, + command.nowInSec(), + command.columnFilter(), + command.rowFilter().withoutExpressions(), + DataLimits.NONE, + key, + command.clusteringIndexFilter(key)); + + return partition.queryMemtableAndDisk(cfs, executionController.baseReadOpOrderGroup()); + } + finally + { + checkpoint(); + } + } + + /** + * Build a range iterator from the given list of expressions by applying given operation (OR/AND). + * Building of such iterator involves index search, results of which are persisted in the internal resources list + * and can be released later via {@link QueryController#releaseIndexes(Operation)}. + * + * @param op The operation type to coalesce expressions with. + * @param expressions The expressions to build range iterator from (expressions with not results are ignored). + * + * @return The range builder based on given expressions and operation type. 
+ */ + public RangeIterator.Builder<Long, Token> getIndexes(OperationType op, Collection<Expression> expressions) + { + if (resources.containsKey(expressions)) + throw new IllegalArgumentException("Can't process the same expressions multiple times."); + + RangeIterator.Builder<Long, Token> builder = op == OperationType.OR + ? RangeUnionIterator.<Long, Token>builder() + : RangeIntersectionIterator.<Long, Token>builder(); + + List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>(); + + for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet()) + { + RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue()); + + if (index == null) + continue; + + builder.add(index); + perIndexUnions.add(index); + } + + resources.put(expressions, perIndexUnions); + return builder; + } + + public void checkpoint() + { + if ((System.nanoTime() - executionStart) >= executionQuota) + throw new TimeQuotaExceededException(); + } + + public void releaseIndexes(Operation operation) + { + if (operation.expressions != null) + releaseIndexes(resources.remove(operation.expressions.values())); + } + + private void releaseIndexes(List<RangeIterator<Long, Token>> indexes) + { + if (indexes == null) + return; + + indexes.forEach(FileUtils::closeQuietly); + } + + public void finish() + { + try + { + resources.values().forEach(this::releaseIndexes); + } + finally + { + scope.release(); + } + } + + private Map<Expression, Set<SSTableIndex>> getView(OperationType op, Collection<Expression> expressions) + { + // first let's determine the primary expression if op is AND + Pair<Expression, Set<SSTableIndex>> primary = (op == OperationType.AND) ? calculatePrimary(expressions) : null; + + Map<Expression, Set<SSTableIndex>> indexes = new HashMap<>(); + for (Expression e : expressions) + { + // NO_EQ and non-index column query should only act as FILTER BY for satisfiedBy(Row) method + // because otherwise it likely to go through the whole index. 
+ if (!e.isIndexed() || e.getOp() == Expression.Op.NOT_EQ) + continue; + + // primary expression, we'll have to add as is + if (primary != null && e.equals(primary.left)) + { + indexes.put(primary.left, primary.right); + continue; + } + + View view = e.index.getView(); + if (view == null) + continue; + + Set<SSTableIndex> readers = new HashSet<>(); + if (primary != null && primary.right.size() > 0) + { + for (SSTableIndex index : primary.right) + readers.addAll(view.match(index.minKey(), index.maxKey())); + } + else + { + readers.addAll(view.match(sstables, e)); + } + + indexes.put(e, readers); + } + + return indexes; + } + + private Pair<Expression, Set<SSTableIndex>> calculatePrimary(Collection<Expression> expressions) + { + Expression expression = null; + Set<SSTableIndex> primaryIndexes = Collections.emptySet(); + + for (Expression e : expressions) + { + if (!e.isIndexed()) + continue; + + View view = e.index.getView(); + if (view == null) + continue; + + Set<SSTableIndex> indexes = view.match(sstables, e); + if (primaryIndexes.size() > indexes.size()) + { + primaryIndexes = indexes; + expression = e; + } + } + + return expression == null ? 
null : Pair.create(expression, primaryIndexes); + } + + private static RefViewFragment getSSTableScope(ColumnFamilyStore cfs, PartitionRangeReadCommand command) + { + return cfs.selectAndReference(org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, command.dataRange().keyRange())); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java new file mode 100644 index 0000000..d34b05a --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.sasi.plan; + +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.index.sasi.disk.Token; +import org.apache.cassandra.index.sasi.plan.Operation.OperationType; +import org.apache.cassandra.exceptions.RequestTimeoutException; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.AbstractIterator; + +public class QueryPlan +{ + private final QueryController controller; + + public QueryPlan(ColumnFamilyStore cfs, ReadCommand command, long executionQuotaMs) + { + this.controller = new QueryController(cfs, (PartitionRangeReadCommand) command, executionQuotaMs); + } + + /** + * Converts expressions into operation tree (which is currently just a single AND). + * + * Operation tree allows us to do a couple of important optimizations + * namely, group flattening for AND operations (query rewrite), expression bounds checks, + * "satisfies by" checks for resulting rows with an early exit. + * + * @return root of the operations tree. 
+ */ + private Operation analyze() + { + try + { + Operation.Builder and = new Operation.Builder(OperationType.AND, controller); + controller.getExpressions().forEach(and::add); + return and.complete(); + } + catch (Exception | Error e) + { + controller.finish(); + throw e; + } + } + + public UnfilteredPartitionIterator execute(ReadExecutionController executionController) throws RequestTimeoutException + { + return new ResultIterator(analyze(), controller, executionController); + } + + private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator + { + private final AbstractBounds<PartitionPosition> keyRange; + private final Operation operationTree; + private final QueryController controller; + private final ReadExecutionController executionController; + + private Iterator<DecoratedKey> currentKeys = null; + + public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController) + { + this.keyRange = controller.dataRange().keyRange(); + this.operationTree = operationTree; + this.controller = controller; + this.executionController = executionController; + if (operationTree != null) + operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue()); + } + + protected UnfilteredRowIterator computeNext() + { + if (operationTree == null) + return endOfData(); + + for (;;) + { + if (currentKeys == null || !currentKeys.hasNext()) + { + if (!operationTree.hasNext()) + return endOfData(); + + Token token = operationTree.next(); + currentKeys = token.iterator(); + } + + while (currentKeys.hasNext()) + { + DecoratedKey key = currentKeys.next(); + + if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0) + return endOfData(); + + try (UnfilteredRowIterator partition = controller.getPartition(key, executionController)) + { + List<Unfiltered> clusters = new ArrayList<>(); + while (partition.hasNext()) + { + Unfiltered row = partition.next(); + if 
(operationTree.satisfiedBy(row, true)) + clusters.add(row); + } + + if (!clusters.isEmpty()) + return new PartitionIterator(partition, clusters); + } + } + } + } + + private static class PartitionIterator extends AbstractUnfilteredRowIterator + { + private final Iterator<Unfiltered> rows; + + public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content) + { + super(partition.metadata(), + partition.partitionKey(), + partition.partitionLevelDeletion(), + partition.columns(), + partition.staticRow(), + partition.isReverseOrder(), + partition.stats()); + + rows = content.iterator(); + } + + @Override + protected Unfiltered computeNext() + { + return rows.hasNext() ? rows.next() : endOfData(); + } + } + + public boolean isForThrift() + { + return controller.isForThrift(); + } + + public CFMetaData metadata() + { + return controller.metadata(); + } + + public void close() + { + FileUtils.closeQuietly(operationTree); + controller.finish(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java b/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java new file mode 100644 index 0000000..c7bbab7 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.sa; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder; +import org.apache.cassandra.db.marshal.AbstractType; + +public class ByteTerm extends Term<ByteBuffer> +{ + public ByteTerm(int position, ByteBuffer value, TokenTreeBuilder tokens) + { + super(position, value, tokens); + } + + public ByteBuffer getTerm() + { + return value.duplicate(); + } + + public ByteBuffer getSuffix(int start) + { + return (ByteBuffer) value.duplicate().position(value.position() + start); + } + + public int compareTo(AbstractType<?> comparator, Term other) + { + return comparator.compare(value, (ByteBuffer) other.value); + } + + public int length() + { + return value.remaining(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java b/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java new file mode 100644 index 0000000..533b566 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.cassandra.index.sasi.sa;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;

import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.db.marshal.AbstractType;

import com.google.common.base.Charsets;

/**
 * A suffix-array term decoded into a {@link CharBuffer}; used by
 * {@link SuffixSA}, where suffixes are character offsets into the text.
 */
public class CharTerm extends Term<CharBuffer>
{
    public CharTerm(int position, CharBuffer value, TokenTreeBuilder tokens)
    {
        super(position, value, tokens);
    }

    /** @return the whole term re-encoded as UTF-8 bytes */
    public ByteBuffer getTerm()
    {
        CharBuffer copy = value.duplicate();
        return Charsets.UTF_8.encode(copy);
    }

    /** @return the suffix starting {@code start} chars past the current position, re-encoded as UTF-8 */
    public ByteBuffer getSuffix(int start)
    {
        CharBuffer suffix = value.subSequence(value.position() + start, value.remaining());
        return Charsets.UTF_8.encode(suffix);
    }

    /**
     * Compares char-by-char via {@link CharBuffer#compareTo}; the column
     * comparator argument is intentionally ignored for text terms.
     */
    public int compareTo(AbstractType<?> comparator, Term other)
    {
        return value.compareTo((CharBuffer) other.value);
    }

    /** @return number of characters in the term */
    public int length()
    {
        return value.length();
    }
}
package org.apache.cassandra.index.sasi.sa;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;

import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Pair;

/**
 * "Suffix array" for integral (non-text) types: every term is indexed whole,
 * so finishing the index only requires sorting the complete terms by the
 * column comparator and streaming them back in order.
 */
public class IntegralSA extends SA<ByteBuffer>
{
    public IntegralSA(AbstractType<?> comparator, OnDiskIndexBuilder.Mode mode)
    {
        super(comparator, mode);
    }

    public Term<ByteBuffer> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens)
    {
        return new ByteTerm(charCount, termValue, tokens);
    }

    public TermIterator finish()
    {
        return new IntegralSuffixIterator();
    }

    private class IntegralSuffixIterator extends TermIterator
    {
        private final Iterator<Term<ByteBuffer>> termIterator;

        public IntegralSuffixIterator()
        {
            // whole terms only, so a single sort by the column comparator is
            // sufficient (was an anonymous Comparator + Collections.sort;
            // the file already uses Java 8 lambdas, e.g. in SuffixSA)
            terms.sort((a, b) -> a.compareTo(comparator, b));
            termIterator = terms.iterator();
        }

        /** @return smallest term; assumes at least one term was added before finish() */
        public ByteBuffer minTerm()
        {
            return terms.get(0).getTerm();
        }

        /** @return largest term; assumes at least one term was added before finish() */
        public ByteBuffer maxTerm()
        {
            return terms.get(terms.size() - 1).getTerm();
        }

        protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
        {
            if (!termIterator.hasNext())
                return endOfData();

            Term<ByteBuffer> term = termIterator.next();
            return Pair.create(term.getTerm(), term.getTokens().finish());
        }
    }
}
package org.apache.cassandra.index.sasi.sa;

import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.db.marshal.AbstractType;

/**
 * Base class for suffix-array builders: collects terms (with their token
 * postings) and tracks the running character/byte count, which subclasses use
 * as each new term's global position.
 *
 * @param <T> buffer type a term is materialized into (bytes or chars)
 */
public abstract class SA<T extends Buffer>
{
    protected final AbstractType<?> comparator;
    protected final Mode mode;

    // terms in insertion order; sorted only when finish() builds the iterator
    protected final List<Term<T>> terms = new ArrayList<>();
    // running length of all added terms; the next term's starting position
    protected int charCount = 0;

    public SA(AbstractType<?> comparator, Mode mode)
    {
        this.comparator = comparator;
        this.mode = mode;
    }

    public Mode getMode()
    {
        return mode;
    }

    /** Registers one term value together with its token postings builder. */
    public void add(ByteBuffer termValue, TokenTreeBuilder tokens)
    {
        Term<T> next = getTerm(termValue, tokens);
        terms.add(next);
        charCount += next.length();
    }

    /** Sorts the accumulated terms and returns an iterator over (term, postings) pairs. */
    public abstract TermIterator finish();

    /** Wraps a raw value into the subclass-specific {@link Term} representation. */
    protected abstract Term<T> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens);
}
package org.apache.cassandra.index.sasi.sa;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;

import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Pair;

import com.google.common.base.Charsets;
import net.mintern.primitive.Primitive;

// Suffix array over UTF-8 text terms: every character position of every added
// term becomes an indexable suffix.
public class SuffixSA extends SA<CharBuffer>
{
    public SuffixSA(AbstractType<?> comparator, OnDiskIndexBuilder.Mode mode)
    {
        super(comparator, mode);
    }

    protected Term<CharBuffer> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens)
    {
        // charCount (running total of chars added so far) becomes this term's
        // global starting position in the concatenated character space
        return new CharTerm(charCount, Charsets.UTF_8.decode(termValue.duplicate()), tokens);
    }

    public TermIterator finish()
    {
        return new SASuffixIterator();
    }

    private class SASuffixIterator extends TermIterator
    {
        // each element packs (term index << 32) | global char position
        private final long[] suffixes;

        private int current = 0;
        // suffix whose equal-run is currently being merged; null when no run is open
        private ByteBuffer lastProcessedSuffix;
        // accumulates token postings for all equal suffixes of the current run
        private TokenTreeBuilder container;

        public SASuffixIterator()
        {
            // each element has term index and char position encoded as two 32-bit integers
            // to avoid binary search per suffix while sorting suffix array.
            suffixes = new long[charCount];

            long termIndex = -1, currentTermLength = -1;
            for (int i = 0; i < charCount; i++)
            {
                // crossed into the next term: advance termIndex and remember
                // the global position at which that term ends
                if (i >= currentTermLength || currentTermLength == -1)
                {
                    Term currentTerm = terms.get((int) ++termIndex);
                    currentTermLength = currentTerm.getPosition() + currentTerm.length();
                }

                suffixes[i] = (termIndex << 32) | i;
            }

            // sort all suffixes via the column comparator; the low 32 bits hold
            // a GLOBAL position, so it is converted back to a term-relative
            // offset (minus getPosition()) before slicing the suffix out
            Primitive.sort(suffixes, (a, b) -> {
                Term aTerm = terms.get((int) (a >>> 32));
                Term bTerm = terms.get((int) (b >>> 32));
                return comparator.compare(aTerm.getSuffix(((int) a) - aTerm.getPosition()),
                                          bTerm.getSuffix(((int) b) - bTerm.getPosition()));
            });
        }

        // decode one packed entry back into (suffix bytes, postings builder)
        private Pair<ByteBuffer, TokenTreeBuilder> suffixAt(int position)
        {
            long index = suffixes[position];
            Term term = terms.get((int) (index >>> 32));
            return Pair.create(term.getSuffix(((int) index) - term.getPosition()), term.getTokens());
        }

        public ByteBuffer minTerm()
        {
            return suffixAt(0).left;
        }

        public ByteBuffer maxTerm()
        {
            return suffixAt(suffixes.length - 1).left;
        }

        // Merges consecutive runs of equal suffixes (the array is sorted) into
        // a single (suffix, combined postings) pair per distinct suffix.
        protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
        {
            while (true)
            {
                if (current >= suffixes.length)
                {
                    if (lastProcessedSuffix == null)
                        return endOfData();

                    // flush the final in-progress run
                    Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();

                    lastProcessedSuffix = null;
                    return result;
                }

                Pair<ByteBuffer, TokenTreeBuilder> suffix = suffixAt(current++);

                if (lastProcessedSuffix == null)
                {
                    // open a fresh run
                    lastProcessedSuffix = suffix.left;
                    container = new TokenTreeBuilder(suffix.right.getTokens());
                }
                else if (comparator.compare(lastProcessedSuffix, suffix.left) == 0)
                {
                    // same suffix as before: keep accumulating its postings
                    lastProcessedSuffix = suffix.left;
                    container.add(suffix.right.getTokens());
                }
                else
                {
                    // suffix changed: emit the finished run, then start a new one
                    Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();

                    lastProcessedSuffix = suffix.left;
                    container = new TokenTreeBuilder(suffix.right.getTokens());

                    return result;
                }
            }
        }

        private Pair<ByteBuffer, TokenTreeBuilder> finishSuffix()
        {
            return Pair.create(lastProcessedSuffix, container.finish());
        }
    }
}
package org.apache.cassandra.index.sasi.sa;

import java.nio.Buffer;
import java.nio.ByteBuffer;

import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.db.marshal.AbstractType;

/**
 * One term held by a suffix-array builder: the value buffer, its global
 * starting position in the concatenated term space, and the token postings
 * associated with it.
 *
 * @param <T> buffer type holding the value (bytes or chars)
 */
public abstract class Term<T extends Buffer>
{
    // global start offset of this term within the whole array
    protected final int position;
    protected final T value;
    protected TokenTreeBuilder tokens;

    public Term(int position, T value, TokenTreeBuilder tokens)
    {
        this.position = position;
        this.value = value;
        this.tokens = tokens;
    }

    public int getPosition()
    {
        return position;
    }

    /** @return the complete term as bytes */
    public abstract ByteBuffer getTerm();

    /** @return the suffix beginning {@code start} units into the term, as bytes */
    public abstract ByteBuffer getSuffix(int start);

    public TokenTreeBuilder getTokens()
    {
        return tokens;
    }

    /** Orders this term against {@code other}, optionally using the column comparator. */
    public abstract int compareTo(AbstractType<?> comparator, Term other);

    /** @return the term's length in its native units (bytes or chars) */
    public abstract int length();
}
package org.apache.cassandra.index.sasi.sa;

import java.nio.ByteBuffer;

import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.utils.Pair;

import com.google.common.collect.AbstractIterator;

/**
 * Streams (term, postings-builder) pairs out of a finished suffix array in
 * sorted order, and exposes the smallest/largest term for index metadata.
 */
public abstract class TermIterator extends AbstractIterator<Pair<ByteBuffer, TokenTreeBuilder>>
{
    /** @return the smallest term this iterator will produce */
    public abstract ByteBuffer minTerm();

    /** @return the largest term this iterator will produce */
    public abstract ByteBuffer maxTerm();
}
package org.apache.cassandra.index.sasi.utils;

import java.util.NoSuchElementException;

import com.google.common.collect.PeekingIterator;

import static com.google.common.base.Preconditions.checkState;

// This is fork of the Guava AbstractIterator, the only difference
// is that state & next variables are now protected, this was required
// for SkippableIterator.skipTo(..) to void all previous state.
public abstract class AbstractIterator<T> implements PeekingIterator<T>
{
    protected State state = State.NOT_READY;

    /** Constructor for use by subclasses. */
    protected AbstractIterator() {}

    protected enum State
    {
        /** We have computed the next element and haven't returned it yet. */
        READY,

        /** We haven't yet computed or have already returned the element. */
        NOT_READY,

        /** We have reached the end of the data and are finished. */
        DONE,

        /** We've suffered an exception and are kaput. */
        FAILED,
    }

    // the element most recently produced by computeNext(); valid while READY
    protected T next;

    /**
     * Returns the next element. <b>Note:</b> the implementation must call {@link
     * #endOfData()} when there are no elements left in the iteration. Failure to
     * do so could result in an infinite loop.
     *
     * <p>The initial invocation of {@link #hasNext()} or {@link #next()} calls
     * this method, as does the first invocation of {@code hasNext} or {@code
     * next} following each successful call to {@code next}. Once the
     * implementation either invokes {@code endOfData} or throws an exception,
     * {@code computeNext} is guaranteed to never be called again.
     *
     * <p>If this method throws an exception, it will propagate outward to the
     * {@code hasNext} or {@code next} invocation that invoked this method. Any
     * further attempts to use the iterator will result in an {@link
     * IllegalStateException}.
     *
     * <p>The implementation of this method may not invoke the {@code hasNext},
     * {@code next}, or {@link #peek()} methods on this instance; if it does, an
     * {@code IllegalStateException} will result.
     *
     * @return the next element if there was one. If {@code endOfData} was called
     *     during execution, the return value will be ignored.
     * @throws RuntimeException if any unrecoverable error happens. This exception
     *     will propagate outward to the {@code hasNext()}, {@code next()}, or
     *     {@code peek()} invocation that invoked this method. Any further
     *     attempts to use the iterator will result in an
     *     {@link IllegalStateException}.
     */
    protected abstract T computeNext();

    /**
     * Implementations of {@link #computeNext} <b>must</b> invoke this method when
     * there are no elements left in the iteration.
     *
     * @return {@code null}; a convenience so your {@code computeNext}
     *     implementation can use the simple statement {@code return endOfData();}
     */
    protected final T endOfData()
    {
        state = State.DONE;
        return null;
    }

    public final boolean hasNext()
    {
        checkState(state != State.FAILED);

        switch (state)
        {
            case DONE:
                return false;

            case READY:
                return true;

            default:
                // NOT_READY: fall through and attempt to compute the next element
        }

        return tryToComputeNext();
    }

    protected boolean tryToComputeNext()
    {
        state = State.FAILED; // temporary pessimism
        next = computeNext();

        // computeNext() sets DONE (via endOfData) when exhausted; anything else
        // means `next` holds a valid element
        if (state != State.DONE)
        {
            state = State.READY;
            return true;
        }

        return false;
    }

    public final T next()
    {
        if (!hasNext())
            throw new NoSuchElementException();

        state = State.NOT_READY;
        return next;
    }

    public void remove()
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the next element in the iteration without advancing the iteration,
     * according to the contract of {@link PeekingIterator#peek()}.
     *
     * <p>Implementations of {@code AbstractIterator} that wish to expose this
     * functionality should implement {@code PeekingIterator}.
     */
    public final T peek()
    {
        if (!hasNext())
            throw new NoSuchElementException();

        return next;
    }
}
package org.apache.cassandra.index.sasi.utils;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.disk.TokenTree;
import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.db.marshal.AbstractType;

import com.carrotsearch.hppc.LongOpenHashSet;
import com.carrotsearch.hppc.LongSet;
import com.carrotsearch.hppc.cursors.LongCursor;

/**
 * Accumulates the (token -> offsets) postings of a single term as it is seen
 * across multiple on-disk indexes, so a merged posting list can be written
 * per distinct term.
 */
public class CombinedTerm implements CombinedValue<DataTerm>
{
    private final AbstractType<?> comparator;
    private final DataTerm term;
    private final TreeMap<Long, LongSet> tokens;

    public CombinedTerm(AbstractType<?> comparator, DataTerm term)
    {
        this.comparator = comparator;
        this.term = term;
        this.tokens = new TreeMap<>();

        // renamed from `tokens`, which shadowed the field of the same name
        RangeIterator<Long, Token> tokenIterator = term.getTokens();
        while (tokenIterator.hasNext())
        {
            Token current = tokenIterator.next();
            LongSet offsets = offsetsFor(current.get());

            for (Long offset : ((TokenTree.OnDiskToken) current).getOffsets())
                offsets.add(offset);
        }
    }

    public ByteBuffer getTerm()
    {
        return term.getTerm();
    }

    public Map<Long, LongSet> getTokens()
    {
        return tokens;
    }

    public TokenTreeBuilder getTokenTreeBuilder()
    {
        return new TokenTreeBuilder(tokens).finish();
    }

    /** Folds another index's postings for the same term into this one. */
    public void merge(CombinedValue<DataTerm> other)
    {
        if (!(other instanceof CombinedTerm))
            return;

        CombinedTerm o = (CombinedTerm) other;

        assert comparator == o.comparator;

        for (Map.Entry<Long, LongSet> token : o.tokens.entrySet())
        {
            LongSet offsets = offsetsFor(token.getKey());

            for (LongCursor offset : token.getValue())
                offsets.add(offset.value);
        }
    }

    // get-or-create the offset set for a token; this logic was previously
    // duplicated verbatim in both the constructor and merge()
    private LongSet offsetsFor(Long token)
    {
        LongSet offsets = tokens.get(token);
        if (offsets == null)
            tokens.put(token, (offsets = new LongOpenHashSet()));
        return offsets;
    }

    public DataTerm get()
    {
        return term;
    }

    public int compareTo(CombinedValue<DataTerm> o)
    {
        return term.compareTo(comparator, o.get().getTerm());
    }
}
package org.apache.cassandra.index.sasi.utils;

import java.nio.ByteBuffer;

import org.apache.cassandra.index.sasi.disk.Descriptor;
import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
import org.apache.cassandra.index.sasi.sa.TermIterator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Pair;

/**
 * Term iterator over the union of several on-disk indexes; produces each
 * distinct term once, paired with its merged token tree, and tracks the
 * global min/max term across all parts.
 */
public class CombinedTermIterator extends TermIterator
{
    final Descriptor descriptor;
    final RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union;
    final ByteBuffer min;
    final ByteBuffer max;

    public CombinedTermIterator(OnDiskIndex... sas)
    {
        this(Descriptor.CURRENT, sas);
    }

    public CombinedTermIterator(Descriptor d, OnDiskIndex... parts)
    {
        descriptor = d;
        union = OnDiskIndexIterator.union(parts);

        AbstractType<?> comparator = parts[0].getComparator(); // assumes all SAs have same comparator
        ByteBuffer smallest = parts[0].minTerm();
        ByteBuffer largest = parts[0].maxTerm();

        for (int i = 1; i < parts.length; i++)
        {
            OnDiskIndex part = parts[i];
            if (part == null)
                continue;

            if (comparator.compare(smallest, part.minTerm()) > 0)
                smallest = part.minTerm();
            if (comparator.compare(largest, part.maxTerm()) < 0)
                largest = part.maxTerm();
        }

        min = smallest;
        max = largest;
    }

    public ByteBuffer minTerm()
    {
        return min;
    }

    public ByteBuffer maxTerm()
    {
        return max;
    }

    protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
    {
        if (!union.hasNext())
            return endOfData();

        CombinedTerm term = union.next();
        return Pair.create(term.getTerm(), term.getTokenTreeBuilder());
    }
}
package org.apache.cassandra.index.sasi.utils;

/**
 * A comparable value that can absorb ("merge") other values of the same kind,
 * used when equal elements from multiple sources are combined during
 * union/intersection of iterators.
 *
 * @param <V> type of the underlying value exposed by {@link #get()}
 */
public interface CombinedValue<V> extends Comparable<CombinedValue<V>>
{
    /** Fold {@code other}'s state into this value. */
    void merge(CombinedValue<V> other);

    /** @return the underlying value */
    V get();
}
package org.apache.cassandra.index.sasi.utils;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;

import com.google.common.annotations.VisibleForTesting;

/**
 * A read-only, page-wise memory-mapped view of a file exposing a
 * {@link ByteBuffer}-like cursor API (position/limit/get*) addressed by
 * {@code long} offsets, so files larger than {@link Integer#MAX_VALUE} bytes
 * can be traversed.
 *
 * Cursor state is not thread-safe; use {@link #duplicate()} to obtain an
 * independent cursor that shares the underlying mapped pages.
 */
public class MappedBuffer implements Closeable
{
    // Shared between duplicates; unmapped (best effort) by close().
    private final MappedByteBuffer[] pages;

    private long position, limit;
    private final long capacity;
    private final int pageSize, sizeBits;

    /**
     * Copy constructor backing {@link #duplicate()}: shares the mapped pages
     * and copies the cursor state.
     */
    private MappedBuffer(MappedBuffer other)
    {
        this.sizeBits = other.sizeBits;
        this.pageSize = other.pageSize;
        this.position = other.position;
        this.limit = other.limit;
        this.capacity = other.capacity;
        this.pages = other.pages;
    }

    public MappedBuffer(RandomAccessReader file)
    {
        this(file.getChannel(), 30);
    }

    public MappedBuffer(ChannelProxy file)
    {
        this(file, 30);
    }

    /**
     * Maps the whole channel in pages of {@code 1 << numPageBits} bytes and
     * closes the channel proxy afterwards (existing mappings remain valid).
     *
     * @param file        channel to map; always closed by this constructor
     * @param numPageBits log2 of the page size; at most 30 (1G pages)
     * @throws IllegalArgumentException if the requested page size exceeds 1G
     */
    @VisibleForTesting
    protected MappedBuffer(ChannelProxy file, int numPageBits)
    {
        // 1 << 31 overflows to a negative int, so the largest valid page size
        // is 1G (2^30). The previous guard (numPageBits > Integer.SIZE - 1)
        // accepted 31 and silently produced a negative pageSize.
        if (numPageBits > 30)
            throw new IllegalArgumentException("page size can't be bigger than 1G");

        sizeBits = numPageBits;
        pageSize = 1 << sizeBits;
        position = 0;
        limit = capacity = file.size();
        // when capacity is an exact multiple of pageSize the final page maps
        // zero bytes, which FileChannel.map permits
        pages = new MappedByteBuffer[(int) (file.size() / pageSize) + 1];

        try
        {
            long offset = 0;
            for (int i = 0; i < pages.length; i++)
            {
                long size = Math.min(this.pageSize, capacity - offset);
                pages[i] = file.map(MapMode.READ_ONLY, offset, size);
                offset += size;
            }
        }
        finally
        {
            file.close();
        }
    }

    /**
     * Compares the page-local region at {@code offset} (which must not cross a
     * page boundary) against {@code other} using the given comparator.
     */
    public int comparePageTo(long offset, int length, AbstractType<?> comparator, ByteBuffer other)
    {
        return comparator.compare(getPageRegion(offset, length), other);
    }

    public long capacity()
    {
        return capacity;
    }

    public long position()
    {
        return position;
    }

    /**
     * Moves the cursor to {@code newPosition}.
     *
     * @throws IllegalArgumentException if the position is negative or past the limit
     */
    public MappedBuffer position(long newPosition)
    {
        if (newPosition < 0 || newPosition > limit)
            throw new IllegalArgumentException("position: " + newPosition + ", limit: " + limit);

        position = newPosition;
        return this;
    }

    public long limit()
    {
        return limit;
    }

    /**
     * Sets the limit; must lie between the current position and the capacity.
     */
    public MappedBuffer limit(long newLimit)
    {
        if (newLimit < position || newLimit > capacity)
            throw new IllegalArgumentException();

        limit = newLimit;
        return this;
    }

    public long remaining()
    {
        return limit - position;
    }

    public boolean hasRemaining()
    {
        return remaining() > 0;
    }

    /** Reads the byte at the cursor and advances it by one. */
    public byte get()
    {
        return get(position++);
    }

    /** Absolute read of a single byte; does not move the cursor. */
    public byte get(long pos)
    {
        return pages[getPage(pos)].get(getPageOffset(pos));
    }

    /** Reads a big-endian short at the cursor and advances it by two. */
    public short getShort()
    {
        short value = getShort(position);
        position += 2;
        return value;
    }

    /**
     * Absolute read of a big-endian short. Falls back to byte-wise assembly
     * when the value straddles a page boundary; the manual composition matches
     * ByteBuffer's default big-endian order.
     */
    public short getShort(long pos)
    {
        if (isPageAligned(pos, 2))
            return pages[getPage(pos)].getShort(getPageOffset(pos));

        int ch1 = get(pos) & 0xff;
        int ch2 = get(pos + 1) & 0xff;
        return (short) ((ch1 << 8) + ch2);
    }

    /** Reads a big-endian int at the cursor and advances it by four. */
    public int getInt()
    {
        int value = getInt(position);
        position += 4;
        return value;
    }

    /**
     * Absolute read of a big-endian int, assembling byte-wise when the value
     * straddles a page boundary.
     */
    public int getInt(long pos)
    {
        if (isPageAligned(pos, 4))
            return pages[getPage(pos)].getInt(getPageOffset(pos));

        int ch1 = get(pos) & 0xff;
        int ch2 = get(pos + 1) & 0xff;
        int ch3 = get(pos + 2) & 0xff;
        int ch4 = get(pos + 3) & 0xff;

        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
    }

    /** Reads a big-endian long at the cursor and advances it by eight. */
    public long getLong()
    {
        long value = getLong(position);
        position += 8;
        return value;
    }

    /**
     * Absolute read of a big-endian long. The fast path retrieves the value
     * from a single page, avoiding multiple expensive look-ups into the page
     * array; otherwise the value is composed from two int reads.
     */
    public long getLong(long pos)
    {
        return (isPageAligned(pos, 8))
                ? pages[getPage(pos)].getLong(getPageOffset(pos))
                : ((long) (getInt(pos)) << 32) + (getInt(pos + 4) & 0xFFFFFFFFL);
    }

    /**
     * Returns a slice of the single page containing [position, position+length).
     *
     * @throws IllegalArgumentException if the range crosses a page boundary
     */
    public ByteBuffer getPageRegion(long position, int length)
    {
        if (!isPageAligned(position, length))
            throw new IllegalArgumentException(String.format("range: %s-%s wraps more than one page", position, length));

        ByteBuffer slice = pages[getPage(position)].duplicate();

        int pageOffset = getPageOffset(position);
        slice.position(pageOffset).limit(pageOffset + length);

        return slice;
    }

    /** @return an independent cursor sharing the same mapped pages */
    public MappedBuffer duplicate()
    {
        return new MappedBuffer(this);
    }

    public void close()
    {
        if (!FileUtils.isCleanerAvailable())
            return;

        /*
         * Try forcing the unmapping of pages using undocumented unsafe sun APIs.
         * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
         * If this works and a thread tries to access any page, hell will unleash on earth.
         */
        try
        {
            for (MappedByteBuffer segment : pages)
                FileUtils.clean(segment);
        }
        catch (Exception e)
        {
            // deliberate best-effort: unmapping is an optimization, not a requirement
        }
    }

    private int getPage(long position)
    {
        return (int) (position >> sizeBits);
    }

    // pageSize is a power of two, so masking yields the intra-page offset
    private int getPageOffset(long position)
    {
        return (int) (position & (pageSize - 1));
    }

    // strict '>' means a region ending exactly on a page boundary is treated
    // as crossing pages; conservative, and preserved as-is
    private boolean isPageAligned(long position, int length)
    {
        return pageSize - (getPageOffset(position) + length) > 0;
    }
}
package org.apache.cassandra.index.sasi.utils;

import java.io.IOException;
import java.util.Iterator;

import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
import org.apache.cassandra.db.marshal.AbstractType;

/**
 * Adapts the term iterator of a single {@link OnDiskIndex} to the
 * {@link RangeIterator} contract, wrapping each produced term in a
 * {@link CombinedTerm}. This iterator is forward-only: skipping is not
 * supported.
 */
public class OnDiskIndexIterator extends RangeIterator<DataTerm, CombinedTerm>
{
    private final AbstractType<?> comparator;
    private final Iterator<DataTerm> terms;

    public OnDiskIndexIterator(OnDiskIndex index)
    {
        // Long.MAX_VALUE stands in for the term count, which is not known
        // up-front here — presumably a sentinel; confirm against RangeIterator
        super(index.min(), index.max(), Long.MAX_VALUE);

        this.comparator = index.getComparator();
        this.terms = index.iterator();
    }

    /**
     * Builds a union iterator over all of the given indexes.
     *
     * @param union the indexes to merge; null entries are skipped
     * @return a RangeIterator producing combined terms from every non-null index
     */
    public static RangeIterator<DataTerm, CombinedTerm> union(OnDiskIndex... union)
    {
        RangeUnionIterator.Builder<DataTerm, CombinedTerm> builder = RangeUnionIterator.builder();
        for (OnDiskIndex e : union)
        {
            if (e != null)
                builder.add(new OnDiskIndexIterator(e));
        }

        return builder.build();
    }

    @Override
    protected CombinedTerm computeNext()
    {
        return terms.hasNext() ? new CombinedTerm(comparator, terms.next()) : endOfData();
    }

    @Override
    protected void performSkipTo(DataTerm nextToken)
    {
        // forward-only iteration; callers must not skip
        throw new UnsupportedOperationException();
    }

    @Override
    public void close() throws IOException
    {}
}