Avoid index segment stitching in RAM which led to OOM on big SSTable files

patch by jrwest and xedin; reviewed by xedin for CASSANDRA-11383


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c4d5c73
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c4d5c73
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c4d5c73

Branch: refs/heads/trunk
Commit: 5c4d5c731f1299ba310c81603914a1a8956e644c
Parents: f6c5d72
Author: Jordan West <jorda...@gmail.com>
Authored: Mon Mar 21 12:00:31 2016 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Sun Mar 27 15:21:16 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../sasi/disk/AbstractTokenTreeBuilder.java     | 672 ++++++++++++++++
 .../sasi/disk/DynamicTokenTreeBuilder.java      | 189 +++++
 .../index/sasi/disk/OnDiskIndexBuilder.java     |  52 +-
 .../index/sasi/disk/PerSSTableIndexWriter.java  |  37 +-
 .../index/sasi/disk/StaticTokenTreeBuilder.java | 266 ++++++
 .../apache/cassandra/index/sasi/disk/Token.java |   5 +
 .../cassandra/index/sasi/disk/TokenTree.java    |   6 +-
 .../index/sasi/disk/TokenTreeBuilder.java       | 805 +------------------
 .../index/sasi/memory/KeyRangeIterator.java     |  11 +
 .../cassandra/index/sasi/sa/SuffixSA.java       |   7 +-
 .../index/sasi/utils/CombinedTerm.java          |  46 +-
 .../index/sasi/disk/OnDiskIndexTest.java        |  20 +-
 .../sasi/disk/PerSSTableIndexWriterTest.java    |  90 +++
 .../index/sasi/disk/TokenTreeTest.java          | 217 +++--
 .../index/sasi/utils/LongIterator.java          |   8 +
 16 files changed, 1482 insertions(+), 950 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f86c91f..2907df9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.5
+ * Avoid index segment stitching in RAM which lead to OOM on big SSTable files (CASSANDRA-11383)
  * Fix clustering and row filters for LIKE queries on clustering columns (CASSANDRA-11397)
 Merged from 3.0:
  * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
