Avoid index segment stitching in RAM which led to OOM on big SSTable files
patch by jrwest and xedin; reviewed by xedin for CASSANDRA-11383 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c4d5c73 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c4d5c73 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c4d5c73 Branch: refs/heads/trunk Commit: 5c4d5c731f1299ba310c81603914a1a8956e644c Parents: f6c5d72 Author: Jordan West <jorda...@gmail.com> Authored: Mon Mar 21 12:00:31 2016 -0700 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Sun Mar 27 15:21:16 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../sasi/disk/AbstractTokenTreeBuilder.java | 672 ++++++++++++++++ .../sasi/disk/DynamicTokenTreeBuilder.java | 189 +++++ .../index/sasi/disk/OnDiskIndexBuilder.java | 52 +- .../index/sasi/disk/PerSSTableIndexWriter.java | 37 +- .../index/sasi/disk/StaticTokenTreeBuilder.java | 266 ++++++ .../apache/cassandra/index/sasi/disk/Token.java | 5 + .../cassandra/index/sasi/disk/TokenTree.java | 6 +- .../index/sasi/disk/TokenTreeBuilder.java | 805 +------------------ .../index/sasi/memory/KeyRangeIterator.java | 11 + .../cassandra/index/sasi/sa/SuffixSA.java | 7 +- .../index/sasi/utils/CombinedTerm.java | 46 +- .../index/sasi/disk/OnDiskIndexTest.java | 20 +- .../sasi/disk/PerSSTableIndexWriterTest.java | 90 +++ .../index/sasi/disk/TokenTreeTest.java | 217 +++-- .../index/sasi/utils/LongIterator.java | 8 + 16 files changed, 1482 insertions(+), 950 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f86c91f..2907df9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.5 + * Avoid index segment stitching in RAM which lead to OOM on big SSTable files (CASSANDRA-11383) * Fix 
clustering and row filters for LIKE queries on clustering columns (CASSANDRA-11397) Merged from 3.0: * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java new file mode 100644 index 0000000..4e93b2b --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.index.sasi.disk; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +import com.carrotsearch.hppc.LongArrayList; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.hppc.cursors.LongCursor; + +public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder +{ + protected int numBlocks; + protected Node root; + protected InteriorNode rightmostParent; + protected Leaf leftmostLeaf; + protected Leaf rightmostLeaf; + protected long tokenCount = 0; + protected long treeMinToken; + protected long treeMaxToken; + + public void add(TokenTreeBuilder other) + { + add(other.iterator()); + } + + public TokenTreeBuilder finish() + { + if (root == null) + constructTree(); + + return this; + } + + public long getTokenCount() + { + return tokenCount; + } + + public int serializedSize() + { + if (numBlocks == 1) + return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16)); + else + return numBlocks * BLOCK_BYTES; + } + + public void write(DataOutputPlus out) throws IOException + { + ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES); + Iterator<Node> levelIterator = root.levelIterator(); + long childBlockIndex = 1; + + while (levelIterator != null) + { + Node firstChild = null; + while (levelIterator.hasNext()) + { + Node block = levelIterator.next(); + + if (firstChild == null && !block.isLeaf()) + firstChild = ((InteriorNode) block).children.get(0); + + if (block.isSerializable()) + { + block.serialize(childBlockIndex, blockBuffer); + flushBuffer(blockBuffer, out, numBlocks != 1); + } + + childBlockIndex += block.childCount(); + } + + levelIterator = (firstChild == null) ? 
null : firstChild.levelIterator(); + } + } + + protected abstract void constructTree(); + + protected void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException + { + // seek to end of last block before flushing + if (align) + alignBuffer(buffer, BLOCK_BYTES); + + buffer.flip(); + o.write(buffer); + buffer.clear(); + } + + protected abstract class Node + { + protected InteriorNode parent; + protected Node next; + protected Long nodeMinToken, nodeMaxToken; + + public Node(Long minToken, Long maxToken) + { + nodeMinToken = minToken; + nodeMaxToken = maxToken; + } + + public abstract boolean isSerializable(); + public abstract void serialize(long childBlockIndex, ByteBuffer buf); + public abstract int childCount(); + public abstract int tokenCount(); + + public Long smallestToken() + { + return nodeMinToken; + } + + public Long largestToken() + { + return nodeMaxToken; + } + + public Iterator<Node> levelIterator() + { + return new LevelIterator(this); + } + + public boolean isLeaf() + { + return (this instanceof Leaf); + } + + protected boolean isLastLeaf() + { + return this == rightmostLeaf; + } + + protected boolean isRoot() + { + return this == root; + } + + protected void updateTokenRange(long token) + { + nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token); + nodeMaxToken = nodeMaxToken == null ? 
token : Math.max(nodeMaxToken, token); + } + + protected void serializeHeader(ByteBuffer buf) + { + Header header; + if (isRoot()) + header = new RootHeader(); + else if (!isLeaf()) + header = new InteriorNodeHeader(); + else + header = new LeafHeader(); + + header.serialize(buf); + alignBuffer(buf, BLOCK_HEADER_BYTES); + } + + private abstract class Header + { + public void serialize(ByteBuffer buf) + { + buf.put(infoByte()) + .putShort((short) (tokenCount())) + .putLong(nodeMinToken) + .putLong(nodeMaxToken); + } + + protected abstract byte infoByte(); + } + + private class RootHeader extends Header + { + public void serialize(ByteBuffer buf) + { + super.serialize(buf); + writeMagic(buf); + buf.putLong(tokenCount) + .putLong(treeMinToken) + .putLong(treeMaxToken); + } + + protected byte infoByte() + { + // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1) + // if not leaf, clear both bits + return (byte) ((isLeaf()) ? 3 : 0); + } + + protected void writeMagic(ByteBuffer buf) + { + switch (Descriptor.CURRENT_VERSION) + { + case Descriptor.VERSION_AB: + buf.putShort(AB_MAGIC); + break; + + default: + break; + } + + } + } + + private class InteriorNodeHeader extends Header + { + // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared + protected byte infoByte() + { + return 0; + } + } + + private class LeafHeader extends Header + { + // bit 0 set as leaf indicator + // bit 1 set if this is last leaf of data + protected byte infoByte() + { + byte infoByte = 1; + infoByte |= (isLastLeaf()) ? 
(1 << LAST_LEAF_SHIFT) : 0; + + return infoByte; + } + } + + } + + protected abstract class Leaf extends Node + { + protected LongArrayList overflowCollisions; + + public Leaf(Long minToken, Long maxToken) + { + super(minToken, maxToken); + } + + public int childCount() + { + return 0; + } + + protected void serializeOverflowCollisions(ByteBuffer buf) + { + if (overflowCollisions != null) + for (LongCursor offset : overflowCollisions) + buf.putLong(offset.value); + } + + public void serialize(long childBlockIndex, ByteBuffer buf) + { + serializeHeader(buf); + serializeData(buf); + serializeOverflowCollisions(buf); + } + + protected abstract void serializeData(ByteBuffer buf); + + protected LeafEntry createEntry(final long tok, final LongSet offsets) + { + int offsetCount = offsets.size(); + switch (offsetCount) + { + case 0: + throw new AssertionError("no offsets for token " + tok); + case 1: + long offset = offsets.toArray()[0]; + if (offset > MAX_OFFSET) + throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET); + else if (offset <= Integer.MAX_VALUE) + return new SimpleLeafEntry(tok, offset); + else + return new FactoredOffsetLeafEntry(tok, offset); + case 2: + long[] rawOffsets = offsets.toArray(); + if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE && + (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE)) + return new PackedCollisionLeafEntry(tok, rawOffsets); + else + return createOverflowEntry(tok, offsetCount, offsets); + default: + return createOverflowEntry(tok, offsetCount, offsets); + } + } + + private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets) + { + if (overflowCollisions == null) + overflowCollisions = new LongArrayList(); + + LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount); + for (LongCursor o : offsets) { + if (overflowCollisions.size() == 
OVERFLOW_TRAILER_CAPACITY) + throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf"); + else + overflowCollisions.add(o.value); + } + return entry; + } + + protected abstract class LeafEntry + { + protected final long token; + + abstract public EntryType type(); + abstract public int offsetData(); + abstract public short offsetExtra(); + + public LeafEntry(final long tok) + { + token = tok; + } + + public void serialize(ByteBuffer buf) + { + buf.putShort((short) type().ordinal()) + .putShort(offsetExtra()) + .putLong(token) + .putInt(offsetData()); + } + + } + + + // assumes there is a single offset and the offset is <= Integer.MAX_VALUE + protected class SimpleLeafEntry extends LeafEntry + { + private final long offset; + + public SimpleLeafEntry(final long tok, final long off) + { + super(tok); + offset = off; + } + + public EntryType type() + { + return EntryType.SIMPLE; + } + + public int offsetData() + { + return (int) offset; + } + + public short offsetExtra() + { + return 0; + } + } + + // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET + // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits) + // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header + private class FactoredOffsetLeafEntry extends LeafEntry + { + private final long offset; + + public FactoredOffsetLeafEntry(final long tok, final long off) + { + super(tok); + offset = off; + } + + public EntryType type() + { + return EntryType.FACTORED; + } + + public int offsetData() + { + return (int) (offset >>> Short.SIZE); + } + + public short offsetExtra() + { + return (short) offset; + } + } + + // holds an entry with two offsets that can be packed in an int & a short + // the int offset is stored where offset is normally stored. 
short offset is + // stored in entry header + private class PackedCollisionLeafEntry extends LeafEntry + { + private short smallerOffset; + private int largerOffset; + + public PackedCollisionLeafEntry(final long tok, final long[] offs) + { + super(tok); + + smallerOffset = (short) Math.min(offs[0], offs[1]); + largerOffset = (int) Math.max(offs[0], offs[1]); + } + + public EntryType type() + { + return EntryType.PACKED; + } + + public int offsetData() + { + return largerOffset; + } + + public short offsetExtra() + { + return smallerOffset; + } + } + + // holds an entry with three or more offsets, or two offsets that cannot + // be packed into an int & a short. the index into the overflow list + // is stored where the offset is normally stored. the number of overflowed offsets + // for the entry is stored in the entry header + private class OverflowCollisionLeafEntry extends LeafEntry + { + private final short startIndex; + private final short count; + + public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount) + { + super(tok); + startIndex = collisionStartIndex; + count = collisionCount; + } + + public EntryType type() + { + return EntryType.OVERFLOW; + } + + public int offsetData() + { + return startIndex; + } + + public short offsetExtra() + { + return count; + } + + } + + } + + protected class InteriorNode extends Node + { + protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK); + protected List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1); + protected int position = 0; + + public InteriorNode() + { + super(null, null); + } + + public boolean isSerializable() + { + return true; + } + + public void serialize(long childBlockIndex, ByteBuffer buf) + { + serializeHeader(buf); + serializeTokens(buf); + serializeChildOffsets(childBlockIndex, buf); + } + + public int childCount() + { + return children.size(); + } + + public int tokenCount() + { + return tokens.size(); + } + + public Long 
smallestToken() + { + return tokens.get(0); + } + + protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild) + { + int pos = tokens.size(); + if (pos == TOKENS_PER_BLOCK) + { + InteriorNode sibling = split(); + sibling.add(token, leftChild, rightChild); + + } + else { + if (leftChild != null) + children.add(pos, leftChild); + + if (rightChild != null) + { + children.add(pos + 1, rightChild); + rightChild.parent = this; + } + + updateTokenRange(token); + tokens.add(pos, token); + } + } + + protected void add(Leaf node) + { + + if (position == (TOKENS_PER_BLOCK + 1)) + { + rightmostParent = split(); + rightmostParent.add(node); + } + else + { + + node.parent = this; + children.add(position, node); + position++; + + // the first child is referenced only during bulk load. we don't take a value + // to store into the tree, one is subtracted since position has already been incremented + // for the next node to be added + if (position - 1 == 0) + return; + + + // tokens are inserted one behind the current position, but 2 is subtracted because + // position has already been incremented for the next add + Long smallestToken = node.smallestToken(); + updateTokenRange(smallestToken); + tokens.add(position - 2, smallestToken); + } + + } + + protected InteriorNode split() + { + Pair<Long, InteriorNode> splitResult = splitBlock(); + Long middleValue = splitResult.left; + InteriorNode sibling = splitResult.right; + InteriorNode leftChild = null; + + // create a new root if necessary + if (parent == null) + { + parent = new InteriorNode(); + root = parent; + sibling.parent = parent; + leftChild = this; + numBlocks++; + } + + parent.add(middleValue, leftChild, sibling); + + return sibling; + } + + protected Pair<Long, InteriorNode> splitBlock() + { + final int splitPosition = TOKENS_PER_BLOCK - 2; + InteriorNode sibling = new InteriorNode(); + sibling.parent = parent; + next = sibling; + + Long middleValue = tokens.get(splitPosition); + + for (int i = 
splitPosition; i < TOKENS_PER_BLOCK; i++) + { + if (i != TOKENS_PER_BLOCK && i != splitPosition) + { + long token = tokens.get(i); + sibling.updateTokenRange(token); + sibling.tokens.add(token); + } + + Node child = children.get(i + 1); + child.parent = sibling; + sibling.children.add(child); + sibling.position++; + } + + for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--) + { + if (i != TOKENS_PER_BLOCK) + tokens.remove(i); + + if (i != splitPosition) + children.remove(i); + } + + nodeMinToken = smallestToken(); + nodeMaxToken = tokens.get(tokens.size() - 1); + numBlocks++; + + return Pair.create(middleValue, sibling); + } + + protected boolean isFull() + { + return (position >= TOKENS_PER_BLOCK + 1); + } + + private void serializeTokens(ByteBuffer buf) + { + tokens.forEach(buf::putLong); + } + + private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf) + { + for (int i = 0; i < children.size(); i++) + buf.putLong((childBlockIndex + i) * BLOCK_BYTES); + } + } + + public static class LevelIterator extends AbstractIterator<Node> + { + private Node currentNode; + + LevelIterator(Node first) + { + currentNode = first; + } + + public Node computeNext() + { + if (currentNode == null) + return endOfData(); + + Node returnNode = currentNode; + currentNode = returnNode.next; + + return returnNode; + } + } + + + protected static void alignBuffer(ByteBuffer buffer, int blockSize) + { + long curPos = buffer.position(); + if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed + buffer.position((int) FBUtilities.align(curPos, blockSize)); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java new file mode 100644 
index 0000000..2ddfd89 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.disk; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.Pair; + +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; +import com.carrotsearch.hppc.cursors.LongCursor; + +public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder +{ + private final SortedMap<Long, LongSet> tokens = new TreeMap<>(); + + + public DynamicTokenTreeBuilder() + {} + + public DynamicTokenTreeBuilder(TokenTreeBuilder data) + { + add(data); + } + + public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data) + { + add(data); + } + + public void add(Long token, long keyPosition) + { + LongSet found = tokens.get(token); + if (found == null) + tokens.put(token, (found = new LongOpenHashSet(2))); + + found.add(keyPosition); + } + + public void add(Iterator<Pair<Long, LongSet>> data) + { + while (data.hasNext()) + { + Pair<Long, LongSet> entry = data.next(); + for (LongCursor l : 
entry.right) + add(entry.left, l.value); + } + } + + public void add(SortedMap<Long, LongSet> data) + { + for (Map.Entry<Long, LongSet> newEntry : data.entrySet()) + { + LongSet found = tokens.get(newEntry.getKey()); + if (found == null) + tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4))); + + for (LongCursor offset : newEntry.getValue()) + found.add(offset.value); + } + } + + public Iterator<Pair<Long, LongSet>> iterator() + { + final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator(); + return new AbstractIterator<Pair<Long, LongSet>>() + { + protected Pair<Long, LongSet> computeNext() + { + if (!iterator.hasNext()) + return endOfData(); + + Map.Entry<Long, LongSet> entry = iterator.next(); + return Pair.create(entry.getKey(), entry.getValue()); + } + }; + } + + public boolean isEmpty() + { + return tokens.size() == 0; + } + + protected void constructTree() + { + tokenCount = tokens.size(); + treeMinToken = tokens.firstKey(); + treeMaxToken = tokens.lastKey(); + numBlocks = 1; + + // special case the tree that only has a single block in it (so we don't create a useless root) + if (tokenCount <= TOKENS_PER_BLOCK) + { + leftmostLeaf = new DynamicLeaf(tokens); + rightmostLeaf = leftmostLeaf; + root = leftmostLeaf; + } + else + { + root = new InteriorNode(); + rightmostParent = (InteriorNode) root; + + int i = 0; + Leaf lastLeaf = null; + Long firstToken = tokens.firstKey(); + Long finalToken = tokens.lastKey(); + Long lastToken; + for (Long token : tokens.keySet()) + { + if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1))) + { + i++; + continue; + } + + lastToken = token; + Leaf leaf = (i != (tokenCount - 1) || token.equals(finalToken)) ? 
+ new DynamicLeaf(tokens.subMap(firstToken, lastToken)) : new DynamicLeaf(tokens.tailMap(firstToken)); + + if (i == TOKENS_PER_BLOCK) + leftmostLeaf = leaf; + else + lastLeaf.next = leaf; + + rightmostParent.add(leaf); + lastLeaf = leaf; + rightmostLeaf = leaf; + firstToken = lastToken; + i++; + numBlocks++; + + if (token.equals(finalToken)) + { + Leaf finalLeaf = new DynamicLeaf(tokens.tailMap(token)); + lastLeaf.next = finalLeaf; + rightmostParent.add(finalLeaf); + rightmostLeaf = finalLeaf; + numBlocks++; + } + } + + } + } + + private class DynamicLeaf extends Leaf + { + private final SortedMap<Long, LongSet> tokens; + + DynamicLeaf(SortedMap<Long, LongSet> data) + { + super(data.firstKey(), data.lastKey()); + tokens = data; + } + + public int tokenCount() + { + return tokens.size(); + } + + public boolean isSerializable() + { + return true; + } + + protected void serializeData(ByteBuffer buf) + { + for (Map.Entry<Long, LongSet> entry : tokens.entrySet()) + createEntry(entry.getKey(), entry.getValue()).serialize(buf); + } + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java index 04b7b1c..c14f76c 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java @@ -162,7 +162,7 @@ public class OnDiskIndexBuilder TokenTreeBuilder tokens = terms.get(term); if (tokens == null) { - terms.put(term, (tokens = new TokenTreeBuilder())); + terms.put(term, (tokens = new DynamicTokenTreeBuilder())); // on-heap size estimates from jol // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key) @@ -269,8 +269,8 @@ public class OnDiskIndexBuilder 
out.skipBytes((int) (BLOCK_SIZE - out.position())); - dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(mode)) - : new MutableLevel<>(out, new MutableDataBlock(mode)); + dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(termComparator, mode)) + : new MutableLevel<>(out, new MutableDataBlock(termComparator, mode)); while (terms.hasNext()) { Pair<ByteBuffer, TokenTreeBuilder> term = terms.next(); @@ -454,7 +454,7 @@ public class OnDiskIndexBuilder public DataBuilderLevel(SequentialWriter out, MutableBlock<InMemoryDataTerm> block) { super(out, block); - superBlockTree = new TokenTreeBuilder(); + superBlockTree = new DynamicTokenTreeBuilder(); } public InMemoryPointerTerm add(InMemoryDataTerm term) throws IOException @@ -465,20 +465,20 @@ public class OnDiskIndexBuilder dataBlocksCnt++; flushSuperBlock(false); } - superBlockTree.add(term.keys.getTokens()); + superBlockTree.add(term.keys); return ptr; } public void flushSuperBlock(boolean force) throws IOException { - if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.getTokens().isEmpty())) + if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.isEmpty())) { superBlockOffsets.add(out.position()); superBlockTree.finish().write(out); alignToBlock(out); dataBlocksCnt = 0; - superBlockTree = new TokenTreeBuilder(); + superBlockTree = new DynamicTokenTreeBuilder(); } } @@ -549,28 +549,34 @@ public class OnDiskIndexBuilder private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm> { + private static final int MAX_KEYS_SPARSE = 5; + + private final AbstractType<?> comparator; private final Mode mode; private int offset = 0; - private int sparseValueTerms = 0; private final List<TokenTreeBuilder> containers = new ArrayList<>(); private TokenTreeBuilder combinedIndex; - public MutableDataBlock(Mode mode) + public MutableDataBlock(AbstractType<?> comparator, Mode mode) { + this.comparator = comparator; this.mode = mode; 
- this.combinedIndex = new TokenTreeBuilder(); + this.combinedIndex = initCombinedIndex(); } protected void addInternal(InMemoryDataTerm term) throws IOException { TokenTreeBuilder keys = term.keys; - if (mode == Mode.SPARSE && keys.getTokenCount() <= 5) + if (mode == Mode.SPARSE) { + if (keys.getTokenCount() > MAX_KEYS_SPARSE) + throw new IOException(String.format("Term - '%s' belongs to more than %d keys in %s mode, which is not allowed.", + comparator.getString(term.term), MAX_KEYS_SPARSE, mode.name())); + writeTerm(term, keys); - sparseValueTerms++; } else { @@ -581,7 +587,7 @@ public class OnDiskIndexBuilder } if (mode == Mode.SPARSE) - combinedIndex.add(keys.getTokens()); + combinedIndex.add(keys); } protected int sizeAfter(InMemoryDataTerm element) @@ -593,7 +599,7 @@ public class OnDiskIndexBuilder { super.flushAndClear(out); - out.writeInt((sparseValueTerms == 0) ? -1 : offset); + out.writeInt(mode == Mode.SPARSE ? offset : -1); if (containers.size() > 0) { @@ -601,18 +607,15 @@ public class OnDiskIndexBuilder tokens.write(out); } - if (sparseValueTerms > 0) - { + if (mode == Mode.SPARSE && combinedIndex != null) combinedIndex.finish().write(out); - } alignToBlock(out); containers.clear(); - combinedIndex = new TokenTreeBuilder(); + combinedIndex = initCombinedIndex(); offset = 0; - sparseValueTerms = 0; } private int ptrLength(InMemoryDataTerm term) @@ -626,10 +629,8 @@ public class OnDiskIndexBuilder { term.serialize(buffer); buffer.writeByte((byte) keys.getTokenCount()); - - Iterator<Pair<Long, LongSet>> tokens = keys.iterator(); - while (tokens.hasNext()) - buffer.writeLong(tokens.next().left); + for (Pair<Long, LongSet> key : keys) + buffer.writeLong(key.left); } private void writeTerm(InMemoryTerm term, int offset) throws IOException @@ -638,5 +639,10 @@ public class OnDiskIndexBuilder buffer.writeByte(0x0); buffer.writeInt(offset); } + + private TokenTreeBuilder initCombinedIndex() + { + return mode == Mode.SPARSE ? 
new DynamicTokenTreeBuilder() : null; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java index 6e63c71..34737ae 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java @@ -37,6 +37,7 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex; import org.apache.cassandra.index.sasi.utils.CombinedTermIterator; import org.apache.cassandra.index.sasi.utils.TypeUtil; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.util.FileUtils; @@ -126,7 +127,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver Index index = indexes.get(column); if (index == null) - indexes.put(column, (index = new Index(columnIndex))); + indexes.put(column, (index = newIndex(columnIndex))); index.add(value.duplicate(), currentKey, currentKeyPosition); }); @@ -165,10 +166,18 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver } @VisibleForTesting + protected Index newIndex(ColumnIndex columnIndex) + { + return new Index(columnIndex); + } + + @VisibleForTesting protected class Index { + @VisibleForTesting + protected final String outputFile; + private final ColumnIndex columnIndex; - private final String outputFile; private final AbstractAnalyzer analyzer; private final long maxMemorySize; @@ -245,17 +254,22 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver final String segmentFile = filename(isFinal); return () -> { - long 
start1 = System.nanoTime(); + long start = System.nanoTime(); try { File index = new File(segmentFile); return builder.finish(index) ? new OnDiskIndex(index, columnIndex.getValidator(), null) : null; } + catch (Exception | FSError e) + { + logger.error("Failed to build index segment {}", segmentFile, e); + return null; + } finally { if (!isFinal) - logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1)); + logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } }; } @@ -290,7 +304,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver for (Future<OnDiskIndex> f : segments) { - OnDiskIndex part = Futures.getUnchecked(f); + OnDiskIndex part = f.get(); if (part == null) continue; @@ -304,7 +318,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver new File(outputFile), new CombinedTermIterator(parts)); } - catch (Exception e) + catch (Exception | FSError e) { logger.error("Failed to flush index {}.", outputFile, e); FileUtils.delete(outputFile); @@ -313,13 +327,14 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver { logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1)); - for (OnDiskIndex part : parts) + for (int segment = 0; segment < segmentNumber; segment++) { - if (part == null) - continue; + OnDiskIndex part = parts[segment]; + + if (part != null) + FileUtils.closeQuietly(part); - FileUtils.closeQuietly(part); - FileUtils.delete(part.getIndexPath()); + FileUtils.delete(outputFile + "_" + segment); } latch.countDown(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java new file mode 100644 index 0000000..147427e --- /dev/null +++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sasi.disk; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.SortedMap; + +import org.apache.cassandra.index.sasi.utils.CombinedTerm; +import org.apache.cassandra.index.sasi.utils.RangeIterator; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.Pair; + +import com.carrotsearch.hppc.LongSet; +import com.google.common.collect.Iterators; + +/** + * Intended usage of this class is to be used in place of {@link DynamicTokenTreeBuilder} + * when multiple index segments produced by {@link PerSSTableIndexWriter} are stitched together + * by {@link PerSSTableIndexWriter#complete()}. + * + * This class uses the RangeIterator, now provided by + * {@link CombinedTerm#getTokenIterator()}, to iterate the data twice. 
+ * The first iteration builds the tree with leaves that contain only enough + * information to build the upper layers -- these leaves do not store more + * than their minimum and maximum tokens plus their total size, which makes them + * un-serializable. + * + * When the tree is written to disk the final layer is not + * written. It's at this point the data is iterated once again to write + * the leaves to disk. This (logarithmically) reduces copying of the + * token values while building and writing upper layers of the tree, + * removes the use of SortedMap when combining SAs, and relies on the + * memory mapped SAs otherwise, greatly improving performance and no + * longer causing OOMs when TokenTree sizes are big. + * + * See https://issues.apache.org/jira/browse/CASSANDRA-11383 for more details. + */ +public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder +{ + private final CombinedTerm combinedTerm; + + public StaticTokenTreeBuilder(CombinedTerm term) + { + combinedTerm = term; + } + + public void add(Long token, long keyPosition) + { + throw new UnsupportedOperationException(); + } + + public void add(SortedMap<Long, LongSet> data) + { + throw new UnsupportedOperationException(); + } + + public void add(Iterator<Pair<Long, LongSet>> data) + { + throw new UnsupportedOperationException(); + } + + public boolean isEmpty() + { + return combinedTerm.getTokenIterator().getCount() == 0; + } + + public Iterator<Pair<Long, LongSet>> iterator() + { + Iterator<Token> iterator = combinedTerm.getTokenIterator(); + return new AbstractIterator<Pair<Long, LongSet>>() + { + protected Pair<Long, LongSet> computeNext() + { + if (!iterator.hasNext()) + return endOfData(); + + Token token = iterator.next(); + return Pair.create(token.get(), token.getOffsets()); + } + }; + } + + public long getTokenCount() + { + return combinedTerm.getTokenIterator().getCount(); + } + + @Override + public void write(DataOutputPlus out) throws IOException + { + // if the root is not a 
leaf then none of the leaves have been written (all are PartialLeaf) + // so write out the last layer of the tree by converting PartialLeaf to StaticLeaf and + // iterating the data once more + super.write(out); + if (root.isLeaf()) + return; + + RangeIterator<Long, Token> tokens = combinedTerm.getTokenIterator(); + ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES); + Iterator<Node> leafIterator = leftmostLeaf.levelIterator(); + while (leafIterator.hasNext()) + { + Leaf leaf = (Leaf) leafIterator.next(); + Leaf writeableLeaf = new StaticLeaf(Iterators.limit(tokens, leaf.tokenCount()), leaf); + writeableLeaf.serialize(-1, blockBuffer); + flushBuffer(blockBuffer, out, true); + } + + } + + protected void constructTree() + { + RangeIterator<Long, Token> tokens = combinedTerm.getTokenIterator(); + + tokenCount = tokens.getCount(); + treeMinToken = tokens.getMinimum(); + treeMaxToken = tokens.getMaximum(); + numBlocks = 1; + + if (tokenCount <= TOKENS_PER_BLOCK) + { + leftmostLeaf = new StaticLeaf(tokens, tokens.getMinimum(), tokens.getMaximum(), tokens.getCount(), true); + rightmostLeaf = leftmostLeaf; + root = leftmostLeaf; + } + else + { + root = new InteriorNode(); + rightmostParent = (InteriorNode) root; + + // build all the leaves except for maybe + // the last leaf which is not completely full. + // This loop relies on the fact that multiple index segments + // will never have token intersection for a single term, + // because it's impossible to encounter the same value for + // the same column multiple times in a single key/sstable. 
+ Leaf lastLeaf = null; + long numFullLeaves = tokenCount / TOKENS_PER_BLOCK; + for (long i = 0; i < numFullLeaves; i++) + { + Long firstToken = tokens.next().get(); + for (int j = 1; j < (TOKENS_PER_BLOCK - 1); j++) + tokens.next(); + + Long lastToken = tokens.next().get(); + Leaf leaf = new PartialLeaf(firstToken, lastToken, TOKENS_PER_BLOCK); + + if (lastLeaf == null) + leftmostLeaf = leaf; + else + lastLeaf.next = leaf; + + rightmostParent.add(leaf); + lastLeaf = rightmostLeaf = leaf; + numBlocks++; + } + + // build the last leaf out of any remaining tokens if necessary + // safe downcast since TOKENS_PER_BLOCK is an int + int remainingTokens = (int) (tokenCount % TOKENS_PER_BLOCK); + if (remainingTokens != 0) + { + Long firstToken = tokens.next().get(); + Long lastToken = firstToken; + while (tokens.hasNext()) + lastToken = tokens.next().get(); + + Leaf leaf = new PartialLeaf(firstToken, lastToken, remainingTokens); + rightmostParent.add(leaf); + lastLeaf.next = rightmostLeaf = leaf; + numBlocks++; + } + } + } + + // This denotes the leaf which only has min/max and token counts + // but doesn't have any associated data yet, so it can't be serialized. 
+ private class PartialLeaf extends Leaf + { + private final int size; + public PartialLeaf(Long min, Long max, int count) + { + super(min, max); + size = count; + } + + public int tokenCount() + { + return size; + } + + public void serializeData(ByteBuffer buf) + { + throw new UnsupportedOperationException(); + } + + public boolean isSerializable() + { + return false; + } + } + + // This denotes the leaf which has been filled with data and is ready to be serialized + private class StaticLeaf extends Leaf + { + private final Iterator<Token> tokens; + private final int count; + private final boolean isLast; + + public StaticLeaf(Iterator<Token> tokens, Leaf leaf) + { + this(tokens, leaf.smallestToken(), leaf.largestToken(), leaf.tokenCount(), leaf.isLastLeaf()); + } + + public StaticLeaf(Iterator<Token> tokens, Long min, Long max, long count, boolean isLastLeaf) + { + super(min, max); + + this.count = (int) count; // downcast is safe since leaf size is always < Integer.MAX_VALUE + this.tokens = tokens; + this.isLast = isLastLeaf; + } + + public boolean isLastLeaf() + { + return isLast; + } + + public int tokenCount() + { + return count; + } + + public void serializeData(ByteBuffer buf) + { + while (tokens.hasNext()) + { + Token entry = tokens.next(); + createEntry(entry.get(), entry.getOffsets()).serialize(buf); + } + } + + public boolean isSerializable() + { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/Token.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java index 02130a3..4cd1ea3 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java @@ -18,9 +18,12 @@ package org.apache.cassandra.index.sasi.disk; import com.google.common.primitives.Longs; + import 
org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.utils.CombinedValue; +import com.carrotsearch.hppc.LongSet; + public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey> { protected final long token; @@ -35,6 +38,8 @@ public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKe return token; } + public abstract LongSet getOffsets(); + public int compareTo(CombinedValue<Long> o) { return Longs.compare(token, ((Token) o).token); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java index 5d85d00..3f8182d 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java @@ -27,6 +27,8 @@ import org.apache.cassandra.index.sasi.utils.MappedBuffer; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.utils.MergeIterator; +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Iterators; @@ -406,9 +408,9 @@ public class TokenTree }); } - public Set<Long> getOffsets() + public LongSet getOffsets() { - Set<Long> offsets = new HashSet<>(); + LongSet offsets = new LongOpenHashSet(4); for (TokenInfo i : info) { for (long offset : i.fetchOffsets()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java index e10b057..2210964 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java @@ -18,21 +18,26 @@ package org.apache.cassandra.index.sasi.disk; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import com.carrotsearch.hppc.LongArrayList; import com.carrotsearch.hppc.LongSet; -import com.carrotsearch.hppc.cursors.LongCursor; -import com.carrotsearch.hppc.LongOpenHashSet; -import com.google.common.collect.AbstractIterator; -public class TokenTreeBuilder +public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>> { + int BLOCK_BYTES = 4096; + int BLOCK_HEADER_BYTES = 64; + int OVERFLOW_TRAILER_BYTES = 64; + int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8; + int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16; + long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset + byte LAST_LEAF_SHIFT = 1; + byte SHARED_HEADER_BYTES = 19; + byte ENTRY_TYPE_MASK = 0x03; + short AB_MAGIC = 0x5A51; + // note: ordinal positions are used here, do not change order enum EntryType { @@ -56,784 +61,16 @@ public class TokenTreeBuilder } } - public static final int BLOCK_BYTES = 4096; - public static final int BLOCK_HEADER_BYTES = 64; - public static final int OVERFLOW_TRAILER_BYTES = 64; - public static final int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8; - public static final int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16; - public static final long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset - public static final byte LAST_LEAF_SHIFT = 1; - public static final byte SHARED_HEADER_BYTES = 19; - public static final byte 
ENTRY_TYPE_MASK = 0x03; - public static final short AB_MAGIC = 0x5A51; - - private final SortedMap<Long, LongSet> tokens = new TreeMap<>(); - private int numBlocks; - - private Node root; - private InteriorNode rightmostParent; - private Leaf leftmostLeaf; - private Leaf rightmostLeaf; - private long tokenCount = 0; - private long treeMinToken; - private long treeMaxToken; - - public TokenTreeBuilder() - {} - - public TokenTreeBuilder(SortedMap<Long, LongSet> data) - { - add(data); - } - - public void add(Long token, long keyPosition) - { - LongSet found = tokens.get(token); - if (found == null) - tokens.put(token, (found = new LongOpenHashSet(2))); - - found.add(keyPosition); - } - - public void add(SortedMap<Long, LongSet> data) - { - for (Map.Entry<Long, LongSet> newEntry : data.entrySet()) - { - LongSet found = tokens.get(newEntry.getKey()); - if (found == null) - tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4))); - - for (LongCursor offset : newEntry.getValue()) - found.add(offset.value); - } - } - - public TokenTreeBuilder finish() - { - maybeBulkLoad(); - return this; - } - - public SortedMap<Long, LongSet> getTokens() - { - return tokens; - } - - public long getTokenCount() - { - return tokenCount; - } - - public int serializedSize() - { - if (numBlocks == 1) - return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16)); - else - return numBlocks * BLOCK_BYTES; - } - - public void write(DataOutputPlus out) throws IOException - { - ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES); - Iterator<Node> levelIterator = root.levelIterator(); - long childBlockIndex = 1; - - while (levelIterator != null) - { - - Node firstChild = null; - while (levelIterator.hasNext()) - { - Node block = levelIterator.next(); - - if (firstChild == null && !block.isLeaf()) - firstChild = ((InteriorNode) block).children.get(0); - - block.serialize(childBlockIndex, blockBuffer); - flushBuffer(blockBuffer, out, numBlocks != 1); - - childBlockIndex += block.childCount(); 
- } - - levelIterator = (firstChild == null) ? null : firstChild.levelIterator(); - } - } - - public Iterator<Pair<Long, LongSet>> iterator() - { - return new TokenIterator(leftmostLeaf.levelIterator()); - } - - private void maybeBulkLoad() - { - if (root == null) - bulkLoad(); - } - - private void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException - { - // seek to end of last block before flushing - if (align) - alignBuffer(buffer, BLOCK_BYTES); - - buffer.flip(); - o.write(buffer); - buffer.clear(); - } - - private static void alignBuffer(ByteBuffer buffer, int blockSize) - { - long curPos = buffer.position(); - if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed - buffer.position((int) FBUtilities.align(curPos, blockSize)); - } - - private void bulkLoad() - { - tokenCount = tokens.size(); - treeMinToken = tokens.firstKey(); - treeMaxToken = tokens.lastKey(); - numBlocks = 1; - - // special case the tree that only has a single block in it (so we don't create a useless root) - if (tokenCount <= TOKENS_PER_BLOCK) - { - leftmostLeaf = new Leaf(tokens); - rightmostLeaf = leftmostLeaf; - root = leftmostLeaf; - } - else - { - root = new InteriorNode(); - rightmostParent = (InteriorNode) root; - - int i = 0; - Leaf lastLeaf = null; - Long firstToken = tokens.firstKey(); - Long finalToken = tokens.lastKey(); - Long lastToken; - for (Long token : tokens.keySet()) - { - if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1))) - { - i++; - continue; - } - - lastToken = token; - Leaf leaf = (i != (tokenCount - 1) || token.equals(finalToken)) ? 
- new Leaf(tokens.subMap(firstToken, lastToken)) : new Leaf(tokens.tailMap(firstToken)); - - if (i == TOKENS_PER_BLOCK) - leftmostLeaf = leaf; - else - lastLeaf.next = leaf; - - rightmostParent.add(leaf); - lastLeaf = leaf; - rightmostLeaf = leaf; - firstToken = lastToken; - i++; - numBlocks++; - - if (token.equals(finalToken)) - { - Leaf finalLeaf = new Leaf(tokens.tailMap(token)); - lastLeaf.next = finalLeaf; - rightmostParent.add(finalLeaf); - rightmostLeaf = finalLeaf; - numBlocks++; - } - } - - } - } - - private abstract class Node - { - protected InteriorNode parent; - protected Node next; - protected Long nodeMinToken, nodeMaxToken; - - public abstract void serialize(long childBlockIndex, ByteBuffer buf); - public abstract int childCount(); - public abstract int tokenCount(); - public abstract Long smallestToken(); - - public Iterator<Node> levelIterator() - { - return new LevelIterator(this); - } - - public boolean isLeaf() - { - return (this instanceof Leaf); - } - - protected boolean isLastLeaf() - { - return this == rightmostLeaf; - } - - protected boolean isRoot() - { - return this == root; - } - - protected void updateTokenRange(long token) - { - nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token); - nodeMaxToken = nodeMaxToken == null ? 
token : Math.max(nodeMaxToken, token); - } - - protected void serializeHeader(ByteBuffer buf) - { - Header header; - if (isRoot()) - header = new RootHeader(); - else if (!isLeaf()) - header = new InteriorNodeHeader(); - else - header = new LeafHeader(); - - header.serialize(buf); - alignBuffer(buf, BLOCK_HEADER_BYTES); - } - - private abstract class Header - { - public void serialize(ByteBuffer buf) - { - buf.put(infoByte()) - .putShort((short) (tokenCount())) - .putLong(nodeMinToken) - .putLong(nodeMaxToken); - } - - protected abstract byte infoByte(); - } - - private class RootHeader extends Header - { - public void serialize(ByteBuffer buf) - { - super.serialize(buf); - writeMagic(buf); - buf.putLong(tokenCount) - .putLong(treeMinToken) - .putLong(treeMaxToken); - } - - protected byte infoByte() - { - // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1) - // if not leaf, clear both bits - return (byte) ((isLeaf()) ? 3 : 0); - } - - protected void writeMagic(ByteBuffer buf) - { - switch (Descriptor.CURRENT_VERSION) - { - case Descriptor.VERSION_AB: - buf.putShort(AB_MAGIC); - break; - default: - break; - } - - } - } - - private class InteriorNodeHeader extends Header - { - // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared - protected byte infoByte() - { - return 0; - } - } - - private class LeafHeader extends Header - { - // bit 0 set as leaf indicator - // bit 1 set if this is last leaf of data - protected byte infoByte() - { - byte infoByte = 1; - infoByte |= (isLastLeaf()) ? 
(1 << LAST_LEAF_SHIFT) : 0; - - return infoByte; - } - } - - } - - private class Leaf extends Node - { - private final SortedMap<Long, LongSet> tokens; - private LongArrayList overflowCollisions; - - Leaf(SortedMap<Long, LongSet> data) - { - nodeMinToken = data.firstKey(); - nodeMaxToken = data.lastKey(); - tokens = data; - } - - public Long largestToken() - { - return nodeMaxToken; - } - - public void serialize(long childBlockIndex, ByteBuffer buf) - { - serializeHeader(buf); - serializeData(buf); - serializeOverflowCollisions(buf); - } - - public int childCount() - { - return 0; - } - - public int tokenCount() - { - return tokens.size(); - } - - public Long smallestToken() - { - return nodeMinToken; - } - - public Iterator<Map.Entry<Long, LongSet>> tokenIterator() - { - return tokens.entrySet().iterator(); - } - - private void serializeData(ByteBuffer buf) - { - for (Map.Entry<Long, LongSet> entry : tokens.entrySet()) - createEntry(entry.getKey(), entry.getValue()).serialize(buf); - } - - private void serializeOverflowCollisions(ByteBuffer buf) - { - if (overflowCollisions != null) - for (LongCursor offset : overflowCollisions) - buf.putLong(offset.value); - } - - - private LeafEntry createEntry(final long tok, final LongSet offsets) - { - int offsetCount = offsets.size(); - switch (offsetCount) - { - case 0: - throw new AssertionError("no offsets for token " + tok); - case 1: - long offset = offsets.toArray()[0]; - if (offset > MAX_OFFSET) - throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET); - else if (offset <= Integer.MAX_VALUE) - return new SimpleLeafEntry(tok, offset); - else - return new FactoredOffsetLeafEntry(tok, offset); - case 2: - long[] rawOffsets = offsets.toArray(); - if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE && - (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE)) - return new PackedCollisionLeafEntry(tok, rawOffsets); - else - return 
createOverflowEntry(tok, offsetCount, offsets); - default: - return createOverflowEntry(tok, offsetCount, offsets); - } - } - - private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets) - { - if (overflowCollisions == null) - overflowCollisions = new LongArrayList(); - - LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount); - for (LongCursor o : offsets) { - if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY) - throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf"); - else - overflowCollisions.add(o.value); - } - return entry; - } - - private abstract class LeafEntry - { - protected final long token; - - abstract public EntryType type(); - abstract public int offsetData(); - abstract public short offsetExtra(); - - public LeafEntry(final long tok) - { - token = tok; - } - - public void serialize(ByteBuffer buf) - { - buf.putShort((short) type().ordinal()) - .putShort(offsetExtra()) - .putLong(token) - .putInt(offsetData()); - } - - } - - - // assumes there is a single offset and the offset is <= Integer.MAX_VALUE - private class SimpleLeafEntry extends LeafEntry - { - private final long offset; - - public SimpleLeafEntry(final long tok, final long off) - { - super(tok); - offset = off; - } - - public EntryType type() - { - return EntryType.SIMPLE; - } - - public int offsetData() - { - return (int) offset; - } - - public short offsetExtra() - { - return 0; - } - } - - // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET - // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits) - // and store where offset is normally stored. 
take bottom 16 bits of offset and store in entry header - private class FactoredOffsetLeafEntry extends LeafEntry - { - private final long offset; - - public FactoredOffsetLeafEntry(final long tok, final long off) - { - super(tok); - offset = off; - } - - public EntryType type() - { - return EntryType.FACTORED; - } - - public int offsetData() - { - return (int) (offset >>> Short.SIZE); - } - - public short offsetExtra() - { - return (short) offset; - } - } - - // holds an entry with two offsets that can be packed in an int & a short - // the int offset is stored where offset is normally stored. short offset is - // stored in entry header - private class PackedCollisionLeafEntry extends LeafEntry - { - private short smallerOffset; - private int largerOffset; - - public PackedCollisionLeafEntry(final long tok, final long[] offs) - { - super(tok); - - smallerOffset = (short) Math.min(offs[0], offs[1]); - largerOffset = (int) Math.max(offs[0], offs[1]); - } - - public EntryType type() - { - return EntryType.PACKED; - } - - public int offsetData() - { - return largerOffset; - } - - public short offsetExtra() - { - return smallerOffset; - } - } - - // holds an entry with three or more offsets, or two offsets that cannot - // be packed into an int & a short. the index into the overflow list - // is stored where the offset is normally stored. 
the number of overflowed offsets - // for the entry is stored in the entry header - private class OverflowCollisionLeafEntry extends LeafEntry - { - private final short startIndex; - private final short count; - - public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount) - { - super(tok); - startIndex = collisionStartIndex; - count = collisionCount; - } - - public EntryType type() - { - return EntryType.OVERFLOW; - } - - public int offsetData() - { - return startIndex; - } - - public short offsetExtra() - { - return count; - } - - } - - } - - private class InteriorNode extends Node - { - private List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK); - private List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1); - private int position = 0; // TODO (jwest): can get rid of this and use array size - - - public void serialize(long childBlockIndex, ByteBuffer buf) - { - serializeHeader(buf); - serializeTokens(buf); - serializeChildOffsets(childBlockIndex, buf); - } + void add(Long token, long keyPosition); + void add(SortedMap<Long, LongSet> data); + void add(Iterator<Pair<Long, LongSet>> data); + void add(TokenTreeBuilder ttb); - public int childCount() - { - return children.size(); - } + boolean isEmpty(); + long getTokenCount(); - public int tokenCount() - { - return tokens.size(); - } + TokenTreeBuilder finish(); - public Long smallestToken() - { - return tokens.get(0); - } - - protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild) - { - int pos = tokens.size(); - if (pos == TOKENS_PER_BLOCK) - { - InteriorNode sibling = split(); - sibling.add(token, leftChild, rightChild); - - } - else { - if (leftChild != null) - children.add(pos, leftChild); - - if (rightChild != null) - { - children.add(pos + 1, rightChild); - rightChild.parent = this; - } - - updateTokenRange(token); - tokens.add(pos, token); - } - } - - protected void add(Leaf node) - { - - if (position == (TOKENS_PER_BLOCK + 
1)) - { - rightmostParent = split(); - rightmostParent.add(node); - } - else - { - - node.parent = this; - children.add(position, node); - position++; - - // the first child is referenced only during bulk load. we don't take a value - // to store into the tree, one is subtracted since position has already been incremented - // for the next node to be added - if (position - 1 == 0) - return; - - - // tokens are inserted one behind the current position, but 2 is subtracted because - // position has already been incremented for the next add - Long smallestToken = node.smallestToken(); - updateTokenRange(smallestToken); - tokens.add(position - 2, smallestToken); - } - - } - - protected InteriorNode split() - { - Pair<Long, InteriorNode> splitResult = splitBlock(); - Long middleValue = splitResult.left; - InteriorNode sibling = splitResult.right; - InteriorNode leftChild = null; - - // create a new root if necessary - if (parent == null) - { - parent = new InteriorNode(); - root = parent; - sibling.parent = parent; - leftChild = this; - numBlocks++; - } - - parent.add(middleValue, leftChild, sibling); - - return sibling; - } - - protected Pair<Long, InteriorNode> splitBlock() - { - final int splitPosition = TOKENS_PER_BLOCK - 2; - InteriorNode sibling = new InteriorNode(); - sibling.parent = parent; - next = sibling; - - Long middleValue = tokens.get(splitPosition); - - for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++) - { - if (i != TOKENS_PER_BLOCK && i != splitPosition) - { - long token = tokens.get(i); - sibling.updateTokenRange(token); - sibling.tokens.add(token); - } - - Node child = children.get(i + 1); - child.parent = sibling; - sibling.children.add(child); - sibling.position++; - } - - for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--) - { - if (i != TOKENS_PER_BLOCK) - tokens.remove(i); - - if (i != splitPosition) - children.remove(i); - } - - nodeMinToken = smallestToken(); - nodeMaxToken = tokens.get(tokens.size() - 1); - numBlocks++; - - return 
Pair.create(middleValue, sibling); - } - - protected boolean isFull() - { - return (position >= TOKENS_PER_BLOCK + 1); - } - - private void serializeTokens(ByteBuffer buf) - { - for (Long token : tokens) - buf.putLong(token); - } - - - private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf) - { - for (int i = 0; i < children.size(); i++) - buf.putLong((childBlockIndex + i) * BLOCK_BYTES); - } - } - - public static class LevelIterator extends AbstractIterator<Node> - { - private Node currentNode; - - LevelIterator(Node first) - { - currentNode = first; - } - - public Node computeNext() - { - if (currentNode == null) - return endOfData(); - - Node returnNode = currentNode; - currentNode = returnNode.next; - - return returnNode; - } - - - } - - public static class TokenIterator extends AbstractIterator<Pair<Long, LongSet>> - { - private Iterator<Node> levelIterator; - private Iterator<Map.Entry<Long, LongSet>> currentIterator; - - TokenIterator(Iterator<Node> level) - { - levelIterator = level; - if (levelIterator.hasNext()) - currentIterator = ((Leaf) levelIterator.next()).tokenIterator(); - } - - public Pair<Long, LongSet> computeNext() - { - if (currentIterator != null && currentIterator.hasNext()) - { - Map.Entry<Long, LongSet> next = currentIterator.next(); - return Pair.create(next.getKey(), next.getValue()); - } - else - { - if (!levelIterator.hasNext()) - return endOfData(); - else - { - currentIterator = ((Leaf) levelIterator.next()).tokenIterator(); - return computeNext(); - } - } - - } - } + int serializedSize(); + void write(DataOutputPlus out) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java 
b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java index 293e2ee..a2f2c0e 100644 --- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java +++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java @@ -29,6 +29,8 @@ import org.apache.cassandra.index.sasi.utils.AbstractIterator; import org.apache.cassandra.index.sasi.utils.CombinedValue; import org.apache.cassandra.index.sasi.utils.RangeIterator; +import com.carrotsearch.hppc.LongOpenHashSet; +import com.carrotsearch.hppc.LongSet; import com.google.common.collect.PeekingIterator; public class KeyRangeIterator extends RangeIterator<Long, Token> @@ -91,6 +93,15 @@ public class KeyRangeIterator extends RangeIterator<Long, Token> }}; } + public LongSet getOffsets() + { + LongSet offsets = new LongOpenHashSet(4); + for (DecoratedKey key : keys) + offsets.add((long) key.getToken().getTokenValue()); + + return offsets; + } + public void merge(CombinedValue<Long> other) { if (!(other instanceof Token)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java index 63f6c5b..592299e 100644 --- a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java +++ b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java @@ -20,6 +20,7 @@ package org.apache.cassandra.index.sasi.sa; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import org.apache.cassandra.index.sasi.disk.DynamicTokenTreeBuilder; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder; import org.apache.cassandra.db.marshal.AbstractType; @@ -116,19 +117,19 @@ public class SuffixSA extends SA<CharBuffer> if (lastProcessedSuffix == null) { lastProcessedSuffix = suffix.left; 
- container = new TokenTreeBuilder(suffix.right.getTokens()); + container = new DynamicTokenTreeBuilder(suffix.right); } else if (comparator.compare(lastProcessedSuffix, suffix.left) == 0) { lastProcessedSuffix = suffix.left; - container.add(suffix.right.getTokens()); + container.add(suffix.right); } else { Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix(); lastProcessedSuffix = suffix.left; - container = new TokenTreeBuilder(suffix.right.getTokens()); + container = new DynamicTokenTreeBuilder(suffix.right); return result; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java index 2bf5a07..ba7123a 100644 --- a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java +++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java @@ -18,42 +18,22 @@ package org.apache.cassandra.index.sasi.utils; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; +import org.apache.cassandra.index.sasi.disk.*; import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm; -import org.apache.cassandra.index.sasi.disk.Token; -import org.apache.cassandra.index.sasi.disk.TokenTree; -import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder; import org.apache.cassandra.db.marshal.AbstractType; -import com.carrotsearch.hppc.LongOpenHashSet; -import com.carrotsearch.hppc.LongSet; -import com.carrotsearch.hppc.cursors.LongCursor; - public class CombinedTerm implements CombinedValue<DataTerm> { private final AbstractType<?> comparator; private final DataTerm term; - private final TreeMap<Long, LongSet> tokens; + private final List<DataTerm> mergedTerms = new ArrayList<>(); public CombinedTerm(AbstractType<?> comparator, DataTerm 
term) { this.comparator = comparator; this.term = term; - this.tokens = new TreeMap<>(); - - RangeIterator<Long, Token> tokens = term.getTokens(); - while (tokens.hasNext()) - { - Token current = tokens.next(); - LongSet offsets = this.tokens.get(current.get()); - if (offsets == null) - this.tokens.put(current.get(), (offsets = new LongOpenHashSet())); - - for (Long offset : ((TokenTree.OnDiskToken) current).getOffsets()) - offsets.add(offset); - } } public ByteBuffer getTerm() @@ -61,14 +41,18 @@ public class CombinedTerm implements CombinedValue<DataTerm> return term.getTerm(); } - public Map<Long, LongSet> getTokens() + public RangeIterator<Long, Token> getTokenIterator() { - return tokens; + RangeIterator.Builder<Long, Token> union = RangeUnionIterator.builder(); + union.add(term.getTokens()); + mergedTerms.stream().map(OnDiskIndex.DataTerm::getTokens).forEach(union::add); + + return union.build(); } public TokenTreeBuilder getTokenTreeBuilder() { - return new TokenTreeBuilder(tokens).finish(); + return new StaticTokenTreeBuilder(this).finish(); } public void merge(CombinedValue<DataTerm> other) @@ -80,15 +64,7 @@ public class CombinedTerm implements CombinedValue<DataTerm> assert comparator == o.comparator; - for (Map.Entry<Long, LongSet> token : o.tokens.entrySet()) - { - LongSet offsets = this.tokens.get(token.getKey()); - if (offsets == null) - this.tokens.put(token.getKey(), (offsets = new LongOpenHashSet())); - - for (LongCursor offset : token.getValue()) - offsets.add(offset.value); - } + mergedTerms.add(o.term); } public DataTerm get() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java index 6353155..628bd36 100644 --- 
a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java @@ -605,6 +605,12 @@ public class OnDiskIndexTest } } + public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb) + { + for (Pair<Long, LongSet> entry : ttb) + offsets.put(entry.left, entry.right); + } + @Test public void testCombiningOfThePartitionedSA() throws Exception { @@ -620,7 +626,7 @@ public class OnDiskIndexTest expected.put(i, (offsets = new TreeMap<>())); builderA.add(LongType.instance.decompose(i), keyAt(i), i); - offsets.putAll(keyBuilder(i).getTokens()); + putAll(offsets, keyBuilder(i)); } for (long i = 50; i < 100; i++) @@ -631,7 +637,7 @@ public class OnDiskIndexTest long position = 100L + i; builderB.add(LongType.instance.decompose(i), keyAt(position), position); - offsets.putAll(keyBuilder(100L + i).getTokens()); + putAll(offsets, keyBuilder(100L + i)); } File indexA = File.createTempFile("on-disk-sa-partition-a", ".db"); @@ -659,7 +665,7 @@ public class OnDiskIndexTest if (offsets == null) actual.put(composedTerm, (offsets = new TreeMap<>())); - offsets.putAll(term.getTokens()); + putAll(offsets, term.getTokenTreeBuilder()); } Assert.assertEquals(actual, expected); @@ -684,7 +690,7 @@ public class OnDiskIndexTest if (offsets == null) actual.put(composedTerm, (offsets = new TreeMap<>())); - offsets.putAll(term.getTokens()); + putAll(offsets, term.getTokenTreeBuilder()); } Assert.assertEquals(actual, expected); @@ -735,7 +741,7 @@ public class OnDiskIndexTest private static TokenTreeBuilder keyBuilder(Long... 
keys) { - TokenTreeBuilder builder = new TokenTreeBuilder(); + TokenTreeBuilder builder = new DynamicTokenTreeBuilder(); for (final Long key : keys) { @@ -850,9 +856,9 @@ public class OnDiskIndexTest private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens) { - for (Map.Entry<Long, LongSet> token : tokens.getTokens().entrySet()) + for (Pair<Long, LongSet> token : tokens) { - for (long position : token.getValue().toArray()) + for (long position : token.right.toArray()) builder.add(term, keyAt(position), position); } }