/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.conf.view;

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.index.sasi.SSTableIndex;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.trie.KeyAnalyzer;
import org.apache.cassandra.index.sasi.utils.trie.PatriciaTrie;
import org.apache.cassandra.index.sasi.utils.trie.Trie;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;

import com.google.common.collect.Sets;

/**
 * This class is an extension over RangeTermTree for string terms,
 * it is required because interval tree can't handle matching if search is on the
 * prefix of min/max of the range, so for ascii/utf8 fields we build an additional
 * prefix trie (including both min/max terms of the index) and do union of the results
 * of the prefix tree search and results from the interval tree lookup.
 */
public class PrefixTermTree extends RangeTermTree
{
    private final OnDiskIndexBuilder.Mode mode;
    private final Trie<ByteBuffer, Set<SSTableIndex>> trie;

    public PrefixTermTree(ByteBuffer min, ByteBuffer max,
                          Trie<ByteBuffer, Set<SSTableIndex>> trie,
                          IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> ranges,
                          OnDiskIndexBuilder.Mode mode)
    {
        super(min, max, ranges);

        this.mode = mode;
        this.trie = trie;
    }

    /**
     * Searches both the prefix trie and the interval tree and returns the union
     * of the two result sets.  Open-ended expressions and CONTAINS-mode indexes
     * cannot be narrowed by prefix, so the whole trie is scanned in that case.
     */
    public Set<SSTableIndex> search(Expression e)
    {
        Map<ByteBuffer, Set<SSTableIndex>> indexes = (e == null || e.lower == null || mode == OnDiskIndexBuilder.Mode.CONTAINS)
                                                        ? trie : trie.prefixMap(e.lower.value);

        Set<SSTableIndex> view = new HashSet<>(indexes.size());
        indexes.values().forEach(view::addAll);

        // union with the interval-tree lookup because the trie only holds
        // min/max terms of each index (see class javadoc).
        return Sets.union(view, super.search(e));
    }

    public static class Builder extends RangeTermTree.Builder
    {
        private final PatriciaTrie<ByteBuffer, Set<SSTableIndex>> trie;

        protected Builder(OnDiskIndexBuilder.Mode mode, final AbstractType<?> comparator)
        {
            super(mode, comparator);
            trie = new PatriciaTrie<>(new ByteBufferKeyAnalyzer(comparator));
        }

        public void addIndex(SSTableIndex index)
        {
            super.addIndex(index);
            // index both ends of the term range so prefix searches can
            // match either boundary.
            addTerm(index.minTerm(), index);
            addTerm(index.maxTerm(), index);
        }

        public TermTree build()
        {
            return new PrefixTermTree(min, max, trie, IntervalTree.build(intervals), mode);
        }

        private void addTerm(ByteBuffer term, SSTableIndex index)
        {
            Set<SSTableIndex> indexes = trie.get(term);
            if (indexes == null)
                trie.put(term, (indexes = new HashSet<>()));

            indexes.add(index);
        }
    }

    /**
     * Bit-level {@link KeyAnalyzer} over ByteBuffer keys for the PATRICIA trie.
     *
     * All byte/bit lookups are performed relative to the buffer's position so
     * they stay consistent with {@link ByteBuffer#remaining()}, which is also
     * position-relative.  (The previous version used absolute {@code get(int)}
     * indexing, which reads the wrong bytes for any buffer whose position is
     * non-zero.)
     */
    private static class ByteBufferKeyAnalyzer implements KeyAnalyzer<ByteBuffer>
    {
        private final AbstractType<?> comparator;

        public ByteBufferKeyAnalyzer(AbstractType<?> comparator)
        {
            this.comparator = comparator;
        }

        /**
         * A bit mask where the first bit is 1 and the others are zero
         */
        private static final int MSB = 1 << (Byte.SIZE - 1);

        public int compare(ByteBuffer a, ByteBuffer b)
        {
            return comparator.compare(a, b);
        }

        public int lengthInBits(ByteBuffer o)
        {
            return o.remaining() * Byte.SIZE;
        }

        public boolean isBitSet(ByteBuffer key, int bitIndex)
        {
            if (bitIndex >= lengthInBits(key))
                return false;

            int index = bitIndex / Byte.SIZE;
            int bit = bitIndex % Byte.SIZE;
            // read relative to position, consistent with lengthInBits()
            return (key.get(key.position() + index) & mask(bit)) != 0;
        }

        /**
         * Returns the index of the first bit that differs between the two keys,
         * or NULL_BIT_KEY / EQUAL_BIT_KEY when one key is all zeroes / both are equal.
         */
        public int bitIndex(ByteBuffer key, ByteBuffer otherKey)
        {
            int length = Math.max(key.remaining(), otherKey.remaining());

            boolean allNull = true;
            for (int i = 0; i < length; i++)
            {
                byte b1 = valueAt(key, i);
                byte b2 = valueAt(otherKey, i);

                if (b1 != b2)
                {
                    int xor = b1 ^ b2;
                    for (int j = 0; j < Byte.SIZE; j++)
                    {
                        if ((xor & mask(j)) != 0)
                            return (i * Byte.SIZE) + j;
                    }
                }

                if (b1 != 0)
                    allNull = false;
            }

            return allNull ? KeyAnalyzer.NULL_BIT_KEY : KeyAnalyzer.EQUAL_BIT_KEY;
        }

        public boolean isPrefix(ByteBuffer key, ByteBuffer prefix)
        {
            if (key.remaining() < prefix.remaining())
                return false;

            for (int i = 0; i < prefix.remaining(); i++)
            {
                // compare relative to each buffer's position
                if (key.get(key.position() + i) != prefix.get(prefix.position() + i))
                    return false;
            }

            return true;
        }

        /**
         * Returns the {@code byte} value at the given position-relative index,
         * or 0 when the index is past the end of the buffer (virtual zero-padding
         * used by {@link #bitIndex}).
         */
        private byte valueAt(ByteBuffer value, int index)
        {
            return index >= 0 && index < value.remaining() ? value.get(value.position() + index) : 0;
        }