new file mode 100644
index 0000000..4e93b2b
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
+{
+    protected int numBlocks;
+    protected Node root;
+    protected InteriorNode rightmostParent;
+    protected Leaf leftmostLeaf;
+    protected Leaf rightmostLeaf;
+    protected long tokenCount = 0;
+    protected long treeMinToken;
+    protected long treeMaxToken;
+
+    public void add(TokenTreeBuilder other)
+    {
+        add(other.iterator());
+    }
+
+    public TokenTreeBuilder finish()
+    {
+        if (root == null)
+            constructTree();
+
+        return this;
+    }
+
+    public long getTokenCount()
+    {
+        return tokenCount;
+    }
+
+    public int serializedSize()
+    {
+        if (numBlocks == 1)
+            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
+        else
+            return numBlocks * BLOCK_BYTES;
+    }
+
+    public void write(DataOutputPlus out) throws IOException
+    {
+        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
+        Iterator<Node> levelIterator = root.levelIterator();
+        long childBlockIndex = 1;
+
+        while (levelIterator != null)
+        {
+            Node firstChild = null;
+            while (levelIterator.hasNext())
+            {
+                Node block = levelIterator.next();
+
+                if (firstChild == null && !block.isLeaf())
+                    firstChild = ((InteriorNode) block).children.get(0);
+
+                if (block.isSerializable())
+                {
+                    block.serialize(childBlockIndex, blockBuffer);
+                    flushBuffer(blockBuffer, out, numBlocks != 1);
+                }
+
+                childBlockIndex += block.childCount();
+            }
+
+            levelIterator = (firstChild == null) ? null : 
firstChild.levelIterator();
+        }
+    }
+
+    protected abstract void constructTree();
+
+    protected void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean 
align) throws IOException
+    {
+        // seek to end of last block before flushing
+        if (align)
+            alignBuffer(buffer, BLOCK_BYTES);
+
+        buffer.flip();
+        o.write(buffer);
+        buffer.clear();
+    }
+
+    protected abstract class Node
+    {
+        protected InteriorNode parent;
+        protected Node next;
+        protected Long nodeMinToken, nodeMaxToken;
+
+        public Node(Long minToken, Long maxToken)
+        {
+            nodeMinToken = minToken;
+            nodeMaxToken = maxToken;
+        }
+
+        public abstract boolean isSerializable();
+        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
+        public abstract int childCount();
+        public abstract int tokenCount();
+
+        public Long smallestToken()
+        {
+            return nodeMinToken;
+        }
+
+        public Long largestToken()
+        {
+            return nodeMaxToken;
+        }
+
+        public Iterator<Node> levelIterator()
+        {
+            return new LevelIterator(this);
+        }
+
+        public boolean isLeaf()
+        {
+            return (this instanceof Leaf);
+        }
+
+        protected boolean isLastLeaf()
+        {
+            return this == rightmostLeaf;
+        }
+
+        protected boolean isRoot()
+        {
+            return this == root;
+        }
+
+        protected void updateTokenRange(long token)
+        {
+            nodeMinToken = nodeMinToken == null ? token : 
Math.min(nodeMinToken, token);
+            nodeMaxToken = nodeMaxToken == null ? token : 
Math.max(nodeMaxToken, token);
+        }
+
+        protected void serializeHeader(ByteBuffer buf)
+        {
+            Header header;
+            if (isRoot())
+                header = new RootHeader();
+            else if (!isLeaf())
+                header = new InteriorNodeHeader();
+            else
+                header = new LeafHeader();
+
+            header.serialize(buf);
+            alignBuffer(buf, BLOCK_HEADER_BYTES);
+        }
+
+        private abstract class Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                buf.put(infoByte())
+                   .putShort((short) (tokenCount()))
+                   .putLong(nodeMinToken)
+                   .putLong(nodeMaxToken);
+            }
+
+            protected abstract byte infoByte();
+        }
+
+        private class RootHeader extends Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                super.serialize(buf);
+                writeMagic(buf);
+                buf.putLong(tokenCount)
+                   .putLong(treeMinToken)
+                   .putLong(treeMaxToken);
+            }
+
+            protected byte infoByte()
+            {
+                // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
+                // if not leaf, clear both bits
+                return (byte) ((isLeaf()) ? 3 : 0);
+            }
+
+            protected void writeMagic(ByteBuffer buf)
+            {
+                switch (Descriptor.CURRENT_VERSION)
+                {
+                    case Descriptor.VERSION_AB:
+                        buf.putShort(AB_MAGIC);
+                        break;
+
+                    default:
+                        break;
+                }
+
+            }
+        }
+
+        private class InteriorNodeHeader extends Header
+        {
+            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
+            protected byte infoByte()
+            {
+                return 0;
+            }
+        }
+
+        private class LeafHeader extends Header
+        {
+            // bit 0 set as leaf indicator
+            // bit 1 set if this is last leaf of data
+            protected byte infoByte()
+            {
+                byte infoByte = 1;
+                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;
+
+                return infoByte;
+            }
+        }
+
+    }
+
+    protected abstract class Leaf extends Node
+    {
+        protected LongArrayList overflowCollisions;
+
+        public Leaf(Long minToken, Long maxToken)
+        {
+            super(minToken, maxToken);
+        }
+
+        public int childCount()
+        {
+            return 0;
+        }
+
+        protected void serializeOverflowCollisions(ByteBuffer buf)
+        {
+            if (overflowCollisions != null)
+                for (LongCursor offset : overflowCollisions)
+                    buf.putLong(offset.value);
+        }
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeData(buf);
+            serializeOverflowCollisions(buf);
+        }
+
+        protected abstract void serializeData(ByteBuffer buf);
+
+        protected LeafEntry createEntry(final long tok, final LongSet offsets)
+        {
+            int offsetCount = offsets.size();
+            switch (offsetCount)
+            {
+                case 0:
+                    throw new AssertionError("no offsets for token " + tok);
+                case 1:
+                    long offset = offsets.toArray()[0];
+                    if (offset > MAX_OFFSET)
+                        throw new AssertionError("offset " + offset + " cannot 
be greater than " + MAX_OFFSET);
+                    else if (offset <= Integer.MAX_VALUE)
+                        return new SimpleLeafEntry(tok, offset);
+                    else
+                        return new FactoredOffsetLeafEntry(tok, offset);
+                case 2:
+                    long[] rawOffsets = offsets.toArray();
+                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= 
Integer.MAX_VALUE &&
+                        (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= 
Short.MAX_VALUE))
+                        return new PackedCollisionLeafEntry(tok, rawOffsets);
+                    else
+                        return createOverflowEntry(tok, offsetCount, offsets);
+                default:
+                    return createOverflowEntry(tok, offsetCount, offsets);
+            }
+        }
+
+        private LeafEntry createOverflowEntry(final long tok, final int 
offsetCount, final LongSet offsets)
+        {
+            if (overflowCollisions == null)
+                overflowCollisions = new LongArrayList();
+
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) 
overflowCollisions.size(), (short) offsetCount);
+            for (LongCursor o : offsets) {
+                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+                    throw new AssertionError("cannot have more than " + 
OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+                else
+                    overflowCollisions.add(o.value);
+            }
+            return entry;
+        }
+
+        protected abstract class LeafEntry
+        {
+            protected final long token;
+
+            abstract public EntryType type();
+            abstract public int offsetData();
+            abstract public short offsetExtra();
+
+            public LeafEntry(final long tok)
+            {
+                token = tok;
+            }
+
+            public void serialize(ByteBuffer buf)
+            {
+                buf.putShort((short) type().ordinal())
+                   .putShort(offsetExtra())
+                   .putLong(token)
+                   .putInt(offsetData());
+            }
+
+        }
+
+
+        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
+        protected class SimpleLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public SimpleLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.SIMPLE;
+            }
+
+            public int offsetData()
+            {
+                return (int) offset;
+            }
+
+            public short offsetExtra()
+            {
+                return 0;
+            }
+        }
+
+        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
+        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
+        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
+        private class FactoredOffsetLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public FactoredOffsetLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.FACTORED;
+            }
+
+            public int offsetData()
+            {
+                return (int) (offset >>> Short.SIZE);
+            }
+
+            public short offsetExtra()
+            {
+                return (short) offset;
+            }
+        }
+
+        // holds an entry with two offsets that can be packed in an int & a short
+        // the int offset is stored where offset is normally stored. short offset is
+        // stored in entry header
+        private class PackedCollisionLeafEntry extends LeafEntry
+        {
+            private short smallerOffset;
+            private int largerOffset;
+
+            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            {
+                super(tok);
+
+                smallerOffset = (short) Math.min(offs[0], offs[1]);
+                largerOffset = (int) Math.max(offs[0], offs[1]);
+            }
+
+            public EntryType type()
+            {
+                return EntryType.PACKED;
+            }
+
+            public int offsetData()
+            {
+                return largerOffset;
+            }
+
+            public short offsetExtra()
+            {
+                return smallerOffset;
+            }
+        }
+
+        // holds an entry with three or more offsets, or two offsets that cannot
+        // be packed into an int & a short. the index into the overflow list
+        // is stored where the offset is normally stored. the number of overflowed offsets
+        // for the entry is stored in the entry header
+        private class OverflowCollisionLeafEntry extends LeafEntry
+        {
+            private final short startIndex;
+            private final short count;
+
+            public OverflowCollisionLeafEntry(final long tok, final short 
collisionStartIndex, final short collisionCount)
+            {
+                super(tok);
+                startIndex = collisionStartIndex;
+                count = collisionCount;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.OVERFLOW;
+            }
+
+            public int offsetData()
+            {
+                return startIndex;
+            }
+
+            public short offsetExtra()
+            {
+                return count;
+            }
+
+        }
+
+    }
+
+    protected class InteriorNode extends Node
+    {
+        protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
+        protected List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
+        protected int position = 0;
+
+        public InteriorNode()
+        {
+            super(null, null);
+        }
+
+        public boolean isSerializable()
+        {
+            return true;
+        }
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeTokens(buf);
+            serializeChildOffsets(childBlockIndex, buf);
+        }
+
+        public int childCount()
+        {
+            return children.size();
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public Long smallestToken()
+        {
+            return tokens.get(0);
+        }
+
+        protected void add(Long token, InteriorNode leftChild, InteriorNode 
rightChild)
+        {
+            int pos = tokens.size();
+            if (pos == TOKENS_PER_BLOCK)
+            {
+                InteriorNode sibling = split();
+                sibling.add(token, leftChild, rightChild);
+
+            }
+            else {
+                if (leftChild != null)
+                    children.add(pos, leftChild);
+
+                if (rightChild != null)
+                {
+                    children.add(pos + 1, rightChild);
+                    rightChild.parent = this;
+                }
+
+                updateTokenRange(token);
+                tokens.add(pos, token);
+            }
+        }
+
+        protected void add(Leaf node)
+        {
+
+            if (position == (TOKENS_PER_BLOCK + 1))
+            {
+                rightmostParent = split();
+                rightmostParent.add(node);
+            }
+            else
+            {
+
+                node.parent = this;
+                children.add(position, node);
+                position++;
+
+                // the first child is referenced only during bulk load. we don't take a value
+                // to store into the tree, one is subtracted since position has already been incremented
+                // for the next node to be added
+                if (position - 1 == 0)
+                    return;
+
+
+                // tokens are inserted one behind the current position, but 2 is subtracted because
+                // position has already been incremented for the next add
+                Long smallestToken = node.smallestToken();
+                updateTokenRange(smallestToken);
+                tokens.add(position - 2, smallestToken);
+            }
+
+        }
+
+        protected InteriorNode split()
+        {
+            Pair<Long, InteriorNode> splitResult = splitBlock();
+            Long middleValue = splitResult.left;
+            InteriorNode sibling = splitResult.right;
+            InteriorNode leftChild = null;
+
+            // create a new root if necessary
+            if (parent == null)
+            {
+                parent = new InteriorNode();
+                root = parent;
+                sibling.parent = parent;
+                leftChild = this;
+                numBlocks++;
+            }
+
+            parent.add(middleValue, leftChild, sibling);
+
+            return sibling;
+        }
+
+        protected Pair<Long, InteriorNode> splitBlock()
+        {
+            final int splitPosition = TOKENS_PER_BLOCK - 2;
+            InteriorNode sibling = new InteriorNode();
+            sibling.parent = parent;
+            next = sibling;
+
+            Long middleValue = tokens.get(splitPosition);
+
+            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
+            {
+                if (i != TOKENS_PER_BLOCK && i != splitPosition)
+                {
+                    long token = tokens.get(i);
+                    sibling.updateTokenRange(token);
+                    sibling.tokens.add(token);
+                }
+
+                Node child = children.get(i + 1);
+                child.parent = sibling;
+                sibling.children.add(child);
+                sibling.position++;
+            }
+
+            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
+            {
+                if (i != TOKENS_PER_BLOCK)
+                    tokens.remove(i);
+
+                if (i != splitPosition)
+                    children.remove(i);
+            }
+
+            nodeMinToken = smallestToken();
+            nodeMaxToken = tokens.get(tokens.size() - 1);
+            numBlocks++;
+
+            return Pair.create(middleValue, sibling);
+        }
+
+        protected boolean isFull()
+        {
+            return (position >= TOKENS_PER_BLOCK + 1);
+        }
+
+        private void serializeTokens(ByteBuffer buf)
+        {
+            tokens.forEach(buf::putLong);
+        }
+
+        private void serializeChildOffsets(long childBlockIndex, ByteBuffer 
buf)
+        {
+            for (int i = 0; i < children.size(); i++)
+                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
+        }
+    }
+
+    public static class LevelIterator extends AbstractIterator<Node>
+    {
+        private Node currentNode;
+
+        LevelIterator(Node first)
+        {
+            currentNode = first;
+        }
+
+        public Node computeNext()
+        {
+            if (currentNode == null)
+                return endOfData();
+
+            Node returnNode = currentNode;
+            currentNode = returnNode.next;
+
+            return returnNode;
+        }
+    }
+
+
+    protected static void alignBuffer(ByteBuffer buffer, int blockSize)
+    {
+        long curPos = buffer.position();
+        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed
+            buffer.position((int) FBUtilities.align(curPos, blockSize));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
new file mode 100644
index 0000000..2ddfd89
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
+{
+    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
+
+
+    public DynamicTokenTreeBuilder()
+    {}
+
+    public DynamicTokenTreeBuilder(TokenTreeBuilder data)
+    {
+        add(data);
+    }
+
+    public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
+    {
+        add(data);
+    }
+
+    public void add(Long token, long keyPosition)
+    {
+        LongSet found = tokens.get(token);
+        if (found == null)
+            tokens.put(token, (found = new LongOpenHashSet(2)));
+
+        found.add(keyPosition);
+    }
+
+    public void add(Iterator<Pair<Long, LongSet>> data)
+    {
+        while (data.hasNext())
+        {
+            Pair<Long, LongSet> entry = data.next();
+            for (LongCursor l : entry.right)
+                add(entry.left, l.value);
+        }
+    }
+
+    public void add(SortedMap<Long, LongSet> data)
+    {
+        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
+        {
+            LongSet found = tokens.get(newEntry.getKey());
+            if (found == null)
+                tokens.put(newEntry.getKey(), (found = new 
LongOpenHashSet(4)));
+
+            for (LongCursor offset : newEntry.getValue())
+                found.add(offset.value);
+        }
+    }
+
+    public Iterator<Pair<Long, LongSet>> iterator()
+    {
+        final Iterator<Map.Entry<Long, LongSet>> iterator = 
tokens.entrySet().iterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
+        {
+            protected Pair<Long, LongSet> computeNext()
+            {
+                if (!iterator.hasNext())
+                    return endOfData();
+
+                Map.Entry<Long, LongSet> entry = iterator.next();
+                return Pair.create(entry.getKey(), entry.getValue());
+            }
+        };
+    }
+
+    public boolean isEmpty()
+    {
+        return tokens.size() == 0;
+    }
+
+    protected void constructTree()
+    {
+        tokenCount = tokens.size();
+        treeMinToken = tokens.firstKey();
+        treeMaxToken = tokens.lastKey();
+        numBlocks = 1;
+
+        // special case the tree that only has a single block in it (so we don't create a useless root)
+        if (tokenCount <= TOKENS_PER_BLOCK)
+        {
+            leftmostLeaf = new DynamicLeaf(tokens);
+            rightmostLeaf = leftmostLeaf;
+            root = leftmostLeaf;
+        }
+        else
+        {
+            root = new InteriorNode();
+            rightmostParent = (InteriorNode) root;
+
+            int i = 0;
+            Leaf lastLeaf = null;
+            Long firstToken = tokens.firstKey();
+            Long finalToken = tokens.lastKey();
+            Long lastToken;
+            for (Long token : tokens.keySet())
+            {
+                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 
1)))
+                {
+                    i++;
+                    continue;
+                }
+
+                lastToken = token;
+                Leaf leaf = (i != (tokenCount - 1) || 
token.equals(finalToken)) ?
+                        new DynamicLeaf(tokens.subMap(firstToken, lastToken)) 
: new DynamicLeaf(tokens.tailMap(firstToken));
+
+                if (i == TOKENS_PER_BLOCK)
+                    leftmostLeaf = leaf;
+                else
+                    lastLeaf.next = leaf;
+
+                rightmostParent.add(leaf);
+                lastLeaf = leaf;
+                rightmostLeaf = leaf;
+                firstToken = lastToken;
+                i++;
+                numBlocks++;
+
+                if (token.equals(finalToken))
+                {
+                    Leaf finalLeaf = new DynamicLeaf(tokens.tailMap(token));
+                    lastLeaf.next = finalLeaf;
+                    rightmostParent.add(finalLeaf);
+                    rightmostLeaf = finalLeaf;
+                    numBlocks++;
+                }
+            }
+
+        }
+    }
+
+    private class DynamicLeaf extends Leaf
+    {
+        private final SortedMap<Long, LongSet> tokens;
+
+        DynamicLeaf(SortedMap<Long, LongSet> data)
+        {
+            super(data.firstKey(), data.lastKey());
+            tokens = data;
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public boolean isSerializable()
+        {
+            return true;
+        }
+
+        protected void serializeData(ByteBuffer buf)
+        {
+            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
+                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index 04b7b1c..c14f76c 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -162,7 +162,7 @@ public class OnDiskIndexBuilder
         TokenTreeBuilder tokens = terms.get(term);
         if (tokens == null)
         {
-            terms.put(term, (tokens = new TokenTreeBuilder()));
+            terms.put(term, (tokens = new DynamicTokenTreeBuilder()));
 
             // on-heap size estimates from jol
             // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key)
@@ -269,8 +269,8 @@ public class OnDiskIndexBuilder
 
             out.skipBytes((int) (BLOCK_SIZE - out.position()));
 
-            dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new 
MutableDataBlock(mode))
-                                            : new MutableLevel<>(out, new 
MutableDataBlock(mode));
+            dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new 
MutableDataBlock(termComparator, mode))
+                                            : new MutableLevel<>(out, new 
MutableDataBlock(termComparator, mode));
             while (terms.hasNext())
             {
                 Pair<ByteBuffer, TokenTreeBuilder> term = terms.next();
@@ -454,7 +454,7 @@ public class OnDiskIndexBuilder
         public DataBuilderLevel(SequentialWriter out, 
MutableBlock<InMemoryDataTerm> block)
         {
             super(out, block);
-            superBlockTree = new TokenTreeBuilder();
+            superBlockTree = new DynamicTokenTreeBuilder();
         }
 
         public InMemoryPointerTerm add(InMemoryDataTerm term) throws 
IOException
@@ -465,20 +465,20 @@ public class OnDiskIndexBuilder
                 dataBlocksCnt++;
                 flushSuperBlock(false);
             }
-            superBlockTree.add(term.keys.getTokens());
+            superBlockTree.add(term.keys);
             return ptr;
         }
 
         public void flushSuperBlock(boolean force) throws IOException
         {
-            if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && 
!superBlockTree.getTokens().isEmpty()))
+            if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && 
!superBlockTree.isEmpty()))
             {
                 superBlockOffsets.add(out.position());
                 superBlockTree.finish().write(out);
                 alignToBlock(out);
 
                 dataBlocksCnt = 0;
-                superBlockTree = new TokenTreeBuilder();
+                superBlockTree = new DynamicTokenTreeBuilder();
             }
         }
 
