Updated Branches:
  refs/heads/trunk 40598efa6 -> dbd1a727b

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
new file mode 100644
index 0000000..aac70ec
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+import static 
org.apache.cassandra.io.sstable.IndexSummaryManager.DOWNSAMPLE_THESHOLD;
+import static 
org.apache.cassandra.io.sstable.IndexSummaryManager.UPSAMPLE_THRESHOLD;
+import static 
org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class IndexSummaryManagerTest extends SchemaLoader
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(IndexSummaryManagerTest.class);
+
+
+    private static long totalOffHeapSize(List<SSTableReader> sstables)
+    {
+        long total = 0;
+        for (SSTableReader sstable : sstables)
+            total += sstable.getIndexSummaryOffHeapSize();
+
+        return total;
+    }
+
+    private static List<SSTableReader> resetSummaries(List<SSTableReader> 
sstables, long originalOffHeapSize) throws IOException
+    {
+        for (SSTableReader sstable : sstables)
+            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
originalOffHeapSize * sstables.size());
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
+
+        return sstables;
+    }
+
+    private void validateData(ColumnFamilyStore cfs, int numRows)
+    {
+        for (int i = 0; i < numRows; i++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            QueryFilter filter = QueryFilter.getIdentityFilter(key, 
cfs.getColumnFamilyName(), System.currentTimeMillis());
+            ColumnFamily row = cfs.getColumnFamily(filter);
+            assertNotNull(row);
+            Column column = row.getColumn(ByteBufferUtil.bytes("column"));
+            assertNotNull(column);
+            assertEquals(100, column.value().array().length);
+        }
+    }
+
+    private Comparator<SSTableReader> hotnessComparator = new 
Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return Double.compare(o1.readMeter.fifteenMinuteRate(), 
o2.readMeter.fifteenMinuteRate());
+        }
+    };
+
+    @Test
+    public void testRedistributeSummaries() throws IOException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no 
key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numSSTables = 4;
+        int numRows = 256;
+        for (int sstable = 0; sstable < numSSTables; sstable++)
+        {
+            for (int row = 0; row < numRows; row++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(row));
+                RowMutation rm = new RowMutation(ksname, key.key);
+                rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        assertEquals(numSSTables, sstables.size());
+        validateData(cfs, numRows);
+
+        for (SSTableReader sstable : sstables)
+            sstable.readMeter = new RestorableMeter(100.0, 100.0);
+
+        long singleSummaryOffHeapSpace = 
sstables.get(0).getIndexSummaryOffHeapSize();
+
+        // there should be enough space to not downsample anything
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * numSSTables));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
+        assertEquals(singleSummaryOffHeapSpace * numSSTables, 
totalOffHeapSize(sstables));
+        validateData(cfs, numRows);
+
+        // everything should get cut in half
+        assert sstables.size() == 4;
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * (numSSTables / 2)));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // everything should get cut to a quarter
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * (numSSTables / 4)));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 4, 
sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // upsample back up to half
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, 
sstables,(singleSummaryOffHeapSpace * (numSSTables / 2)));
+        assert sstables.size() == 4;
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // upsample back up to the original index summary
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * numSSTables));
+        for (SSTableReader sstable : sstables)
+            assertEquals(BASE_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // make two of the four sstables cold, only leave enough space for 
three full index summaries,
+        // so the two cold sstables should get downsampled to be half of their 
original size
+        sstables.get(0).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables.get(1).readMeter = new RestorableMeter(50.0, 50.0);
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * 3));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(1).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // small increases or decreases in the read rate don't result in 
downsampling or upsampling
+        double lowerRate = 50.0 * (DOWNSAMPLE_THESHOLD + (DOWNSAMPLE_THESHOLD 
* 0.10));
+        double higherRate = 50.0 * (UPSAMPLE_THRESHOLD - (UPSAMPLE_THRESHOLD * 
0.10));
+        sstables.get(0).readMeter = new RestorableMeter(lowerRate, lowerRate);
+        sstables.get(1).readMeter = new RestorableMeter(higherRate, 
higherRate);
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * 3));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL / 2, 
sstables.get(1).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // reset, and then this time, leave enough space for one of the cold 
sstables to not get downsampled
+        sstables = resetSummaries(sstables, singleSummaryOffHeapSpace);
+        sstables.get(0).readMeter = new RestorableMeter(1.0, 1.0);
+        sstables.get(1).readMeter = new RestorableMeter(2.0, 2.0);
+        sstables.get(2).readMeter = new RestorableMeter(1000.0, 1000.0);
+        sstables.get(3).readMeter = new RestorableMeter(1000.0, 1000.0);
+
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(singleSummaryOffHeapSpace * 3) + 50);
+        Collections.sort(sstables, hotnessComparator);
+
+        if (sstables.get(0).getIndexSummarySamplingLevel() == 
MIN_SAMPLING_LEVEL)
+            assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(1).getIndexSummarySamplingLevel());
+        else
+            assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(0).getIndexSummarySamplingLevel());
+
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(2).getIndexSummarySamplingLevel());
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+
+        // Cause a mix of upsampling and downsampling. We'll leave enough 
space for two full index summaries. The two
+        // coldest sstables will get downsampled to 8/128 of their size, 
leaving us with 1 and 112/128th index
+        // summaries worth of space.  The hottest sstable should get a full 
index summary, and the one in the middle
+        // should get the remainder.
+        sstables.get(0).readMeter = new RestorableMeter(0.0, 0.0);
+        sstables.get(1).readMeter = new RestorableMeter(0.0, 0.0);
+        sstables.get(2).readMeter = new RestorableMeter(100, 100);
+        sstables.get(3).readMeter = new RestorableMeter(128.0, 128.0);
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
(long) (singleSummaryOffHeapSpace + (singleSummaryOffHeapSpace * (100.0 / 
BASE_SAMPLING_LEVEL))));
+        Collections.sort(sstables, hotnessComparator);
+        assertEquals(MIN_SAMPLING_LEVEL, 
sstables.get(0).getIndexSummarySamplingLevel());
+        assertEquals(MIN_SAMPLING_LEVEL, 
sstables.get(1).getIndexSummarySamplingLevel());
+        assertTrue(sstables.get(2).getIndexSummarySamplingLevel() > 
MIN_SAMPLING_LEVEL);
+        assertTrue(sstables.get(2).getIndexSummarySamplingLevel() < 
BASE_SAMPLING_LEVEL);
+        assertEquals(BASE_SAMPLING_LEVEL, 
sstables.get(3).getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+
+        // Don't leave enough space for even the minimal index summaries
+        sstables = redistributeSummaries(Collections.EMPTY_LIST, sstables, 
100);
+        for (SSTableReader sstable : sstables)
+            assertEquals(MIN_SAMPLING_LEVEL, 
sstable.getIndexSummarySamplingLevel());
+        validateData(cfs, numRows);
+    }
+
+    @Test
+    public void testRebuildAtSamplingLevel() throws IOException
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numRows = 256;
+        for (int row = 0; row < numRows; row++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(row));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> sstables = new ArrayList<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableReader sstable = sstables.get(0);
+
+        for (int samplingLevel = MIN_SAMPLING_LEVEL; samplingLevel < 
BASE_SAMPLING_LEVEL; samplingLevel++)
+        {
+            sstable = sstable.cloneWithNewSummarySamplingLevel(samplingLevel);
+            assertEquals(samplingLevel, 
sstable.getIndexSummarySamplingLevel());
+            int expectedSize = (numRows * samplingLevel) / 
(sstable.metadata.getIndexInterval() * BASE_SAMPLING_LEVEL);
+            assertEquals(expectedSize, sstable.getIndexSummarySize(), 1);
+        }
+    }
+
+    @Test
+    public void testJMXFunctions() throws IOException
+    {
+        IndexSummaryManager manager = IndexSummaryManager.instance;
+
+        // resize interval
+        assertNotNull(manager.getResizeIntervalInMinutes());
+        manager.setResizeIntervalInMinutes(-1);
+        assertNull(manager.getTimeToNextResize(TimeUnit.MINUTES));
+
+        manager.setResizeIntervalInMinutes(10);
+        assertEquals(10, manager.getResizeIntervalInMinutes());
+        assertEquals(10, manager.getTimeToNextResize(TimeUnit.MINUTES), 1);
+        manager.setResizeIntervalInMinutes(15);
+        assertEquals(15, manager.getResizeIntervalInMinutes());
+        assertEquals(15, manager.getTimeToNextResize(TimeUnit.MINUTES), 2);
+
+        // memory pool capacity
+        assertTrue(manager.getMemoryPoolCapacityInMB() >= 0);
+        manager.setMemoryPoolCapacityInMB(10);
+        assertEquals(10, manager.getMemoryPoolCapacityInMB());
+
+        String ksname = "Keyspace1";
+        String cfname = "StandardLowIndexInterval"; // index interval of 8, no 
key caching
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        int numSSTables = 2;
+        int numRows = 10;
+        for (int sstable = 0; sstable < numSSTables; sstable++)
+        {
+            for (int row = 0; row < numRows; row++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(row));
+                RowMutation rm = new RowMutation(ksname, key.key);
+                rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        assertEquals(1.0, manager.getAverageSamplingRatio(), 0.001);
+        Map<String, Double> samplingRatios = manager.getSamplingRatios();
+        for (Map.Entry<String, Double> entry : samplingRatios.entrySet())
+            assertEquals(1.0, entry.getValue(), 0.001);
+
+        manager.setMemoryPoolCapacityInMB(0);
+        manager.redistributeSummaries();
+        assertTrue(manager.getAverageSamplingRatio() < 0.99);
+        samplingRatios = manager.getSamplingRatios();
+        for (Map.Entry<String, Double> entry : samplingRatios.entrySet())
+        {
+            if (entry.getKey().contains("StandardLowIndexInterval"))
+                assertTrue(entry.getValue() < 0.9);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 8e73161..8d013f9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -23,22 +23,25 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
-import static org.junit.Assert.assertArrayEquals;
+import static org.apache.cassandra.io.sstable.IndexSummaryBuilder.downsample;
+import static 
org.apache.cassandra.io.sstable.IndexSummaryBuilder.entriesAtSamplingLevel;
+import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
+import static org.apache.cassandra.io.sstable.Downsampling.MIN_SAMPLING_LEVEL;
+
+import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
 public class IndexSummaryTest
@@ -73,13 +76,13 @@ public class IndexSummaryTest
         Pair<List<DecoratedKey>, IndexSummary> random = 
generateRandomIndex(100, 1);
         ByteArrayOutputStream aos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(aos);
-        IndexSummary.serializer.serialize(random.right, dos);
+        IndexSummary.serializer.serialize(random.right, dos, false);
         // write junk
         dos.writeUTF("JUNK");
         dos.writeUTF("JUNK");
         FileUtils.closeQuietly(dos);
         DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary is = IndexSummary.serializer.deserialize(dis, 
DatabaseDescriptor.getPartitioner());
+        IndexSummary is = IndexSummary.serializer.deserialize(dis, 
DatabaseDescriptor.getPartitioner(), false);
         for (int i = 0; i < 100; i++)
             assertEquals(i, is.binarySearch(random.left.get(i)));
         // read the junk
@@ -92,7 +95,7 @@ public class IndexSummaryTest
     public void testAddEmptyKey() throws Exception
     {
         IPartitioner p = new RandomPartitioner();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1);
+        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, 
BASE_SAMPLING_LEVEL);
         builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 
0);
         IndexSummary summary = builder.build(p);
         assertEquals(1, summary.size());
@@ -101,9 +104,9 @@ public class IndexSummaryTest
 
         ByteArrayOutputStream aos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(aos);
-        IndexSummary.serializer.serialize(summary, dos);
+        IndexSummary.serializer.serialize(summary, dos, false);
         DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p);