        /**
         * Returns a bit mask where the given bit is set
         */
        private int mask(int bit)
        {
            return MSB >>> bit;
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.conf.view;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.cassandra.index.sasi.SSTableIndex;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;

/**
 * TermTree implementation backed by an interval tree keyed on the
 * [minTerm, maxTerm] range of each SSTable index.
 */
public class RangeTermTree implements TermTree
{
    protected final ByteBuffer min, max;
    protected final IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> rangeTree;

    public RangeTermTree(ByteBuffer min, ByteBuffer max, IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> rangeTree)
    {
        this.min = min;
        this.max = max;
        this.rangeTree = rangeTree;
    }

    /**
     * Returns every SSTable index whose term range intersects the expression's
     * bounds; an open lower/upper bound falls back to the global min/max term.
     */
    public Set<SSTableIndex> search(Expression e)
    {
        ByteBuffer searchMin = (e.lower != null) ? e.lower.value : min;
        ByteBuffer searchMax = (e.upper != null) ? e.upper.value : max;

        Interval<ByteBuffer, SSTableIndex> query = Interval.create(searchMin, searchMax, (SSTableIndex) null);
        return new HashSet<>(rangeTree.search(query));
    }

    public int intervalCount()
    {
        return rangeTree.intervalCount();
    }

    static class Builder extends TermTree.Builder
    {
        // one interval per added index: [minTerm, maxTerm] -> index
        protected final List<Interval<ByteBuffer, SSTableIndex>> intervals = new ArrayList<>();

        protected Builder(OnDiskIndexBuilder.Mode mode, AbstractType<?> comparator)
        {
            super(mode, comparator);
        }

        public void addIndex(SSTableIndex index)
        {
            intervals.add(Interval.create(index.minTerm(), index.maxTerm(), index));
        }

        public TermTree build()
        {
            return new RangeTermTree(min, max, IntervalTree.build(intervals));
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.conf.view;

import java.nio.ByteBuffer;
import java.util.Set;

import org.apache.cassandra.index.sasi.SSTableIndex;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.db.marshal.AbstractType;

/**
 * A per-column lookup structure mapping query expressions to the set of
 * SSTable indexes that may contain matching terms.
 */
public interface TermTree
{
    /** Returns the SSTable indexes whose term range may satisfy the expression. */
    Set<SSTableIndex> search(Expression e);

    /** Number of term intervals held by this tree (one per indexed SSTable). */
    int intervalCount();

    abstract class Builder
    {
        protected final OnDiskIndexBuilder.Mode mode;
        protected final AbstractType<?> comparator;
        // running min/max term across all added indexes
        protected ByteBuffer min, max;

        protected Builder(OnDiskIndexBuilder.Mode mode, AbstractType<?> comparator)
        {
            this.mode = mode;
            this.comparator = comparator;
        }

        /**
         * Adds an index to the tree being built and widens the global
         * min/max term bounds accordingly.
         */
        public final void add(SSTableIndex index)
        {
            addIndex(index);

            if (min == null || comparator.compare(min, index.minTerm()) > 0)
                min = index.minTerm();

            if (max == null || comparator.compare(max, index.maxTerm()) < 0)
                max = index.maxTerm();
        }

        protected abstract void addIndex(SSTableIndex index);

        public abstract TermTree build();
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.conf.view;

import java.nio.ByteBuffer;
import java.util.*;

import org.apache.cassandra.index.sasi.SSTableIndex;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import org.apache.cassandra.utils.IntervalTree;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

/**
 * An immutable snapshot of the SSTable indexes attached to a column,
 * searchable both by term (via a TermTree) and by partition-key range
 * (via an interval tree).
 */
public class View implements Iterable<SSTableIndex>
{
    private final Map<Descriptor, SSTableIndex> view;

    private final TermTree termTree;
    private final IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> keyIntervalTree;

    public View(ColumnIndex index, Set<SSTableIndex> indexes)
    {
        this(index, Collections.<SSTableIndex>emptyList(), Collections.<SSTableReader>emptyList(), indexes);
    }

    public View(ColumnIndex index,
                Collection<SSTableIndex> currentView,
                Collection<SSTableReader> oldSSTables,
                Set<SSTableIndex> newIndexes)
    {
        Map<Descriptor, SSTableIndex> merged = new HashMap<>();

        // string-comparable columns get the prefix-trie-backed tree,
        // everything else the plain interval tree (see PrefixTermTree javadoc)
        AbstractType<?> validator = index.getValidator();
        boolean isStringColumn = validator instanceof AsciiType || validator instanceof UTF8Type;
        TermTree.Builder termTreeBuilder = isStringColumn
                                            ? new PrefixTermTree.Builder(index.getMode().mode, validator)
                                            : new RangeTermTree.Builder(index.getMode().mode, validator);

        List<Interval<ByteBuffer, SSTableIndex>> keyIntervals = new ArrayList<>();
        for (SSTableIndex sstableIndex : Iterables.concat(currentView, newIndexes))
        {
            SSTableReader sstable = sstableIndex.getSSTable();

            // drop indexes whose sstable is being replaced, already compacted
            // away, or already present in the merged view (duplicate ref)
            boolean obsolete = oldSSTables.contains(sstable)
                            || sstable.isMarkedCompacted()
                            || merged.containsKey(sstable.descriptor);
            if (obsolete)
            {
                sstableIndex.release();
                continue;
            }

            merged.put(sstable.descriptor, sstableIndex);

            termTreeBuilder.add(sstableIndex);
            keyIntervals.add(Interval.create(sstableIndex.minKey(), sstableIndex.maxKey(), sstableIndex));
        }

        this.view = merged;
        this.termTree = termTreeBuilder.build();
        this.keyIntervalTree = IntervalTree.build(keyIntervals);

        // both trees must cover exactly the same set of indexes
        if (keyIntervalTree.intervalCount() != termTree.intervalCount())
            throw new IllegalStateException(String.format("mismatched sizes for intervals tree for keys vs terms: %d != %d", keyIntervalTree.intervalCount(), termTree.intervalCount()));
    }

    /**
     * Term search restricted to the given set of sstables; returns a lazy
     * filtered view over the term-tree results.
     */
    public Set<SSTableIndex> match(final Set<SSTableReader> scope, Expression expression)
    {
        return Sets.filter(termTree.search(expression), index -> scope.contains(index.getSSTable()));
    }

    /** Partition-key range search over the key interval tree. */
    public List<SSTableIndex> match(ByteBuffer minKey, ByteBuffer maxKey)
    {
        return keyIntervalTree.search(Interval.create(minKey, maxKey, (SSTableIndex) null));
    }

    public Iterator<SSTableIndex> iterator()
    {
        return view.values().iterator();
    }

    public Collection<SSTableIndex> getIndexes()
    {
        return view.values();
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Object descriptor for SSTableAttachedSecondaryIndex files. Similar to, and based upon, the sstable descriptor.
 */
public class Descriptor
{
    // known on-disk format versions, oldest first
    public static final String VERSION_AA = "aa";
    public static final String VERSION_AB = "ab";

    public static final String CURRENT_VERSION = VERSION_AB;
    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);

    /** Thin wrapper around the version string so it can grow behavior later. */
    public static class Version
    {
        public final String version;

        public Version(String version)
        {
            this.version = version;
        }

        public String toString()
        {
            return version;
        }
    }

    public final Version version;

    public Descriptor(String rawVersion)
    {
        this.version = new Version(rawVersion);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.disk;

import java.nio.ByteBuffer;

import org.apache.cassandra.index.sasi.Term;
import org.apache.cassandra.index.sasi.utils.MappedBuffer;
import org.apache.cassandra.db.marshal.AbstractType;

/**
 * A single on-disk block of sorted terms: a short-offset index followed by
 * term payloads, with an optional combined TokenTree appended to data blocks.
 */
public abstract class OnDiskBlock<T extends Term>
{
    public enum BlockType
    {
        POINTER, DATA
    }

    // this contains offsets of the terms and term data
    protected final MappedBuffer blockIndex;
    protected final int blockIndexSize;

    protected final boolean hasCombinedIndex;
    protected final TokenTree combinedIndex;

    public OnDiskBlock(Descriptor descriptor, MappedBuffer block, BlockType blockType)
    {
        blockIndex = block;

        // pointer blocks carry no combined token index
        if (blockType == BlockType.POINTER)
        {
            hasCombinedIndex = false;
            combinedIndex = null;
            blockIndexSize = block.getInt() << 1; // num terms * sizeof(short)
            return;
        }

        long blockOffset = block.position();
        // absolute read: does not advance the buffer position
        int combinedIndexOffset = block.getInt(blockOffset + OnDiskIndexBuilder.BLOCK_SIZE);

        hasCombinedIndex = (combinedIndexOffset >= 0);
        long combinedIndexStart = blockOffset + OnDiskIndexBuilder.BLOCK_SIZE + 4 + combinedIndexOffset;

        combinedIndex = hasCombinedIndex
                        ? new TokenTree(descriptor, blockIndex.duplicate().position(combinedIndexStart))
                        : null;
        blockIndexSize = block.getInt() << 1; // num terms * sizeof(short)
    }

    /**
     * Binary search for the query term.  On a miss, the returned SearchResult
     * carries the last probed element, its comparison sign, and its index.
     */
    public SearchResult<T> search(AbstractType<?> comparator, ByteBuffer query)
    {
        int lo = 0, hi = termCount() - 1;
        int probeIdx = 0, comparison = -1;
        T probed = null;

        while (lo <= hi)
        {
            probeIdx = lo + ((hi - lo) >> 1); // overflow-safe midpoint
            probed = getTerm(probeIdx);

            comparison = probed.compareTo(comparator, query);
            if (comparison == 0)
                return new SearchResult<>(probed, comparison, probeIdx);

            if (comparison < 0)
                lo = probeIdx + 1;
            else
                hi = probeIdx - 1;
        }

        return new SearchResult<>(probed, comparison, probeIdx);
    }

    /** Materializes the term at the given index as a buffer slice. */
    protected T getTerm(int index)
    {
        MappedBuffer slice = blockIndex.duplicate();
        long termStart = getTermPosition(index);

        if (index == termCount() - 1)
            slice.position(termStart); // last term runs to the end of the block
        else
            slice.position(termStart).limit(getTermPosition(index + 1));

        return cast(slice);
    }

    protected long getTermPosition(int idx)
    {
        return getTermPosition(blockIndex, idx, blockIndexSize);
    }

    protected int termCount()
    {
        return blockIndexSize >> 1;
    }

    protected abstract T cast(MappedBuffer data);

    /** Resolves a term's payload offset via the short-offset table at the block head. */
    static long getTermPosition(MappedBuffer data, int idx, int indexSize)
    {
        idx <<= 1;
        assert idx < indexSize;
        return data.position() + indexSize + data.getShort(data.position() + idx);
    }

    public TokenTree getBlockIndex()
    {
        return combinedIndex;
    }

    public int minOffset(OnDiskIndex.IteratorOrder order)
    {
        if (order == OnDiskIndex.IteratorOrder.DESC)
            return 0;

        return termCount() - 1;
    }

    public int maxOffset(OnDiskIndex.IteratorOrder order)
    {
        if (minOffset(order) == 0)
            return termCount() - 1;

        return 0;
    }

    public static class SearchResult<T>
    {
        public final T result;
        public final int index, cmp;

        public SearchResult(T result, int cmp, int index)
        {
            this.result = result;
            this.index = index;
            this.cmp = cmp;
        }
    }
}
+ */ +package org.apache.cassandra.index.sasi.disk; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.Term; +import org.apache.cassandra.index.sasi.plan.Expression; +import org.apache.cassandra.index.sasi.plan.Expression.Op; +import org.apache.cassandra.index.sasi.utils.MappedBuffer; +import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; +import org.apache.cassandra.index.sasi.utils.AbstractIterator; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult; + +public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable +{ + public enum IteratorOrder + { + DESC(1), ASC(-1); + + public final int step; + + IteratorOrder(int step) + { + this.step = step; + } + + public int startAt(OnDiskBlock<DataTerm> block, Expression e) + { + switch (this) + { + case DESC: + return e.lower == null + ? 0 + : startAt(block.search(e.validator, e.lower.value), e.lower.inclusive); + + case ASC: + return e.upper == null + ? 
block.termCount() - 1 + : startAt(block.search(e.validator, e.upper.value), e.upper.inclusive); + + default: + throw new IllegalArgumentException("Unknown order: " + this); + } + } + + public int startAt(SearchResult<DataTerm> found, boolean inclusive) + { + switch (this) + { + case DESC: + if (found.cmp < 0) + return found.index + 1; + + return inclusive || found.cmp != 0 ? found.index : found.index + 1; + + case ASC: + if (found.cmp < 0) // search term was bigger then whole data set + return found.index; + return inclusive && (found.cmp == 0 || found.cmp < 0) ? found.index : found.index - 1; + + default: + throw new IllegalArgumentException("Unknown order: " + this); + } + } + } + + public final Descriptor descriptor; + protected final OnDiskIndexBuilder.Mode mode; + protected final OnDiskIndexBuilder.TermSize termSize; + + protected final AbstractType<?> comparator; + protected final MappedBuffer indexFile; + protected final long indexSize; + + protected final Function<Long, DecoratedKey> keyFetcher; + + protected final String indexPath; + + protected final PointerLevel[] levels; + protected final DataLevel dataLevel; + + protected final ByteBuffer minTerm, maxTerm, minKey, maxKey; + + public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader) + { + keyFetcher = keyReader; + + comparator = cmp; + indexPath = index.getAbsolutePath(); + + RandomAccessFile backingFile = null; + try + { + backingFile = new RandomAccessFile(index, "r"); + + descriptor = new Descriptor(backingFile.readUTF()); + + termSize = OnDiskIndexBuilder.TermSize.of(backingFile.readShort()); + + minTerm = ByteBufferUtil.readWithShortLength(backingFile); + maxTerm = ByteBufferUtil.readWithShortLength(backingFile); + + minKey = ByteBufferUtil.readWithShortLength(backingFile); + maxKey = ByteBufferUtil.readWithShortLength(backingFile); + + mode = OnDiskIndexBuilder.Mode.mode(backingFile.readUTF()); + + indexSize = backingFile.length(); + indexFile = new 
MappedBuffer(new ChannelProxy(indexPath, backingFile.getChannel())); + + // start of the levels + indexFile.position(indexFile.getLong(indexSize - 8)); + + int numLevels = indexFile.getInt(); + levels = new PointerLevel[numLevels]; + for (int i = 0; i < levels.length; i++) + { + int blockCount = indexFile.getInt(); + levels[i] = new PointerLevel(indexFile.position(), blockCount); + indexFile.position(indexFile.position() + blockCount * 8); + } + + int blockCount = indexFile.getInt(); + dataLevel = new DataLevel(indexFile.position(), blockCount); + } + catch (IOException e) + { + throw new FSReadError(e, index); + } + finally + { + FileUtils.closeQuietly(backingFile); + } + } + + public ByteBuffer minTerm() + { + return minTerm; + } + + public ByteBuffer maxTerm() + { + return maxTerm; + } + + public ByteBuffer minKey() + { + return minKey; + } + + public ByteBuffer maxKey() + { + return maxKey; + } + + public DataTerm min() + { + return dataLevel.getBlock(0).getTerm(0); + } + + public DataTerm max() + { + DataBlock block = dataLevel.getBlock(dataLevel.blockCount - 1); + return block.getTerm(block.termCount() - 1); + } + + /** + * Search for rows which match all of the terms inside the given expression in the index file. + * + * @param exp The expression to use for the query. + * + * @return Iterator which contains rows for all of the terms from the given range. + */ + public RangeIterator<Long, Token> search(Expression exp) + { + // convert single NOT_EQ to range with exclusion + final Expression expression = (exp.getOp() != Op.NOT_EQ) + ? 
exp + : new Expression(exp).setOp(Op.RANGE) + .setLower(new Expression.Bound(minTerm, true)) + .setUpper(new Expression.Bound(maxTerm, true)) + .addExclusion(exp.lower.value); + + List<ByteBuffer> exclusions = new ArrayList<>(expression.exclusions.size()); + + Iterables.addAll(exclusions, expression.exclusions.stream().filter(exclusion -> { + // accept only exclusions which are in the bounds of lower/upper + return !(expression.lower != null && comparator.compare(exclusion, expression.lower.value) < 0) + && !(expression.upper != null && comparator.compare(exclusion, expression.upper.value) > 0); + }).collect(Collectors.toList())); + + Collections.sort(exclusions, comparator); + + if (exclusions.size() == 0) + return searchRange(expression); + + List<Expression> ranges = new ArrayList<>(exclusions.size()); + + // calculate range splits based on the sorted exclusions + Iterator<ByteBuffer> exclusionsIterator = exclusions.iterator(); + + Expression.Bound min = expression.lower, max = null; + while (exclusionsIterator.hasNext()) + { + max = new Expression.Bound(exclusionsIterator.next(), false); + ranges.add(new Expression(expression).setOp(Op.RANGE).setLower(min).setUpper(max)); + min = max; + } + + assert max != null; + ranges.add(new Expression(expression).setOp(Op.RANGE).setLower(max).setUpper(expression.upper)); + + RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder(); + for (Expression e : ranges) + { + RangeIterator<Long, Token> range = searchRange(e); + if (range != null) + builder.add(range); + } + + return builder.build(); + } + + private RangeIterator<Long, Token> searchRange(Expression range) + { + Expression.Bound lower = range.lower; + Expression.Bound upper = range.upper; + + int lowerBlock = lower == null ? 0 : getDataBlock(lower.value); + int upperBlock = upper == null + ? 
dataLevel.blockCount - 1 + // optimization so we don't have to fetch upperBlock when query has lower == upper + : (lower != null && comparator.compare(lower.value, upper.value) == 0) ? lowerBlock : getDataBlock(upper.value); + + return (mode != OnDiskIndexBuilder.Mode.SPARSE || lowerBlock == upperBlock || upperBlock - lowerBlock <= 1) + ? searchPoint(lowerBlock, range) + : searchRange(lowerBlock, lower, upperBlock, upper); + } + + private RangeIterator<Long, Token> searchRange(int lowerBlock, Expression.Bound lower, int upperBlock, Expression.Bound upper) + { + // if lower is at the beginning of the block that means we can just do a single iterator per block + SearchResult<DataTerm> lowerPosition = (lower == null) ? null : searchIndex(lower.value, lowerBlock); + SearchResult<DataTerm> upperPosition = (upper == null) ? null : searchIndex(upper.value, upperBlock); + + RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder(); + + // optimistically assume that first and last blocks are full block reads, saves at least 3 'else' conditions + int firstFullBlockIdx = lowerBlock, lastFullBlockIdx = upperBlock; + + // 'lower' doesn't cover the whole block so we need to do a partial iteration + // Two reasons why that can happen: + // - 'lower' is not the first element of the block + // - 'lower' is first element but it's not inclusive in the query + if (lowerPosition != null && (lowerPosition.index > 0 || !lower.inclusive)) + { + DataBlock block = dataLevel.getBlock(lowerBlock); + int start = (lower.inclusive || lowerPosition.cmp != 0) ? 
+ // The same as with 'lower', but here we need to check if the upper is the last element of the block,
0 : (int) FBUtilities.align(firstFullBlockIdx, OnDiskIndexBuilder.SUPER_BLOCK_SIZE); + for (int blockIdx = firstFullBlockIdx; blockIdx < Math.min(superBlockAlignedStart, lastFullBlockIdx); blockIdx++) + builder.add(getBlockIterator(blockIdx)); + + // now read all of the super-blocks matched by the request, from the previous comment + // it's a block with index 1 (which covers everything from 4 to 7) + + int superBlockIdx = superBlockAlignedStart / OnDiskIndexBuilder.SUPER_BLOCK_SIZE; + for (int offset = 0; offset < totalSuperBlocks - 1; offset++) + builder.add(dataLevel.getSuperBlock(superBlockIdx++).iterator()); + + // now it's time for a remainder read, again from the previous example it's 8, 9 because + // we have over-shot previous block but didn't request enough to cover next super-block. + + int lastCoveredBlock = superBlockIdx * OnDiskIndexBuilder.SUPER_BLOCK_SIZE; + for (int offset = 0; offset <= (lastFullBlockIdx - lastCoveredBlock); offset++) + builder.add(getBlockIterator(lastCoveredBlock + offset)); + + return builder.build(); + } + + private RangeIterator<Long, Token> searchPoint(int lowerBlock, Expression expression) + { + Iterator<DataTerm> terms = new TermIterator(lowerBlock, expression, IteratorOrder.DESC); + RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder(); + + while (terms.hasNext()) + { + try + { + builder.add(terms.next().getTokens()); + } + finally + { + expression.checkpoint(); + } + } + + return builder.build(); + } + + private RangeIterator<Long, Token> getBlockIterator(int blockIdx) + { + DataBlock block = dataLevel.getBlock(blockIdx); + return (block.hasCombinedIndex) + ? 
block.getBlockIndex().iterator(keyFetcher) + : block.getRange(0, block.termCount()); + } + + public Iterator<DataTerm> iteratorAt(ByteBuffer query, IteratorOrder order, boolean inclusive) + { + Expression e = new Expression("", comparator); + Expression.Bound bound = new Expression.Bound(query, inclusive); + + switch (order) + { + case DESC: + e.setLower(bound); + break; + + case ASC: + e.setUpper(bound); + break; + + default: + throw new IllegalArgumentException("Unknown order: " + order); + } + + return new TermIterator(levels.length == 0 ? 0 : getBlockIdx(findPointer(query), query), e, order); + } + + private int getDataBlock(ByteBuffer query) + { + return levels.length == 0 ? 0 : getBlockIdx(findPointer(query), query); + } + + public Iterator<DataTerm> iterator() + { + return new TermIterator(0, new Expression("", comparator), IteratorOrder.DESC); + } + + public void close() throws IOException + { + FileUtils.closeQuietly(indexFile); + } + + private PointerTerm findPointer(ByteBuffer query) + { + PointerTerm ptr = null; + for (PointerLevel level : levels) + { + if ((ptr = level.getPointer(ptr, query)) == null) + return null; + } + + return ptr; + } + + private SearchResult<DataTerm> searchIndex(ByteBuffer query, int blockIdx) + { + return dataLevel.getBlock(blockIdx).search(comparator, query); + } + + private int getBlockIdx(PointerTerm ptr, ByteBuffer query) + { + int blockIdx = 0; + if (ptr != null) + { + int cmp = ptr.compareTo(comparator, query); + blockIdx = (cmp == 0 || cmp > 0) ? 
ptr.getBlock() : ptr.getBlock() + 1; + } + + return blockIdx; + } + + protected class PointerLevel extends Level<PointerBlock> + { + public PointerLevel(long offset, int count) + { + super(offset, count); + } + + public PointerTerm getPointer(PointerTerm parent, ByteBuffer query) + { + return getBlock(getBlockIdx(parent, query)).search(comparator, query).result; + } + + protected PointerBlock cast(MappedBuffer block) + { + return new PointerBlock(block); + } + } + + protected class DataLevel extends Level<DataBlock> + { + protected final int superBlockCnt; + protected final long superBlocksOffset; + + public DataLevel(long offset, int count) + { + super(offset, count); + long baseOffset = blockOffsets + blockCount * 8; + superBlockCnt = indexFile.getInt(baseOffset); + superBlocksOffset = baseOffset + 4; + } + + protected DataBlock cast(MappedBuffer block) + { + return new DataBlock(block); + } + + public OnDiskSuperBlock getSuperBlock(int idx) + { + assert idx < superBlockCnt : String.format("requested index %d is greater than super block count %d", idx, superBlockCnt); + long blockOffset = indexFile.getLong(superBlocksOffset + idx * 8); + return new OnDiskSuperBlock(indexFile.duplicate().position(blockOffset)); + } + } + + protected class OnDiskSuperBlock + { + private final TokenTree tokenTree; + + public OnDiskSuperBlock(MappedBuffer buffer) + { + tokenTree = new TokenTree(descriptor, buffer); + } + + public RangeIterator<Long, Token> iterator() + { + return tokenTree.iterator(keyFetcher); + } + } + + protected abstract class Level<T extends OnDiskBlock> + { + protected final long blockOffsets; + protected final int blockCount; + + public Level(long offsets, int count) + { + this.blockOffsets = offsets; + this.blockCount = count; + } + + public T getBlock(int idx) throws FSReadError + { + assert idx >= 0 && idx < blockCount; + + // calculate block offset and move there + // (long is intentional, we'll just need mmap implementation which supports long positions) 
+ long blockOffset = indexFile.getLong(blockOffsets + idx * 8); + return cast(indexFile.duplicate().position(blockOffset)); + } + + protected abstract T cast(MappedBuffer block); + } + + protected class DataBlock extends OnDiskBlock<DataTerm> + { + public DataBlock(MappedBuffer data) + { + super(descriptor, data, BlockType.DATA); + } + + protected DataTerm cast(MappedBuffer data) + { + return new DataTerm(data, termSize, getBlockIndex()); + } + + public RangeIterator<Long, Token> getRange(int start, int end) + { + RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder(); + NavigableMap<Long, Token> sparse = new TreeMap<>(); + + for (int i = start; i < end; i++) + { + DataTerm term = getTerm(i); + + if (term.isSparse()) + { + NavigableMap<Long, Token> tokens = term.getSparseTokens(); + for (Map.Entry<Long, Token> t : tokens.entrySet()) + { + Token token = sparse.get(t.getKey()); + if (token == null) + sparse.put(t.getKey(), t.getValue()); + else + token.merge(t.getValue()); + } + } + else + { + builder.add(term.getTokens()); + } + } + + PrefetchedTokensIterator prefetched = sparse.isEmpty() ? 
null : new PrefetchedTokensIterator(sparse); + + if (builder.rangeCount() == 0) + return prefetched; + + builder.add(prefetched); + return builder.build(); + } + } + + protected class PointerBlock extends OnDiskBlock<PointerTerm> + { + public PointerBlock(MappedBuffer block) + { + super(descriptor, block, BlockType.POINTER); + } + + protected PointerTerm cast(MappedBuffer data) + { + return new PointerTerm(data, termSize); + } + } + + public class DataTerm extends Term implements Comparable<DataTerm> + { + private final TokenTree perBlockIndex; + + protected DataTerm(MappedBuffer content, OnDiskIndexBuilder.TermSize size, TokenTree perBlockIndex) + { + super(content, size); + this.perBlockIndex = perBlockIndex; + } + + public RangeIterator<Long, Token> getTokens() + { + final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE); + + if (isSparse()) + return new PrefetchedTokensIterator(getSparseTokens()); + + long offset = blockEnd + 4 + content.getInt(getDataOffset() + 1); + return new TokenTree(descriptor, indexFile.duplicate().position(offset)).iterator(keyFetcher); + } + + public boolean isSparse() + { + return content.get(getDataOffset()) > 0; + } + + public NavigableMap<Long, Token> getSparseTokens() + { + long ptrOffset = getDataOffset(); + + byte size = content.get(ptrOffset); + + assert size > 0; + + NavigableMap<Long, Token> individualTokens = new TreeMap<>(); + for (int i = 0; i < size; i++) + { + Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher); + + assert token != null; + individualTokens.put(token.get(), token); + } + + return individualTokens; + } + + public int compareTo(DataTerm other) + { + return other == null ? 
1 : compareTo(comparator, other.getTerm()); + } + } + + protected static class PointerTerm extends Term + { + public PointerTerm(MappedBuffer content, OnDiskIndexBuilder.TermSize size) + { + super(content, size); + } + + public int getBlock() + { + return content.getInt(getDataOffset()); + } + } + + private static class PrefetchedTokensIterator extends RangeIterator<Long, Token> + { + private final NavigableMap<Long, Token> tokens; + private PeekingIterator<Token> currentIterator; + + public PrefetchedTokensIterator(NavigableMap<Long, Token> tokens) + { + super(tokens.firstKey(), tokens.lastKey(), tokens.size()); + this.tokens = tokens; + this.currentIterator = Iterators.peekingIterator(tokens.values().iterator()); + } + + protected Token computeNext() + { + return currentIterator != null && currentIterator.hasNext() + ? currentIterator.next() + : endOfData(); + } + + protected void performSkipTo(Long nextToken) + { + currentIterator = Iterators.peekingIterator(tokens.tailMap(nextToken, true).values().iterator()); + } + + public void close() throws IOException + { + endOfData(); + } + } + + public AbstractType<?> getComparator() + { + return comparator; + } + + public String getIndexPath() + { + return indexPath; + } + + private class TermIterator extends AbstractIterator<DataTerm> + { + private final Expression e; + private final IteratorOrder order; + + protected OnDiskBlock<DataTerm> currentBlock; + protected int blockIndex, offset; + + private boolean checkLower = true, checkUpper = true; + + public TermIterator(int startBlock, Expression expression, IteratorOrder order) + { + this.e = expression; + this.order = order; + this.blockIndex = startBlock; + + nextBlock(); + } + + protected DataTerm computeNext() + { + for (;;) + { + if (currentBlock == null) + return endOfData(); + + if (offset >= 0 && offset < currentBlock.termCount()) + { + DataTerm currentTerm = currentBlock.getTerm(nextOffset()); + + if (checkLower && !e.isLowerSatisfiedBy(currentTerm)) + 
+ // if the expression's upper bound is satisfied by it, that means we can avoid
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.disk; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.sa.IntegralSA; +import org.apache.cassandra.index.sasi.sa.SA; +import org.apache.cassandra.index.sasi.sa.TermIterator; +import org.apache.cassandra.index.sasi.sa.SuffixSA; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +import com.carrotsearch.hppc.LongArrayList; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.hppc.ShortArrayList; +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OnDiskIndexBuilder +{ + private static final Logger logger = LoggerFactory.getLogger(OnDiskIndexBuilder.class); + + public enum Mode + { + PREFIX, CONTAINS, SPARSE; + + public static Mode mode(String mode) + { + return Mode.valueOf(mode.toUpperCase()); + } + } + + public enum TermSize + { + INT(4), LONG(8), UUID(16), VARIABLE(-1); + + public final int size; + + TermSize(int size) + { + this.size = size; + } + + public boolean isConstant() + { + return this != VARIABLE; + } + + public static TermSize of(int size) + { + switch (size) + { + case -1: + return VARIABLE; + + 
case 4: + return INT; + + case 8: + return LONG; + + case 16: + return UUID; + + default: + throw new IllegalStateException("unknown state: " + size); + } + } + + public static TermSize sizeOf(AbstractType<?> comparator) + { + if (comparator instanceof Int32Type || comparator instanceof FloatType) + return INT; + + if (comparator instanceof LongType || comparator instanceof DoubleType + || comparator instanceof TimestampType || comparator instanceof DateType) + return LONG; + + if (comparator instanceof TimeUUIDType || comparator instanceof UUIDType) + return UUID; + + return VARIABLE; + } + } + + public static final int BLOCK_SIZE = 4096; + public static final int MAX_TERM_SIZE = 1024; + public static final int SUPER_BLOCK_SIZE = 64; + + private final List<MutableLevel<InMemoryPointerTerm>> levels = new ArrayList<>(); + private MutableLevel<InMemoryDataTerm> dataLevel; + + private final TermSize termSize; + + private final AbstractType<?> keyComparator, termComparator; + + private final Map<ByteBuffer, TokenTreeBuilder> terms; + private final Mode mode; + + private ByteBuffer minKey, maxKey; + private long estimatedBytes; + + public OnDiskIndexBuilder(AbstractType<?> keyComparator, AbstractType<?> comparator, Mode mode) + { + this.keyComparator = keyComparator; + this.termComparator = comparator; + this.terms = new HashMap<>(); + this.termSize = TermSize.sizeOf(comparator); + this.mode = mode; + } + + public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition) + { + if (term.remaining() >= MAX_TERM_SIZE) + { + logger.error("Rejecting value (value size {}, maximum size {} bytes).", term.remaining(), Short.MAX_VALUE); + return this; + } + + TokenTreeBuilder tokens = terms.get(term); + if (tokens == null) + { + terms.put(term, (tokens = new TokenTreeBuilder())); + + // on-heap size estimates from jol + // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key) + estimatedBytes += 64 + 48 + term.remaining(); + } + + 
tokens.add((Long) key.getToken().getTokenValue(), keyPosition); + + // calculate key range (based on actual key values) for current index + minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey; + maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey; + + // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added + // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key). + // in the case of hash collision for the token we may overestimate but this is extremely rare + estimatedBytes += 60 + 40 + 8; + + return this; + } + + public long estimatedMemoryUse() + { + return estimatedBytes; + } + + private void addTerm(InMemoryDataTerm term, SequentialWriter out) throws IOException + { + InMemoryPointerTerm ptr = dataLevel.add(term); + if (ptr == null) + return; + + int levelIdx = 0; + for (;;) + { + MutableLevel<InMemoryPointerTerm> level = getIndexLevel(levelIdx++, out); + if ((ptr = level.add(ptr)) == null) + break; + } + } + + public boolean isEmpty() + { + return terms.isEmpty(); + } + + public void finish(Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms) + { + finish(Descriptor.CURRENT, range, file, terms); + } + + /** + * Finishes up index building process by creating/populating index file. + * + * @param indexFile The file to write index contents to. + * + * @return true if index was written successfully, false otherwise (e.g. if index was empty). + * + * @throws FSWriteError on I/O error. 
+ */ + public boolean finish(File indexFile) throws FSWriteError + { + return finish(Descriptor.CURRENT, indexFile); + } + + @VisibleForTesting + protected boolean finish(Descriptor descriptor, File file) throws FSWriteError + { + // no terms means there is nothing to build + if (terms.isEmpty()) + return false; + + // split terms into suffixes only if it's text, otherwise (even if CONTAINS is set) use terms in original form + SA sa = ((termComparator instanceof UTF8Type || termComparator instanceof AsciiType) && mode == Mode.CONTAINS) + ? new SuffixSA(termComparator, mode) : new IntegralSA(termComparator, mode); + + for (Map.Entry<ByteBuffer, TokenTreeBuilder> term : terms.entrySet()) + sa.add(term.getKey(), term.getValue()); + + finish(descriptor, Pair.create(minKey, maxKey), file, sa.finish()); + return true; + } + + protected void finish(Descriptor descriptor, Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms) + { + SequentialWriter out = null; + + try + { + out = new SequentialWriter(file, BLOCK_SIZE, BufferType.ON_HEAP); + + out.writeUTF(descriptor.version.toString()); + + out.writeShort(termSize.size); + + // min, max term (useful to find initial scan range from search expressions) + ByteBufferUtil.writeWithShortLength(terms.minTerm(), out); + ByteBufferUtil.writeWithShortLength(terms.maxTerm(), out); + + // min, max keys covered by index (useful when searching across multiple indexes) + ByteBufferUtil.writeWithShortLength(range.left, out); + ByteBufferUtil.writeWithShortLength(range.right, out); + + out.writeUTF(mode.toString()); + + out.skipBytes((int) (BLOCK_SIZE - out.position())); + + dataLevel = mode == Mode.SPARSE ? 
new DataBuilderLevel(out, new MutableDataBlock(mode)) + : new MutableLevel<>(out, new MutableDataBlock(mode)); + while (terms.hasNext()) + { + Pair<ByteBuffer, TokenTreeBuilder> term = terms.next(); + addTerm(new InMemoryDataTerm(term.left, term.right), out); + } + + dataLevel.finalFlush(); + for (MutableLevel l : levels) + l.flush(); // flush all of the buffers + + // and finally write levels index + final long levelIndexPosition = out.position(); + + out.writeInt(levels.size()); + for (int i = levels.size() - 1; i >= 0; i--) + levels.get(i).flushMetadata(); + + dataLevel.flushMetadata(); + + out.writeLong(levelIndexPosition); + + // sync contents of the output and disk, + // since it's not done implicitly on close + out.sync(); + } + catch (IOException e) + { + throw new FSWriteError(e, file); + } + finally + { + FileUtils.closeQuietly(out); + } + } + + private MutableLevel<InMemoryPointerTerm> getIndexLevel(int idx, SequentialWriter out) + { + if (levels.size() == 0) + levels.add(new MutableLevel<>(out, new MutableBlock<>())); + + if (levels.size() - 1 < idx) + { + int toAdd = idx - (levels.size() - 1); + for (int i = 0; i < toAdd; i++) + levels.add(new MutableLevel<>(out, new MutableBlock<>())); + } + + return levels.get(idx); + } + + protected static void alignToBlock(SequentialWriter out) throws IOException + { + long endOfBlock = out.position(); + if ((endOfBlock & (BLOCK_SIZE - 1)) != 0) // align on the block boundary if needed + out.skipBytes((int) (FBUtilities.align(endOfBlock, BLOCK_SIZE) - endOfBlock)); + } + + private class InMemoryTerm + { + protected final ByteBuffer term; + + public InMemoryTerm(ByteBuffer term) + { + this.term = term; + } + + public int serializedSize() + { + return (termSize.isConstant() ? 
0 : 2) + term.remaining(); + } + + public void serialize(DataOutputPlus out) throws IOException + { + if (termSize.isConstant()) + out.write(term); + else + ByteBufferUtil.writeWithShortLength(term, out); + } + } + + private class InMemoryPointerTerm extends InMemoryTerm + { + protected final int blockCnt; + + public InMemoryPointerTerm(ByteBuffer term, int blockCnt) + { + super(term); + this.blockCnt = blockCnt; + } + + public int serializedSize() + { + return super.serializedSize() + 4; + } + + public void serialize(DataOutputPlus out) throws IOException + { + super.serialize(out); + out.writeInt(blockCnt); + } + } + + private class InMemoryDataTerm extends InMemoryTerm + { + private final TokenTreeBuilder keys; + + public InMemoryDataTerm(ByteBuffer term, TokenTreeBuilder keys) + { + super(term); + this.keys = keys; + } + } + + private class MutableLevel<T extends InMemoryTerm> + { + private final LongArrayList blockOffsets = new LongArrayList(); + + protected final SequentialWriter out; + + private final MutableBlock<T> inProcessBlock; + private InMemoryPointerTerm lastTerm; + + public MutableLevel(SequentialWriter out, MutableBlock<T> block) + { + this.out = out; + this.inProcessBlock = block; + } + + /** + * @return If we flushed a block, return the last term of that block; else, null. 
+ */ + public InMemoryPointerTerm add(T term) throws IOException + { + InMemoryPointerTerm toPromote = null; + + if (!inProcessBlock.hasSpaceFor(term)) + { + flush(); + toPromote = lastTerm; + } + + inProcessBlock.add(term); + + lastTerm = new InMemoryPointerTerm(term.term, blockOffsets.size()); + return toPromote; + } + + public void flush() throws IOException + { + blockOffsets.add(out.position()); + inProcessBlock.flushAndClear(out); + } + + public void finalFlush() throws IOException + { + flush(); + } + + public void flushMetadata() throws IOException + { + flushMetadata(blockOffsets); + } + + protected void flushMetadata(LongArrayList longArrayList) throws IOException + { + out.writeInt(longArrayList.size()); + for (int i = 0; i < longArrayList.size(); i++) + out.writeLong(longArrayList.get(i)); + } + } + + /** builds standard data blocks and super blocks, as well */ + private class DataBuilderLevel extends MutableLevel<InMemoryDataTerm> + { + private final LongArrayList superBlockOffsets = new LongArrayList(); + + /** count of regular data blocks written since current super block was init'd */ + private int dataBlocksCnt; + private TokenTreeBuilder superBlockTree; + + public DataBuilderLevel(SequentialWriter out, MutableBlock<InMemoryDataTerm> block) + { + super(out, block); + superBlockTree = new TokenTreeBuilder(); + } + + public InMemoryPointerTerm add(InMemoryDataTerm term) throws IOException + { + InMemoryPointerTerm ptr = super.add(term); + if (ptr != null) + { + dataBlocksCnt++; + flushSuperBlock(false); + } + superBlockTree.add(term.keys.getTokens()); + return ptr; + } + + public void flushSuperBlock(boolean force) throws IOException + { + if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.getTokens().isEmpty())) + { + superBlockOffsets.add(out.position()); + superBlockTree.finish().write(out); + alignToBlock(out); + + dataBlocksCnt = 0; + superBlockTree = new TokenTreeBuilder(); + } + } + + public void finalFlush() throws IOException 
+ { + super.flush(); + flushSuperBlock(true); + } + + public void flushMetadata() throws IOException + { + super.flushMetadata(); + flushMetadata(superBlockOffsets); + } + } + + private static class MutableBlock<T extends InMemoryTerm> + { + protected final DataOutputBufferFixed buffer; + protected final ShortArrayList offsets; + + public MutableBlock() + { + buffer = new DataOutputBufferFixed(BLOCK_SIZE); + offsets = new ShortArrayList(); + } + + public final void add(T term) throws IOException + { + offsets.add((short) buffer.position()); + addInternal(term); + } + + protected void addInternal(T term) throws IOException + { + term.serialize(buffer); + } + + public boolean hasSpaceFor(T element) + { + return sizeAfter(element) < BLOCK_SIZE; + } + + protected int sizeAfter(T element) + { + return getWatermark() + 4 + element.serializedSize(); + } + + protected int getWatermark() + { + return 4 + offsets.size() * 2 + (int) buffer.position(); + } + + public void flushAndClear(SequentialWriter out) throws IOException + { + out.writeInt(offsets.size()); + for (int i = 0; i < offsets.size(); i++) + out.writeShort(offsets.get(i)); + + out.write(buffer.buffer()); + + alignToBlock(out); + + offsets.clear(); + buffer.clear(); + } + } + + private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm> + { + private final Mode mode; + + private int offset = 0; + private int sparseValueTerms = 0; + + private final List<TokenTreeBuilder> containers = new ArrayList<>(); + private TokenTreeBuilder combinedIndex; + + public MutableDataBlock(Mode mode) + { + this.mode = mode; + this.combinedIndex = new TokenTreeBuilder(); + } + + protected void addInternal(InMemoryDataTerm term) throws IOException + { + TokenTreeBuilder keys = term.keys; + + if (mode == Mode.SPARSE && keys.getTokenCount() <= 5) + { + writeTerm(term, keys); + sparseValueTerms++; + } + else + { + writeTerm(term, offset); + + offset += keys.serializedSize(); + containers.add(keys); + } + + if (mode == 
Mode.SPARSE) + combinedIndex.add(keys.getTokens()); + } + + protected int sizeAfter(InMemoryDataTerm element) + { + return super.sizeAfter(element) + ptrLength(element); + } + + public void flushAndClear(SequentialWriter out) throws IOException + { + super.flushAndClear(out); + + out.writeInt((sparseValueTerms == 0) ? -1 : offset); + + if (containers.size() > 0) + { + for (TokenTreeBuilder tokens : containers) + tokens.write(out); + } + + if (sparseValueTerms > 0) + { + combinedIndex.finish().write(out); + } + + alignToBlock(out); + + containers.clear(); + combinedIndex = new TokenTreeBuilder(); + + offset = 0; + sparseValueTerms = 0; + } + + private int ptrLength(InMemoryDataTerm term) + { + return (term.keys.getTokenCount() > 5) + ? 5 // 1 byte type + 4 byte offset to the tree + : 1 + (8 * (int) term.keys.getTokenCount()); // 1 byte size + n 8 byte tokens + } + + private void writeTerm(InMemoryTerm term, TokenTreeBuilder keys) throws IOException + { + term.serialize(buffer); + buffer.writeByte((byte) keys.getTokenCount()); + + Iterator<Pair<Long, LongSet>> tokens = keys.iterator(); + while (tokens.hasNext()) + buffer.writeLong(tokens.next().left); + } + + private void writeTerm(InMemoryTerm term, int offset) throws IOException + { + term.serialize(buffer); + buffer.writeByte(0x0); + buffer.writeInt(offset); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java new file mode 100644 index 0000000..6e63c71 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.disk; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer; +import org.apache.cassandra.index.sasi.conf.ColumnIndex; +import org.apache.cassandra.index.sasi.utils.CombinedTermIterator; +import org.apache.cassandra.index.sasi.utils.TypeUtil; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import 
com.google.common.util.concurrent.Uninterruptibles; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerSSTableIndexWriter implements SSTableFlushObserver +{ + private static final Logger logger = LoggerFactory.getLogger(PerSSTableIndexWriter.class); + + private static final ThreadPoolExecutor INDEX_FLUSHER_MEMTABLE; + private static final ThreadPoolExecutor INDEX_FLUSHER_GENERAL; + + static + { + INDEX_FLUSHER_GENERAL = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("SASI-General"), + "internal"); + INDEX_FLUSHER_GENERAL.allowCoreThreadTimeOut(true); + + INDEX_FLUSHER_MEMTABLE = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("SASI-Memtable"), + "internal"); + INDEX_FLUSHER_MEMTABLE.allowCoreThreadTimeOut(true); + } + + private final int nowInSec = FBUtilities.nowInSeconds(); + + private final Descriptor descriptor; + private final OperationType source; + + private final AbstractType<?> keyValidator; + private final Map<ColumnDefinition, ColumnIndex> supportedIndexes; + + @VisibleForTesting + protected final Map<ColumnDefinition, Index> indexes; + + private DecoratedKey currentKey; + private long currentKeyPosition; + private boolean isComplete; + + public PerSSTableIndexWriter(AbstractType<?> keyValidator, + Descriptor descriptor, + OperationType source, + Map<ColumnDefinition, ColumnIndex> supportedIndexes) + { + this.keyValidator = keyValidator; + this.descriptor = descriptor; + this.source = source; + this.supportedIndexes = supportedIndexes; + this.indexes = new HashMap<>(); + } + + public void begin() + {} + + public void startPartition(DecoratedKey key, long curPosition) + { + currentKey = key; + currentKeyPosition = curPosition; + } + + public void nextUnfilteredCluster(Unfiltered unfiltered) + { + if (!unfiltered.isRow()) + return; + + Row row = (Row) unfiltered; + + 
supportedIndexes.keySet().forEach((column) -> { + ByteBuffer value = ColumnIndex.getValueOf(column, row, nowInSec); + if (value == null) + return; + + ColumnIndex columnIndex = supportedIndexes.get(column); + if (columnIndex == null) + return; + + Index index = indexes.get(column); + if (index == null) + indexes.put(column, (index = new Index(columnIndex))); + + index.add(value.duplicate(), currentKey, currentKeyPosition); + }); + } + + public void complete() + { + if (isComplete) + return; + + currentKey = null; + + try + { + CountDownLatch latch = new CountDownLatch(indexes.size()); + for (Index index : indexes.values()) + index.complete(latch); + + Uninterruptibles.awaitUninterruptibly(latch); + } + finally + { + indexes.clear(); + isComplete = true; + } + } + + public Index getIndex(ColumnDefinition columnDef) + { + return indexes.get(columnDef); + } + + public Descriptor getDescriptor() + { + return descriptor; + } + + @VisibleForTesting + protected class Index + { + private final ColumnIndex columnIndex; + private final String outputFile; + private final AbstractAnalyzer analyzer; + private final long maxMemorySize; + + @VisibleForTesting + protected final Set<Future<OnDiskIndex>> segments; + private int segmentNumber = 0; + + private OnDiskIndexBuilder currentBuilder; + + public Index(ColumnIndex columnIndex) + { + this.columnIndex = columnIndex; + this.outputFile = descriptor.filenameFor(columnIndex.getComponent()); + this.analyzer = columnIndex.getAnalyzer(); + this.segments = new HashSet<>(); + this.maxMemorySize = maxMemorySize(columnIndex); + this.currentBuilder = newIndexBuilder(); + } + + public void add(ByteBuffer term, DecoratedKey key, long keyPosition) + { + if (term.remaining() == 0) + return; + + boolean isAdded = false; + + analyzer.reset(term); + while (analyzer.hasNext()) + { + ByteBuffer token = analyzer.next(); + int size = token.remaining(); + + if (token.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE) + { + logger.info("Rejecting value 
(size {}, maximum {} bytes) for column {} (analyzed {}) at {} SSTable.", + term.remaining(), + OnDiskIndexBuilder.MAX_TERM_SIZE, + columnIndex.getColumnName(), + columnIndex.getMode().isAnalyzed, + descriptor); + continue; + } + + if (!TypeUtil.isValid(token, columnIndex.getValidator())) + { + if ((token = TypeUtil.tryUpcast(token, columnIndex.getValidator())) == null) + { + logger.info("({}) Failed to add {} to index for key: {}, value size was {} bytes, validator is {}.", + outputFile, + columnIndex.getColumnName(), + keyValidator.getString(key.getKey()), + size, + columnIndex.getValidator()); + continue; + } + } + + currentBuilder.add(token, key, keyPosition); + isAdded = true; + } + + if (!isAdded || currentBuilder.estimatedMemoryUse() < maxMemorySize) + return; // non of the generated tokens were added to the index or memory size wasn't reached + + segments.add(getExecutor().submit(scheduleSegmentFlush(false))); + } + + @VisibleForTesting + protected Callable<OnDiskIndex> scheduleSegmentFlush(final boolean isFinal) + { + final OnDiskIndexBuilder builder = currentBuilder; + currentBuilder = newIndexBuilder(); + + final String segmentFile = filename(isFinal); + + return () -> { + long start1 = System.nanoTime(); + + try + { + File index = new File(segmentFile); + return builder.finish(index) ? 
new OnDiskIndex(index, columnIndex.getValidator(), null) : null; + } + finally + { + if (!isFinal) + logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1)); + } + }; + } + + public void complete(final CountDownLatch latch) + { + logger.info("Scheduling index flush to {}", outputFile); + + getExecutor().submit((Runnable) () -> { + long start1 = System.nanoTime(); + + OnDiskIndex[] parts = new OnDiskIndex[segments.size() + 1]; + + try + { + // no parts present, build entire index from memory + if (segments.isEmpty()) + { + scheduleSegmentFlush(true).call(); + return; + } + + // parts are present but there is something still in memory, let's flush that inline + if (!currentBuilder.isEmpty()) + { + OnDiskIndex last = scheduleSegmentFlush(false).call(); + segments.add(Futures.immediateFuture(last)); + } + + int index = 0; + ByteBuffer combinedMin = null, combinedMax = null; + + for (Future<OnDiskIndex> f : segments) + { + OnDiskIndex part = Futures.getUnchecked(f); + if (part == null) + continue; + + parts[index++] = part; + combinedMin = (combinedMin == null || keyValidator.compare(combinedMin, part.minKey()) > 0) ? part.minKey() : combinedMin; + combinedMax = (combinedMax == null || keyValidator.compare(combinedMax, part.maxKey()) < 0) ? 
part.maxKey() : combinedMax; + } + + OnDiskIndexBuilder builder = newIndexBuilder(); + builder.finish(Pair.create(combinedMin, combinedMax), + new File(outputFile), + new CombinedTermIterator(parts)); + } + catch (Exception e) + { + logger.error("Failed to flush index {}.", outputFile, e); + FileUtils.delete(outputFile); + } + finally + { + logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1)); + + for (OnDiskIndex part : parts) + { + if (part == null) + continue; + + FileUtils.closeQuietly(part); + FileUtils.delete(part.getIndexPath()); + } + + latch.countDown(); + } + }); + } + + private ExecutorService getExecutor() + { + return source == OperationType.FLUSH ? INDEX_FLUSHER_MEMTABLE : INDEX_FLUSHER_GENERAL; + } + + private OnDiskIndexBuilder newIndexBuilder() + { + return new OnDiskIndexBuilder(keyValidator, columnIndex.getValidator(), columnIndex.getMode().mode); + } + + public String filename(boolean isFinal) + { + return outputFile + (isFinal ? "" : "_" + segmentNumber++); + } + } + + protected long maxMemorySize(ColumnIndex columnIndex) + { + // 1G for memtable and configuration for compaction + return source == OperationType.FLUSH ? 
1073741824L : columnIndex.getMode().maxCompactionFlushMemoryInMb; + } + + public int hashCode() + { + return descriptor.hashCode(); + } + + public boolean equals(Object o) + { + return !(o == null || !(o instanceof PerSSTableIndexWriter)) && descriptor.equals(((PerSSTableIndexWriter) o).descriptor); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/Token.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java new file mode 100644 index 0000000..02130a3 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.sasi.disk; + +import com.google.common.primitives.Longs; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.index.sasi.utils.CombinedValue; + +public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey> +{ + protected final long token; + + public Token(long token) + { + this.token = token; + } + + public Long get() + { + return token; + } + + public int compareTo(CombinedValue<Long> o) + { + return Longs.compare(token, ((Token) o).token); + } +}