@@ -549,28 +549,34 @@ public class OnDiskIndexBuilder
 
     private static class MutableDataBlock extends 
MutableBlock<InMemoryDataTerm>
     {
+        private static final int MAX_KEYS_SPARSE = 5;
+
+        private final AbstractType<?> comparator;
         private final Mode mode;
 
         private int offset = 0;
-        private int sparseValueTerms = 0;
 
         private final List<TokenTreeBuilder> containers = new ArrayList<>();
         private TokenTreeBuilder combinedIndex;
 
-        public MutableDataBlock(Mode mode)
+        public MutableDataBlock(AbstractType<?> comparator, Mode mode)
         {
+            this.comparator = comparator;
             this.mode = mode;
-            this.combinedIndex = new TokenTreeBuilder();
+            this.combinedIndex = initCombinedIndex();
         }
 
         protected void addInternal(InMemoryDataTerm term) throws IOException
         {
             TokenTreeBuilder keys = term.keys;
 
-            if (mode == Mode.SPARSE && keys.getTokenCount() <= 5)
+            if (mode == Mode.SPARSE)
             {
+                if (keys.getTokenCount() > MAX_KEYS_SPARSE)
+                    throw new IOException(String.format("Term - '%s' belongs 
to more than %d keys in %s mode, which is not allowed.",
+                                                        
comparator.getString(term.term), MAX_KEYS_SPARSE, mode.name()));
+
                 writeTerm(term, keys);
-                sparseValueTerms++;
             }
             else
             {
@@ -581,7 +587,7 @@ public class OnDiskIndexBuilder
             }
 
             if (mode == Mode.SPARSE)
-                combinedIndex.add(keys.getTokens());
+                combinedIndex.add(keys);
         }
 
         protected int sizeAfter(InMemoryDataTerm element)
@@ -593,7 +599,7 @@ public class OnDiskIndexBuilder
         {
             super.flushAndClear(out);
 
-            out.writeInt((sparseValueTerms == 0) ? -1 : offset);
+            out.writeInt(mode == Mode.SPARSE ? offset : -1);
 
             if (containers.size() > 0)
             {
@@ -601,18 +607,15 @@ public class OnDiskIndexBuilder
                     tokens.write(out);
             }
 
-            if (sparseValueTerms > 0)
-            {
+            if (mode == Mode.SPARSE && combinedIndex != null)
                 combinedIndex.finish().write(out);
-            }
 
             alignToBlock(out);
 
             containers.clear();
-            combinedIndex = new TokenTreeBuilder();
+            combinedIndex = initCombinedIndex();
 
             offset = 0;
-            sparseValueTerms = 0;
         }
 
         private int ptrLength(InMemoryDataTerm term)
@@ -626,10 +629,8 @@ public class OnDiskIndexBuilder
         {
             term.serialize(buffer);
             buffer.writeByte((byte) keys.getTokenCount());
-
-            Iterator<Pair<Long, LongSet>> tokens = keys.iterator();
-            while (tokens.hasNext())
-                buffer.writeLong(tokens.next().left);
+            for (Pair<Long, LongSet> key : keys)
+                buffer.writeLong(key.left);
         }
 
         private void writeTerm(InMemoryTerm term, int offset) throws 
IOException
@@ -638,5 +639,10 @@ public class OnDiskIndexBuilder
             buffer.writeByte(0x0);
             buffer.writeInt(offset);
         }