+        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, 
false);
 
         assertEquals(1, loaded.size());
         assertEquals(summary.getPosition(0), loaded.getPosition(0));
@@ -113,7 +116,7 @@ public class IndexSummaryTest
     private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int 
size, int interval)
     {
         List<DecoratedKey> list = Lists.newArrayList();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), 
interval);
+        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), 
interval, BASE_SAMPLING_LEVEL);
         for (int i = 0; i < size; i++)
         {
             UUID uuid = UUID.randomUUID();
@@ -126,4 +129,128 @@ public class IndexSummaryTest
         IndexSummary summary = 
builder.build(DatabaseDescriptor.getPartitioner());
         return Pair.create(list, summary);
     }
-}
+
+    @Test
+    public void testDownsamplePatterns()
+    {
+        assertEquals(Arrays.asList(0), Downsampling.getSamplingPattern(0));
+        assertEquals(Arrays.asList(0), Downsampling.getSamplingPattern(1));
+
+        assertEquals(Arrays.asList(0, 1), Downsampling.getSamplingPattern(2));
+        assertEquals(Arrays.asList(0, 2, 1, 3), 
Downsampling.getSamplingPattern(4));
+        assertEquals(Arrays.asList(0, 4, 2, 6, 1, 5, 3, 7), 
Downsampling.getSamplingPattern(8));
+        assertEquals(Arrays.asList(0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 
11, 7, 15), Downsampling.getSamplingPattern(16));
+    }
+
+    private static boolean shouldSkip(int index, List<Integer> startPoints)
+    {
+        for (int start : startPoints)
+        {
+            if ((index - start) % BASE_SAMPLING_LEVEL == 0)
+                return true;
+        }
+        return false;
+    }
+
+    @Test
+    public void testDownsample()
+    {
+        final int NUM_KEYS = 4096;
+        final int INDEX_INTERVAL = 128;
+        final int ORIGINAL_NUM_ENTRIES = NUM_KEYS / INDEX_INTERVAL;
+
+
+        Pair<List<DecoratedKey>, IndexSummary> random = 
generateRandomIndex(NUM_KEYS, INDEX_INTERVAL);
+        List<DecoratedKey> keys = random.left;
+        IndexSummary original = random.right;
+
+        // sanity check on the original index summary
+        for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            assertEquals(keys.get(i * INDEX_INTERVAL).key, 
ByteBuffer.wrap(original.getKey(i)));
+
+        List<Integer> samplePattern = 
Downsampling.getSamplingPattern(BASE_SAMPLING_LEVEL);
+
+        // downsample by one level, then two levels, then three levels...
+        int downsamplingRound = 1;
+        for (int samplingLevel = BASE_SAMPLING_LEVEL - 1; samplingLevel >= 
MIN_SAMPLING_LEVEL; samplingLevel--)
+        {
+            IndexSummary downsampled = downsample(original, samplingLevel, 
DatabaseDescriptor.getPartitioner());
+            assertEquals(entriesAtSamplingLevel(samplingLevel, 
original.getMaxNumberOfEntries()), downsampled.size());
+
+            int sampledCount = 0;
+            List<Integer> skipStartPoints = samplePattern.subList(0, 
downsamplingRound);
+            for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            {
+                if (!shouldSkip(i, skipStartPoints))
+                {
+                    assertEquals(keys.get(i * INDEX_INTERVAL).key, 
ByteBuffer.wrap(downsampled.getKey(sampledCount)));
+                    sampledCount++;
+                }
+            }
+            downsamplingRound++;
+        }
+
+        // downsample one level each time
+        IndexSummary previous = original;
+        downsamplingRound = 1;
+        for (int downsampleLevel = BASE_SAMPLING_LEVEL - 1; downsampleLevel >= 
MIN_SAMPLING_LEVEL; downsampleLevel--)
+        {
+            IndexSummary downsampled = downsample(previous, downsampleLevel, 
DatabaseDescriptor.getPartitioner());
+            assertEquals(entriesAtSamplingLevel(downsampleLevel, 
original.getMaxNumberOfEntries()), downsampled.size());
+
+            int sampledCount = 0;
+            List<Integer> skipStartPoints = samplePattern.subList(0, 
downsamplingRound);
+            for (int i = 0; i < ORIGINAL_NUM_ENTRIES; i++)
+            {
+                if (!shouldSkip(i, skipStartPoints))
+                {
+                    assertEquals(keys.get(i * INDEX_INTERVAL).key, 
ByteBuffer.wrap(downsampled.getKey(sampledCount)));
+                    sampledCount++;
+                }
+            }
+
+            previous = downsampled;
+            downsamplingRound++;
+        }
+    }
+
+    @Test
+    public void testOriginalIndexLookup()
+    {
+        for (int i = BASE_SAMPLING_LEVEL; i >= MIN_SAMPLING_LEVEL; i--)
+            assertEquals(i, Downsampling.getOriginalIndexes(i).size());
+
+        ArrayList<Integer> full = new ArrayList<>();
+        for (int i = 0; i < BASE_SAMPLING_LEVEL; i++)
+            full.add(i);
+
+        assertEquals(full, 
Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL));
+        // the entry at index 0 is the first to go
+        assertEquals(full.subList(1, full.size()), 
Downsampling.getOriginalIndexes(BASE_SAMPLING_LEVEL - 1));
+
+        // spot check a few values (these depend on BASE_SAMPLING_LEVEL being 
128)
+        assert BASE_SAMPLING_LEVEL == 128;
+        assertEquals(Arrays.asList(31, 63, 95, 127), 
Downsampling.getOriginalIndexes(4));
+        assertEquals(Arrays.asList(63, 127), 
Downsampling.getOriginalIndexes(2));
+        assertEquals(Arrays.asList(), Downsampling.getOriginalIndexes(0));
+    }
+
+    @Test
+    public void testGetNumberOfSkippedEntriesAfterIndex()
+    {
+        int indexInterval = 128;
+        for (int i = 0; i < BASE_SAMPLING_LEVEL; i++)
+            assertEquals(indexInterval, 
Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL, 
indexInterval));
+
+        // with one round of downsampling, only the first summary has been 
removed, so only the last index will have
+        // double the gap until the next sample
+        for (int i = 0; i < BASE_SAMPLING_LEVEL - 2; i++)
+            assertEquals(indexInterval, 
Downsampling.getEffectiveIndexIntervalAfterIndex(i, BASE_SAMPLING_LEVEL - 1, 
indexInterval));
+        assertEquals(indexInterval * 2, 
Downsampling.getEffectiveIndexIntervalAfterIndex(BASE_SAMPLING_LEVEL - 2, 
BASE_SAMPLING_LEVEL - 1, indexInterval));
+
+        // at samplingLevel=2, the retained summary points are [63, 127] 
(assumes BASE_SAMPLING_LEVEL is 128)
+        assert BASE_SAMPLING_LEVEL == 128;
+        assertEquals(64 * indexInterval, 
Downsampling.getEffectiveIndexIntervalAfterIndex(0, 2, indexInterval));
+        assertEquals(64 * indexInterval, 
Downsampling.getEffectiveIndexIntervalAfterIndex(1, 2, indexInterval));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd1a727/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index b771e72..6bf17fd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -23,17 +23,20 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.junit.Assert;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -54,11 +57,17 @@ import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class SSTableReaderTest extends SchemaLoader
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(SSTableReaderTest.class);
+
     static Token t(int i)
     {
         return 
StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(String.valueOf(i)));
@@ -252,8 +261,8 @@ public class SSTableReaderTest extends SchemaLoader
 
         // test to see if sstable can be opened as expected
         SSTableReader target = SSTableReader.open(desc);