+
+        private TokenTreeBuilder initCombinedIndex()
+        {
+            return mode == Mode.SPARSE ? new DynamicTokenTreeBuilder() : null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index 6e63c71..34737ae 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
 import org.apache.cassandra.index.sasi.utils.TypeUtil;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.util.FileUtils;
@@ -126,7 +127,7 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
 
             Index index = indexes.get(column);
             if (index == null)
-                indexes.put(column, (index = new Index(columnIndex)));
+                indexes.put(column, (index = newIndex(columnIndex)));
 
             index.add(value.duplicate(), currentKey, currentKeyPosition);
         });
@@ -165,10 +166,18 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
     }
 
     @VisibleForTesting
+    protected Index newIndex(ColumnIndex columnIndex)
+    {
+        return new Index(columnIndex);
+    }
+
+    @VisibleForTesting
     protected class Index
     {
+        @VisibleForTesting
+        protected final String outputFile;
+
         private final ColumnIndex columnIndex;
-        private final String outputFile;
         private final AbstractAnalyzer analyzer;
         private final long maxMemorySize;
 
@@ -245,17 +254,22 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
             final String segmentFile = filename(isFinal);
 
             return () -> {
-                long start1 = System.nanoTime();
+                long start = System.nanoTime();
 
                 try
                 {
                     File index = new File(segmentFile);
                     return builder.finish(index) ? new OnDiskIndex(index, 
columnIndex.getValidator(), null) : null;
                 }
+                catch (Exception | FSError e)
+                {
+                    logger.error("Failed to build index segment {}", 
segmentFile, e);
+                    return null;
+                }
                 finally
                 {
                     if (!isFinal)
-                        logger.info("Flushed index segment {}, took {} ms.", 
segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
+                        logger.info("Flushed index segment {}, took {} ms.", 
segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                 }
             };
         }
@@ -290,7 +304,7 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
 
                     for (Future<OnDiskIndex> f : segments)
                     {
-                        OnDiskIndex part = Futures.getUnchecked(f);
+                        OnDiskIndex part = f.get();
                         if (part == null)
                             continue;
 
@@ -304,7 +318,7 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
                                    new File(outputFile),
                                    new CombinedTermIterator(parts));
                 }
-                catch (Exception e)
+                catch (Exception | FSError e)
                 {
                     logger.error("Failed to flush index {}.", outputFile, e);
                     FileUtils.delete(outputFile);
@@ -313,13 +327,14 @@ public class PerSSTableIndexWriter implements 
SSTableFlushObserver
                 {
                     logger.info("Index flush to {} took {} ms.", outputFile, 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
 
-                    for (OnDiskIndex part : parts)
+                    for (int segment = 0; segment < segmentNumber; segment++)
                     {
-                        if (part == null)
-                            continue;
+                        OnDiskIndex part = parts[segment];
+
+                        if (part != null)
+                            FileUtils.closeQuietly(part);
 
-                        FileUtils.closeQuietly(part);
-                        FileUtils.delete(part.getIndexPath());
+                        FileUtils.delete(outputFile + "_" + segment);
                     }
 
                     latch.countDown();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
new file mode 100644
index 0000000..147427e
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.SortedMap;
+
+import org.apache.cassandra.index.sasi.utils.CombinedTerm;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongSet;
+import com.google.common.collect.Iterators;
+
+/**
+ * Intended usage of this class is to be used in place of {@link 
DynamicTokenTreeBuilder}
+ * when multiple index segments produced by {@link PerSSTableIndexWriter} are 
stitched together
+ * by {@link PerSSTableIndexWriter#complete()}.
+ *
+ * This class uses the RangeIterator, now provided by
+ * {@link CombinedTerm#getTokenIterator()}, to iterate the data twice.
+ * The first iteration builds the tree with leaves that contain only enough
+ * information to build the upper layers -- these leaves do not store more
+ * than their minimum and maximum tokens plus their total size, which makes 
them
+ * un-serializable.
+ *
+ * When the tree is written to disk the final layer is not
+ * written. It's at this point the data is iterated once again to write
+ * the leaves to disk. This (logarithmically) reduces copying of the
+ * token values while building and writing upper layers of the tree,
+ * removes the use of SortedMap when combining SAs, and relies on the
+ * memory mapped SAs otherwise, greatly improving performance and no
+ * longer causing OOMs when TokenTree sizes are big.
+ *
+ * See https://issues.apache.org/jira/browse/CASSANDRA-11383 for more details.
+ */
+public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
+{
+    private final CombinedTerm combinedTerm;
+
+    public StaticTokenTreeBuilder(CombinedTerm term)
+    {
+        combinedTerm = term;
+    }
+
+    public void add(Long token, long keyPosition)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void add(SortedMap<Long, LongSet> data)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void add(Iterator<Pair<Long, LongSet>> data)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isEmpty()
+    {
+        return combinedTerm.getTokenIterator().getCount() == 0;
+    }
+
+    public Iterator<Pair<Long, LongSet>> iterator()
+    {
+        Iterator<Token> iterator = combinedTerm.getTokenIterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
+        {
+            protected Pair<Long, LongSet> computeNext()
+            {
+                if (!iterator.hasNext())
+                    return endOfData();
+
+                Token token = iterator.next();
+                return Pair.create(token.get(), token.getOffsets());
+            }
+        };
+    }
+
+    public long getTokenCount()
+    {
+        return combinedTerm.getTokenIterator().getCount();
+    }
+
+    @Override
+    public void write(DataOutputPlus out) throws IOException
+    {
+        // if the root is not a leaf then none of the leaves have been written 
(all are PartialLeaf)
+        // so write out the last layer of the tree by converting PartialLeaf 
to StaticLeaf and
+        // iterating the data once more
+        super.write(out);
+        if (root.isLeaf())
+            return;
+
+        RangeIterator<Long, Token> tokens = combinedTerm.getTokenIterator();
+        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
+        Iterator<Node> leafIterator = leftmostLeaf.levelIterator();
+        while (leafIterator.hasNext())
+        {
+            Leaf leaf = (Leaf) leafIterator.next();
+            Leaf writeableLeaf = new StaticLeaf(Iterators.limit(tokens, 
leaf.tokenCount()), leaf);
+            writeableLeaf.serialize(-1, blockBuffer);
+            flushBuffer(blockBuffer, out, true);
+        }
+
+    }
+
+    protected void constructTree()
+    {
+        RangeIterator<Long, Token> tokens = combinedTerm.getTokenIterator();
+
+        tokenCount = tokens.getCount();
+        treeMinToken = tokens.getMinimum();
+        treeMaxToken = tokens.getMaximum();
+        numBlocks = 1;
+
+        if (tokenCount <= TOKENS_PER_BLOCK)
+        {
+            leftmostLeaf = new StaticLeaf(tokens, tokens.getMinimum(), 
tokens.getMaximum(), tokens.getCount(), true);
+            rightmostLeaf = leftmostLeaf;
+            root = leftmostLeaf;
+        }
+        else
+        {
+            root = new InteriorNode();
+            rightmostParent = (InteriorNode) root;
+
+            // build all the leaves except for maybe
+            // the last leaf which is not completely full.
+            // This loop relies on the fact that multiple index segments
+            // will never have token intersection for a single term,
+            // because it's impossible to encounter the same value for
+            // the same column multiple times in a single key/sstable.
+            Leaf lastLeaf = null;
+            long numFullLeaves = tokenCount / TOKENS_PER_BLOCK;
+            for (long i = 0; i < numFullLeaves; i++)
+            {
+                Long firstToken = tokens.next().get();
+                for (int j = 1; j < (TOKENS_PER_BLOCK - 1); j++)
+                    tokens.next();
+
+                Long lastToken = tokens.next().get();
+                Leaf leaf = new PartialLeaf(firstToken, lastToken, 
TOKENS_PER_BLOCK);
+
+                if (lastLeaf == null)
+                    leftmostLeaf = leaf;
+                else
+                    lastLeaf.next = leaf;
+
+                rightmostParent.add(leaf);
+                lastLeaf = rightmostLeaf = leaf;
+                numBlocks++;
+            }
+
+            // build the last leaf out of any remaining tokens if necessary
+            // safe downcast since TOKENS_PER_BLOCK is an int
+            int remainingTokens = (int) (tokenCount % TOKENS_PER_BLOCK);
+            if (remainingTokens != 0)
+            {
+                Long firstToken = tokens.next().get();
+                Long lastToken = firstToken;
+                while (tokens.hasNext())
+                    lastToken = tokens.next().get();
+
+                Leaf leaf = new PartialLeaf(firstToken, lastToken, 
remainingTokens);
+                rightmostParent.add(leaf);
+                lastLeaf.next = rightmostLeaf = leaf;
+                numBlocks++;
+            }
+        }
+    }
+
+    // This denotes the leaf which only has min/max and token counts
+    // but doesn't have any associated data yet, so it can't be serialized.
+    private class PartialLeaf extends Leaf
+    {
+        private final int size;
+        public PartialLeaf(Long min, Long max, int count)
+        {
+            super(min, max);
+            size = count;
+        }
+
+        public int tokenCount()
+        {
+            return size;
+        }
+
+        public void serializeData(ByteBuffer buf)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isSerializable()
+        {
+            return false;
+        }
+    }
+
+    // This denotes the leaf which has been filled with data and is ready to 
be serialized
+    private class StaticLeaf extends Leaf
+    {
+        private final Iterator<Token> tokens;
+        private final int count;
+        private final boolean isLast;
+
+        public StaticLeaf(Iterator<Token> tokens, Leaf leaf)
+        {
+            this(tokens, leaf.smallestToken(), leaf.largestToken(), 
leaf.tokenCount(), leaf.isLastLeaf());
+        }
+
+        public StaticLeaf(Iterator<Token> tokens, Long min, Long max, long 
count, boolean isLastLeaf)
+        {
+            super(min, max);
+
+            this.count = (int) count; // downcast is safe since leaf size is 
always < Integer.MAX_VALUE
+            this.tokens = tokens;
+            this.isLast = isLastLeaf;
+        }
+
+        public boolean isLastLeaf()
+        {
+            return isLast;
+        }
+
+        public int tokenCount()
+        {
+            return count;
+        }
+
+        public void serializeData(ByteBuffer buf)
+        {
+            while (tokens.hasNext())
+            {
+                Token entry = tokens.next();
+                createEntry(entry.get(), entry.getOffsets()).serialize(buf);
+            }
+        }
+
+        public boolean isSerializable()
+        {
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java 
b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 02130a3..4cd1ea3 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -18,9 +18,12 @@
 package org.apache.cassandra.index.sasi.disk;
 
 import com.google.common.primitives.Longs;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 
+import com.carrotsearch.hppc.LongSet;
+
 public abstract class Token implements CombinedValue<Long>, 
Iterable<DecoratedKey>
 {
     protected final long token;
@@ -35,6 +38,8 @@ public abstract class Token implements CombinedValue<Long>, 
Iterable<DecoratedKe
         return token;
     }
 
+    public abstract LongSet getOffsets();
+
     public int compareTo(CombinedValue<Long> o)
     {
         return Longs.compare(token, ((Token) o).token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java 
b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index 5d85d00..3f8182d 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -27,6 +27,8 @@ import org.apache.cassandra.index.sasi.utils.MappedBuffer;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
@@ -406,9 +408,9 @@ public class TokenTree
             });
         }
 
-        public Set<Long> getOffsets()
+        public LongSet getOffsets()
         {
-            Set<Long> offsets = new HashSet<>();
+            LongSet offsets = new LongOpenHashSet(4);
             for (TokenInfo i : info)
             {
                 for (long offset : i.fetchOffsets())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
index e10b057..2210964 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -18,21 +18,26 @@
 package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import com.carrotsearch.hppc.LongArrayList;
 import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.google.common.collect.AbstractIterator;
 
-public class TokenTreeBuilder
+public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
 {
+    int BLOCK_BYTES = 4096;
+    int BLOCK_HEADER_BYTES = 64;
+    int OVERFLOW_TRAILER_BYTES = 64;
+    int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
+    int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - 
OVERFLOW_TRAILER_BYTES) / 16;
+    long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
+    byte LAST_LEAF_SHIFT = 1;
+    byte SHARED_HEADER_BYTES = 19;
+    byte ENTRY_TYPE_MASK = 0x03;
+    short AB_MAGIC = 0x5A51;
+
     // note: ordinal positions are used here, do not change order
     enum EntryType
     {
@@ -56,784 +61,16 @@ public class TokenTreeBuilder
         }
     }
 
-    public static final int BLOCK_BYTES = 4096;
-    public static final int BLOCK_HEADER_BYTES = 64;
-    public static final int OVERFLOW_TRAILER_BYTES = 64;
-    public static final int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES 
/ 8;
-    public static final int TOKENS_PER_BLOCK = (BLOCK_BYTES - 
BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
-    public static final long MAX_OFFSET = (1L << 47) - 1; // 48 bits for 
(signed) offset
-    public static final byte LAST_LEAF_SHIFT = 1;
-    public static final byte SHARED_HEADER_BYTES = 19;
-    public static final byte ENTRY_TYPE_MASK = 0x03;
-    public static final short AB_MAGIC = 0x5A51;
-
-    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
-    private int numBlocks;
-
-    private Node root;
-    private InteriorNode rightmostParent;
-    private Leaf leftmostLeaf;
-    private Leaf rightmostLeaf;
-    private long tokenCount = 0;
-    private long treeMinToken;
-    private long treeMaxToken;
-
-    public TokenTreeBuilder()
-    {}
-
-    public TokenTreeBuilder(SortedMap<Long, LongSet> data)
-    {
-        add(data);
-    }
-
-    public void add(Long token, long keyPosition)
-    {
-        LongSet found = tokens.get(token);
-        if (found == null)
-            tokens.put(token, (found = new LongOpenHashSet(2)));
-
-        found.add(keyPosition);
-    }
-
-    public void add(SortedMap<Long, LongSet> data)
-    {
-        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
-        {
-            LongSet found = tokens.get(newEntry.getKey());
-            if (found == null)
-                tokens.put(newEntry.getKey(), (found = new 
LongOpenHashSet(4)));
-
-            for (LongCursor offset : newEntry.getValue())
-                found.add(offset.value);
-        }
-    }
-
-    public TokenTreeBuilder finish()
-    {
-        maybeBulkLoad();
-        return this;
-    }
-
-    public SortedMap<Long, LongSet> getTokens()
-    {
-        return tokens;
-    }
-
-    public long getTokenCount()
-    {
-        return tokenCount;
-    }
-
-    public int serializedSize()
-    {
-        if (numBlocks == 1)
-            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
-        else
-            return numBlocks * BLOCK_BYTES;
-    }
-
-    public void write(DataOutputPlus out) throws IOException
-    {
-        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
-        Iterator<Node> levelIterator = root.levelIterator();
-        long childBlockIndex = 1;
-
-        while (levelIterator != null)
-        {
-
-            Node firstChild = null;
-            while (levelIterator.hasNext())
-            {
-                Node block = levelIterator.next();
-
-                if (firstChild == null && !block.isLeaf())
-                    firstChild = ((InteriorNode) block).children.get(0);
-
-                block.serialize(childBlockIndex, blockBuffer);
-                flushBuffer(blockBuffer, out, numBlocks != 1);
-
-                childBlockIndex += block.childCount();
-            }
-
-            levelIterator = (firstChild == null) ? null : 
firstChild.levelIterator();
-        }
-    }
-
-    public Iterator<Pair<Long, LongSet>> iterator()
-    {
-        return new TokenIterator(leftmostLeaf.levelIterator());
-    }
-
-    private void maybeBulkLoad()
-    {
-        if (root == null)
-            bulkLoad();
-    }
-
-    private void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean 
align) throws IOException
-    {
-        // seek to end of last block before flushing
-        if (align)
-            alignBuffer(buffer, BLOCK_BYTES);
-
-        buffer.flip();
-        o.write(buffer);
-        buffer.clear();
-    }
-
-    private static void alignBuffer(ByteBuffer buffer, int blockSize)
-    {
-        long curPos = buffer.position();
-        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if 
needed
-            buffer.position((int) FBUtilities.align(curPos, blockSize));
-    }
-
-    private void bulkLoad()
-    {
-        tokenCount = tokens.size();
-        treeMinToken = tokens.firstKey();
-        treeMaxToken = tokens.lastKey();
-        numBlocks = 1;
-
-        // special case the tree that only has a single block in it (so we 
don't create a useless root)
-        if (tokenCount <= TOKENS_PER_BLOCK)
-        {
-            leftmostLeaf = new Leaf(tokens);
-            rightmostLeaf = leftmostLeaf;
-            root = leftmostLeaf;
-        }
-        else
-        {
-            root = new InteriorNode();
-            rightmostParent = (InteriorNode) root;
-
-            int i = 0;
-            Leaf lastLeaf = null;
-            Long firstToken = tokens.firstKey();
-            Long finalToken = tokens.lastKey();
-            Long lastToken;
-            for (Long token : tokens.keySet())
-            {
-                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 
1)))
-                {
-                    i++;
-                    continue;
-                }
-
-                lastToken = token;
-                Leaf leaf = (i != (tokenCount - 1) || 
token.equals(finalToken)) ?
-                        new Leaf(tokens.subMap(firstToken, lastToken)) : new 
Leaf(tokens.tailMap(firstToken));
-
-                if (i == TOKENS_PER_BLOCK)
-                    leftmostLeaf = leaf;
-                else
-                    lastLeaf.next = leaf;
-
-                rightmostParent.add(leaf);
-                lastLeaf = leaf;
-                rightmostLeaf = leaf;
-                firstToken = lastToken;
-                i++;
-                numBlocks++;
-
-                if (token.equals(finalToken))
-                {
-                    Leaf finalLeaf = new Leaf(tokens.tailMap(token));
-                    lastLeaf.next = finalLeaf;
-                    rightmostParent.add(finalLeaf);
-                    rightmostLeaf = finalLeaf;
-                    numBlocks++;
-                }
-            }
-
-        }
-    }
-
-    private abstract class Node
-    {
-        protected InteriorNode parent;
-        protected Node next;
-        protected Long nodeMinToken, nodeMaxToken;
-
-        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
-        public abstract int childCount();
-        public abstract int tokenCount();
-        public abstract Long smallestToken();
-
-        public Iterator<Node> levelIterator()
-        {
-            return new LevelIterator(this);
-        }
-
-        public boolean isLeaf()
-        {
-            return (this instanceof Leaf);
-        }
-
-        protected boolean isLastLeaf()
-        {
-            return this == rightmostLeaf;
-        }
-
-        protected boolean isRoot()
-        {
-            return this == root;
-        }
-
-        protected void updateTokenRange(long token)
-        {
-            nodeMinToken = nodeMinToken == null ? token : 
Math.min(nodeMinToken, token);
-            nodeMaxToken = nodeMaxToken == null ? token : 
Math.max(nodeMaxToken, token);
-        }
-
-        protected void serializeHeader(ByteBuffer buf)
-        {
-            Header header;
-            if (isRoot())
-                header = new RootHeader();
-            else if (!isLeaf())
-                header = new InteriorNodeHeader();
-            else
-                header = new LeafHeader();
-
-            header.serialize(buf);
-            alignBuffer(buf, BLOCK_HEADER_BYTES);
-        }
-
-        private abstract class Header
-        {
-            public void serialize(ByteBuffer buf)
-            {
-                buf.put(infoByte())
-                        .putShort((short) (tokenCount()))
-                        .putLong(nodeMinToken)
-                        .putLong(nodeMaxToken);
-            }
-
-            protected abstract byte infoByte();
-        }
-
-        private class RootHeader extends Header
-        {
-            public void serialize(ByteBuffer buf)
-            {
-                super.serialize(buf);
-                writeMagic(buf);
-                buf.putLong(tokenCount)
-                        .putLong(treeMinToken)
-                        .putLong(treeMaxToken);
-            }
-
-            protected byte infoByte()
-            {
-                // if leaf, set leaf indicator and last leaf indicator (bits 0 
& 1)
-                // if not leaf, clear both bits
-                return (byte) ((isLeaf()) ? 3 : 0);
-            }
-
-            protected void writeMagic(ByteBuffer buf)
-            {
-                switch (Descriptor.CURRENT_VERSION)
-                {
-                    case Descriptor.VERSION_AB:
-                        buf.putShort(AB_MAGIC);
-                        break;
-                    default:
-                        break;
-                }
-
-            }
-        }
-
-        private class InteriorNodeHeader extends Header
-        {
-            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
-            protected byte infoByte()
-            {
-                return 0;
-            }
-        }
-
-        private class LeafHeader extends Header
-        {
-            // bit 0 set as leaf indicator
-            // bit 1 set if this is last leaf of data
-            protected byte infoByte()
-            {
-                byte infoByte = 1;
-                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;
-
-                return infoByte;
-            }
-        }
-
-    }
-
-    private class Leaf extends Node
-    {
-        private final SortedMap<Long, LongSet> tokens;
-        private LongArrayList overflowCollisions;
-
-        Leaf(SortedMap<Long, LongSet> data)
-        {
-            nodeMinToken = data.firstKey();
-            nodeMaxToken = data.lastKey();
-            tokens = data;
-        }
-
-        public Long largestToken()
-        {
-            return nodeMaxToken;
-        }
-
-        public void serialize(long childBlockIndex, ByteBuffer buf)
-        {
-            serializeHeader(buf);
-            serializeData(buf);
-            serializeOverflowCollisions(buf);
-        }
-
-        public int childCount()
-        {
-            return 0;
-        }
-
-        public int tokenCount()
-        {
-            return tokens.size();
-        }
-
-        public Long smallestToken()
-        {
-            return nodeMinToken;
-        }
-
-        public Iterator<Map.Entry<Long, LongSet>> tokenIterator()
-        {
-            return tokens.entrySet().iterator();
-        }
-
-        private void serializeData(ByteBuffer buf)
-        {
-            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
-                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
-        }
-
-        private void serializeOverflowCollisions(ByteBuffer buf)
-        {
-            if (overflowCollisions != null)
-                for (LongCursor offset : overflowCollisions)
-                    buf.putLong(offset.value);
-        }
-
-
-        private LeafEntry createEntry(final long tok, final LongSet offsets)
-        {
-            int offsetCount = offsets.size();
-            switch (offsetCount)
-            {
-                case 0:
-                    throw new AssertionError("no offsets for token " + tok);
-                case 1:
-                    long offset = offsets.toArray()[0];
-                    if (offset > MAX_OFFSET)
-                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
-                    else if (offset <= Integer.MAX_VALUE)
-                        return new SimpleLeafEntry(tok, offset);
-                    else
-                        return new FactoredOffsetLeafEntry(tok, offset);
-                case 2:
-                    long[] rawOffsets = offsets.toArray();
-                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
-                            (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
-                        return new PackedCollisionLeafEntry(tok, rawOffsets);
-                    else
-                        return createOverflowEntry(tok, offsetCount, offsets);
-                default:
-                    return createOverflowEntry(tok, offsetCount, offsets);
-            }
-        }
-
-        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
-        {
-            if (overflowCollisions == null)
-                overflowCollisions = new LongArrayList();
-
-            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
-            for (LongCursor o : offsets) {
-                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
-                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
-                else
-                    overflowCollisions.add(o.value);
-            }
-            return entry;
-        }
-
-        private abstract class LeafEntry
-        {
-            protected final long token;
-
-            abstract public EntryType type();
-            abstract public int offsetData();
-            abstract public short offsetExtra();
-
-            public LeafEntry(final long tok)
-            {
-                token = tok;
-            }
-
-            public void serialize(ByteBuffer buf)
-            {
-                buf.putShort((short) type().ordinal())
-                        .putShort(offsetExtra())
-                        .putLong(token)
-                        .putInt(offsetData());
-            }
-
-        }
-
-
-        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
-        private class SimpleLeafEntry extends LeafEntry
-        {
-            private final long offset;
-
-            public SimpleLeafEntry(final long tok, final long off)
-            {
-                super(tok);
-                offset = off;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.SIMPLE;
-            }
-
-            public int offsetData()
-            {
-                return (int) offset;
-            }
-
-            public short offsetExtra()
-            {
-                return 0;
-            }
-        }
-
-        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
-        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
-        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
-        private class FactoredOffsetLeafEntry extends LeafEntry
-        {
-            private final long offset;
-
-            public FactoredOffsetLeafEntry(final long tok, final long off)
-            {
-                super(tok);
-                offset = off;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.FACTORED;
-            }
-
-            public int offsetData()
-            {
-                return (int) (offset >>> Short.SIZE);
-            }
-
-            public short offsetExtra()
-            {
-                return (short) offset;
-            }
-        }
-
-        // holds an entry with two offsets that can be packed in an int & a short
-        // the int offset is stored where offset is normally stored. short offset is
-        // stored in entry header
-        private class PackedCollisionLeafEntry extends LeafEntry
-        {
-            private short smallerOffset;
-            private int largerOffset;
-
-            public PackedCollisionLeafEntry(final long tok, final long[] offs)
-            {
-                super(tok);
-
-                smallerOffset = (short) Math.min(offs[0], offs[1]);
-                largerOffset = (int) Math.max(offs[0], offs[1]);
-            }
-
-            public EntryType type()
-            {
-                return EntryType.PACKED;
-            }
-
-            public int offsetData()
-            {
-                return largerOffset;
-            }
-
-            public short offsetExtra()
-            {
-                return smallerOffset;
-            }
-        }
-
-        // holds an entry with three or more offsets, or two offsets that cannot
-        // be packed into an int & a short. the index into the overflow list
-        // is stored where the offset is normally stored. the number of overflowed offsets
-        // for the entry is stored in the entry header
-        private class OverflowCollisionLeafEntry extends LeafEntry
-        {
-            private final short startIndex;
-            private final short count;
-
-            public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount)
-            {
-                super(tok);
-                startIndex = collisionStartIndex;
-                count = collisionCount;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.OVERFLOW;
-            }
-
-            public int offsetData()
-            {
-                return startIndex;
-            }
-
-            public short offsetExtra()
-            {
-                return count;
-            }
-
-        }
-
-    }
-
-    private class InteriorNode extends Node
-    {
-        private List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
-        private List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
-        private int position = 0; // TODO (jwest): can get rid of this and use array size
-
-
-        public void serialize(long childBlockIndex, ByteBuffer buf)
-        {
-            serializeHeader(buf);
-            serializeTokens(buf);
-            serializeChildOffsets(childBlockIndex, buf);
-        }
+    void add(Long token, long keyPosition);
+    void add(SortedMap<Long, LongSet> data);
+    void add(Iterator<Pair<Long, LongSet>> data);
+    void add(TokenTreeBuilder ttb);
 
-        public int childCount()
-        {
-            return children.size();
-        }
+    boolean isEmpty();
+    long getTokenCount();
 
-        public int tokenCount()
-        {
-            return tokens.size();
-        }
+    TokenTreeBuilder finish();
 
-        public Long smallestToken()
-        {
-            return tokens.get(0);
-        }
-
-        protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild)
-        {
-            int pos = tokens.size();
-            if (pos == TOKENS_PER_BLOCK)
-            {
-                InteriorNode sibling = split();
-                sibling.add(token, leftChild, rightChild);
-
-            }
-            else {
-                if (leftChild != null)
-                    children.add(pos, leftChild);
-
-                if (rightChild != null)
-                {
-                    children.add(pos + 1, rightChild);
-                    rightChild.parent = this;
-                }
-
-                updateTokenRange(token);
-                tokens.add(pos, token);
-            }
-        }
-
-        protected void add(Leaf node)
-        {
-
-            if (position == (TOKENS_PER_BLOCK + 1))
-            {
-                rightmostParent = split();
-                rightmostParent.add(node);
-            }
-            else
-            {
-
-                node.parent = this;
-                children.add(position, node);
-                position++;
-
-                // the first child is referenced only during bulk load. we don't take a value
-                // to store into the tree, one is subtracted since position has already been incremented
-                // for the next node to be added
-                if (position - 1 == 0)
-                    return;
-
-
-                // tokens are inserted one behind the current position, but 2 is subtracted because
-                // position has already been incremented for the next add
-                Long smallestToken = node.smallestToken();
-                updateTokenRange(smallestToken);
-                tokens.add(position - 2, smallestToken);
-            }
-
-        }
-
-        protected InteriorNode split()
-        {
-            Pair<Long, InteriorNode> splitResult = splitBlock();
-            Long middleValue = splitResult.left;
-            InteriorNode sibling = splitResult.right;
-            InteriorNode leftChild = null;
-
-            // create a new root if necessary
-            if (parent == null)
-            {
-                parent = new InteriorNode();
-                root = parent;
-                sibling.parent = parent;
-                leftChild = this;
-                numBlocks++;
-            }
-
-            parent.add(middleValue, leftChild, sibling);
-
-            return sibling;
-        }
-
-        protected Pair<Long, InteriorNode> splitBlock()
-        {
-            final int splitPosition = TOKENS_PER_BLOCK - 2;
-            InteriorNode sibling = new InteriorNode();
-            sibling.parent = parent;
-            next = sibling;
-
-            Long middleValue = tokens.get(splitPosition);
-
-            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
-            {
-                if (i != TOKENS_PER_BLOCK && i != splitPosition)
-                {
-                    long token = tokens.get(i);
-                    sibling.updateTokenRange(token);
-                    sibling.tokens.add(token);
-                }
-
-                Node child = children.get(i + 1);
-                child.parent = sibling;
-                sibling.children.add(child);
-                sibling.position++;
-            }
-
-            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
-            {
-                if (i != TOKENS_PER_BLOCK)
-                    tokens.remove(i);
-
-                if (i != splitPosition)
-                    children.remove(i);
-            }
-
-            nodeMinToken = smallestToken();
-            nodeMaxToken = tokens.get(tokens.size() - 1);
-            numBlocks++;
-
-            return Pair.create(middleValue, sibling);
-        }
-
-        protected boolean isFull()
-        {
-            return (position >= TOKENS_PER_BLOCK + 1);
-        }
-
-        private void serializeTokens(ByteBuffer buf)
-        {
-            for (Long token : tokens)
-                buf.putLong(token);
-        }
-
-
-        private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf)
-        {
-            for (int i = 0; i < children.size(); i++)
-                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
-        }
-    }
-
-    public static class LevelIterator extends AbstractIterator<Node>
-    {
-        private Node currentNode;
-
-        LevelIterator(Node first)
-        {
-            currentNode = first;
-        }
-
-        public Node computeNext()
-        {
-            if (currentNode == null)
-                return endOfData();
-
-            Node returnNode = currentNode;
-            currentNode = returnNode.next;
-
-            return returnNode;
-        }
-
-
-    }
-
-    public static class TokenIterator extends AbstractIterator<Pair<Long, LongSet>>
-    {
-        private Iterator<Node> levelIterator;
-        private Iterator<Map.Entry<Long, LongSet>> currentIterator;
-
-        TokenIterator(Iterator<Node> level)
-        {
-            levelIterator = level;
-            if (levelIterator.hasNext())
-                currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
-        }
-
-        public Pair<Long, LongSet> computeNext()
-        {
-            if (currentIterator != null && currentIterator.hasNext())
-            {
-                Map.Entry<Long, LongSet> next = currentIterator.next();
-                return Pair.create(next.getKey(), next.getValue());
-            }
-            else
-            {
-                if (!levelIterator.hasNext())
-                    return endOfData();
-                else
-                {
-                    currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
-                    return computeNext();
-                }
-            }
-
-        }
-    }
+    int serializedSize();
+    void write(DataOutputPlus out) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
index 293e2ee..a2f2c0e 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.index.sasi.utils.AbstractIterator;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.PeekingIterator;
 
 public class KeyRangeIterator extends RangeIterator<Long, Token>
@@ -91,6 +93,15 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
             }};
         }
 
+        public LongSet getOffsets()
+        {
+            LongSet offsets = new LongOpenHashSet(4);
+            for (DecoratedKey key : keys)
+                offsets.add((long) key.getToken().getTokenValue());
+
+            return offsets;
+        }
+
         public void merge(CombinedValue<Long> other)
         {
             if (!(other instanceof Token))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
index 63f6c5b..592299e 100644
--- a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
+++ b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.index.sasi.sa;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 
+import org.apache.cassandra.index.sasi.disk.DynamicTokenTreeBuilder;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -116,19 +117,19 @@ public class SuffixSA extends SA<CharBuffer>
                 if (lastProcessedSuffix == null)
                 {
                     lastProcessedSuffix = suffix.left;
-                    container = new TokenTreeBuilder(suffix.right.getTokens());
+                    container = new DynamicTokenTreeBuilder(suffix.right);
                 }
                 else if (comparator.compare(lastProcessedSuffix, suffix.left) == 0)
                 {
                     lastProcessedSuffix = suffix.left;
-                    container.add(suffix.right.getTokens());
+                    container.add(suffix.right);
                 }
                 else
                 {
                     Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();
 
                     lastProcessedSuffix = suffix.left;
-                    container = new TokenTreeBuilder(suffix.right.getTokens());
+                    container = new DynamicTokenTreeBuilder(suffix.right);
 
                     return result;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
index 2bf5a07..ba7123a 100644
--- a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
@@ -18,42 +18,22 @@
 package org.apache.cassandra.index.sasi.utils;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
-import org.apache.cassandra.index.sasi.disk.Token;
-import org.apache.cassandra.index.sasi.disk.TokenTree;
-import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
 
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-
 public class CombinedTerm implements CombinedValue<DataTerm>
 {
     private final AbstractType<?> comparator;
     private final DataTerm term;
-    private final TreeMap<Long, LongSet> tokens;
+    private final List<DataTerm> mergedTerms = new ArrayList<>();
 
     public CombinedTerm(AbstractType<?> comparator, DataTerm term)
     {
         this.comparator = comparator;
         this.term = term;
-        this.tokens = new TreeMap<>();
-
-        RangeIterator<Long, Token> tokens = term.getTokens();
-        while (tokens.hasNext())
-        {
-            Token current = tokens.next();
-            LongSet offsets = this.tokens.get(current.get());
-            if (offsets == null)
-                this.tokens.put(current.get(), (offsets = new LongOpenHashSet()));
-
-            for (Long offset : ((TokenTree.OnDiskToken) current).getOffsets())
-                offsets.add(offset);
-        }
     }
 
     public ByteBuffer getTerm()
@@ -61,14 +41,18 @@ public class CombinedTerm implements CombinedValue<DataTerm>
         return term.getTerm();
     }
 
-    public Map<Long, LongSet> getTokens()
+    public RangeIterator<Long, Token> getTokenIterator()
     {
-        return tokens;
+        RangeIterator.Builder<Long, Token> union = RangeUnionIterator.builder();
+        union.add(term.getTokens());
+        mergedTerms.stream().map(OnDiskIndex.DataTerm::getTokens).forEach(union::add);
+
+        return union.build();
     }
 
     public TokenTreeBuilder getTokenTreeBuilder()
     {
-        return new TokenTreeBuilder(tokens).finish();
+        return new StaticTokenTreeBuilder(this).finish();
     }
 
     public void merge(CombinedValue<DataTerm> other)
@@ -80,15 +64,7 @@ public class CombinedTerm implements CombinedValue<DataTerm>
 
         assert comparator == o.comparator;
 
-        for (Map.Entry<Long, LongSet> token : o.tokens.entrySet())
-        {
-            LongSet offsets = this.tokens.get(token.getKey());
-            if (offsets == null)
-                this.tokens.put(token.getKey(), (offsets = new LongOpenHashSet()));
-
-            for (LongCursor offset : token.getValue())
-                offsets.add(offset.value);
-        }
+        mergedTerms.add(o.term);
     }
 
     public DataTerm get()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
index 6353155..628bd36 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
@@ -605,6 +605,12 @@ public class OnDiskIndexTest
         }
     }
 