-        Assert.assertEquals(target.getKeySampleSize(), 1);
-        Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), 
target.getKeySample(0));
+        Assert.assertEquals(target.getIndexSummarySize(), 1);
+        Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), 
target.getIndexSummaryKey(0));
         assert target.first.equals(firstKey);
         assert target.last.equals(lastKey);
     }
@@ -341,6 +350,64 @@ public class SSTableReaderTest extends SchemaLoader
         assert sections.size() == 1 : "Expected to find range in sstable 
opened for bulk loading";
     }
 
+    @Test
+    public void testIndexSummaryReplacement() throws IOException, 
ExecutionException, InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        final ColumnFamilyStore store = 
keyspace.getColumnFamilyStore("StandardLowIndexInterval"); // index interval of 
8, no key caching
+        CompactionManager.instance.disableAutoCompaction();
+
+        final int NUM_ROWS = 1000;
+        for (int j = 0; j < NUM_ROWS; j++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", j));
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            rm.add("StandardLowIndexInterval", ByteBufferUtil.bytes("0"), 
ByteBufferUtil.bytes(String.format("%3d", j)), j);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store);
+
+        Collection<SSTableReader> sstables = store.getSSTables();
+        assert sstables.size() == 1;
+        final SSTableReader sstable = sstables.iterator().next();
+
+        ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);
+        List<Future> futures = new ArrayList<>(NUM_ROWS * 2);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            final ByteBuffer key = ByteBufferUtil.bytes(String.format("%3d", 
i));
+            final int index = i;
+
+            futures.add(executor.submit(new Runnable()
+            {
+                public void run()
+                {
+                    ColumnFamily result = 
store.getColumnFamily(sstable.partitioner.decorateKey(key), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100, 
100);
+                    assertFalse(result.isEmpty());
+                    assertEquals(0, 
ByteBufferUtil.compare(String.format("%3d", index).getBytes(), 
result.getColumn(ByteBufferUtil.bytes("0")).value()));
+                }
+            }));
+
+            futures.add(executor.submit(new Runnable()
+            {
+                public void run()
+                {
+                    Iterable<DecoratedKey> results = store.keySamples(
+                            new Range<>(sstable.partitioner.getMinimumToken(), 
sstable.partitioner.getToken(key)));
+                    assertTrue(results.iterator().hasNext());
+                }
+            }));
+        }
+
+        SSTableReader replacement = 
sstable.cloneWithNewSummarySamplingLevel(Downsampling.MIN_SAMPLING_LEVEL);
+        store.getDataTracker().replaceReaders(Arrays.asList(sstable), 
Arrays.asList(replacement));
+        for (Future future : futures)
+            future.get();
+
+        assertEquals(sstable.estimatedKeys(), replacement.estimatedKeys(), 1);
+    }
+
     private void assertIndexQueryWorks(ColumnFamilyStore indexedCFS) throws 
IOException
     {
         assert "Indexed1".equals(indexedCFS.name);

Reply via email to