+    public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb)
+    {
+        for (Pair<Long, LongSet> entry : ttb)
+            offsets.put(entry.left, entry.right);
+    }
+
     @Test
     public void testCombiningOfThePartitionedSA() throws Exception
     {
@@ -620,7 +626,7 @@ public class OnDiskIndexTest
                 expected.put(i, (offsets = new TreeMap<>()));
 
             builderA.add(LongType.instance.decompose(i), keyAt(i), i);
-            offsets.putAll(keyBuilder(i).getTokens());
+            putAll(offsets, keyBuilder(i));
         }
 
         for (long i = 50; i < 100; i++)
@@ -631,7 +637,7 @@ public class OnDiskIndexTest
 
             long position = 100L + i;
             builderB.add(LongType.instance.decompose(i), keyAt(position), position);
-            offsets.putAll(keyBuilder(100L + i).getTokens());
+            putAll(offsets, keyBuilder(100L + i));
         }
 
         File indexA = File.createTempFile("on-disk-sa-partition-a", ".db");
@@ -659,7 +665,7 @@ public class OnDiskIndexTest
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
-            offsets.putAll(term.getTokens());
+            putAll(offsets, term.getTokenTreeBuilder());
         }
 
         Assert.assertEquals(actual, expected);
@@ -684,7 +690,7 @@ public class OnDiskIndexTest
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
-            offsets.putAll(term.getTokens());
+            putAll(offsets, term.getTokenTreeBuilder());
         }
 
         Assert.assertEquals(actual, expected);
@@ -735,7 +741,7 @@ public class OnDiskIndexTest
 
     private static TokenTreeBuilder keyBuilder(Long... keys)
     {
-        TokenTreeBuilder builder = new TokenTreeBuilder();
+        TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
 
         for (final Long key : keys)
         {
@@ -850,9 +856,9 @@ public class OnDiskIndexTest
 
     private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
     {
-        for (Map.Entry<Long, LongSet> token : tokens.getTokens().entrySet())
+        for (Pair<Long, LongSet> token : tokens)
         {
-            for (long position : token.getValue().toArray())
+            for (long position : token.right.toArray())
                 builder.add(term, keyAt(position), position);
         }
     }

Reply via email to