Create interval tree over canonical sstables to avoid missing sstables during streaming

patch by marcuse; reviewed by benedict for CASSANDRA-11886
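
For readers skimming the patch: the heart of the change (shown in full in the StreamSession hunk below) is to build the interval tree over the canonical sstables and query that, instead of querying the view's live tree, which during incremental opening indexes early-opened instances whose start points have moved and can therefore miss sstables. A minimal sketch of the new selection logic, using only the helpers this patch touches; the wrapper class and method names are illustrative, not part of the patch:

    import java.util.Collection;
    import java.util.Set;

    import com.google.common.collect.Sets;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.DataTracker;
    import org.apache.cassandra.db.RowPosition;
    import org.apache.cassandra.dht.AbstractBounds;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.io.sstable.SSTableReader;

    public class CanonicalSelectionSketch
    {
        // Build the interval tree over canonical sstables, then query it; an
        // early-opened instance can no longer hide a canonical sstable that
        // covers the requested bounds.
        static Set<SSTableReader> select(DataTracker.View view,
                                         Collection<AbstractBounds<RowPosition>> rowBoundsList,
                                         IPartitioner partitioner,
                                         boolean isIncremental)
        {
            DataTracker.SSTableIntervalTree intervalTree =
                DataTracker.buildIntervalTree(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
            Set<SSTableReader> sstables = Sets.newHashSet();
            for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
                for (SSTableReader sstable : DataTracker.View.sstablesInBounds(rowBounds, intervalTree, partitioner))
                    if (!isIncremental || !sstable.isRepaired())
                        sstables.add(sstable);
            return sstables;
        }
    }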


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/72acbcd0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/72acbcd0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/72acbcd0

Branch: refs/heads/trunk
Commit: 72acbcd00fe7c46e54cd267f42868531e99e39df
Parents: 68319f7
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed May 25 08:38:14 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Jun 13 14:31:47 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  4 ++
 .../org/apache/cassandra/db/DataTracker.java    |  8 ++-
 .../cassandra/streaming/StreamSession.java      | 21 +++---
 .../io/sstable/SSTableRewriterTest.java         | 72 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af641e1..ebcc90c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
  * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
  * Updated cqlsh Python driver to fix DESCRIBE problem for legacy tables (CASSANDRA-11055)
  * cqlsh: apply current keyspace to source command (CASSANDRA-11152)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 166ce7e..559ba0b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1531,6 +1531,10 @@ public class DatabaseDescriptor
     {
         return conf.sstable_preemptive_open_interval_in_mb;
     }
+    public static void setSSTablePreempiveOpenIntervalInMB(int mb)
+    {
+        conf.sstable_preemptive_open_interval_in_mb = mb;
+    }
 
     public static boolean getTrickleFsync()
     {
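
The new setter exists so tests can shrink the preemptive open interval and force incremental opening during a rewrite. A sketch of the intended pattern, mirroring the test added at the bottom of this patch (cfs is assumed to be a ColumnFamilyStore as in that test; 50 MB is the value the test restores, matching the 2.1 default):

    // Force aggressive preemptive opening so a major compaction incrementally
    // opens the rewritten sstable, then restore the interval afterwards.
    DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
    try
    {
        cfs.forceMajorCompaction();
    }
    finally
    {
        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
    }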

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index c731a35..927e717 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -810,9 +811,14 @@ public class DataTracker
 
         public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
         {
+            return sstablesInBounds(rowBounds, intervalTree, liveMemtables.get(0).cfs.partitioner);
+        }
+
+        public static List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds, SSTableIntervalTree intervalTree, IPartitioner partitioner)
+        {
             if (intervalTree.isEmpty())
                 return Collections.emptyList();
-            RowPosition stopInTree = rowBounds.right.isMinimum(liveMemtables.get(0).cfs.partitioner) ? intervalTree.max() : rowBounds.right;
+            RowPosition stopInTree = rowBounds.right.isMinimum(partitioner) ? intervalTree.max() : rowBounds.right;
             return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 4eb8557..273631c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
@@ -295,7 +298,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return stores;
     }
 
-    private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
+    @VisibleForTesting
+    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
@@ -303,30 +307,23 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             for (ColumnFamilyStore cfStore : stores)
             {
                 final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                final IPartitioner partitioner = cfStore.partitioner;
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(range.toRowBounds());
                 refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
                 {
                     public List<SSTableReader> apply(DataTracker.View view)
                     {
-                        Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
-                        for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
-                            permittedInstances.put(reader, reader);
-
+                        DataTracker.SSTableIntervalTree intervalTree = DataTracker.buildIntervalTree(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
                         Set<SSTableReader> sstables = Sets.newHashSet();
                         for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
                         {
-                            // sstableInBounds may contain early opened sstables
-                            for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                            for (SSTableReader sstable : DataTracker.View.sstablesInBounds(rowBounds, intervalTree, partitioner))
                             {
-                                if (isIncremental && sstable.isRepaired())
-                                    continue;
-                                sstable = permittedInstances.get(sstable);
-                                if (sstable != null)
+                                if (!isIncremental || !sstable.isRepaired())
                                     sstables.add(sstable);
                             }
                         }
-
                         return ImmutableList.copyOf(sstables);
                     }
                 }).refs);
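
With getSSTableSectionsForRanges now public and @VisibleForTesting, callers outside StreamSession can exercise it directly. A usage sketch based on the test added below (firstToken and cfs come from that test; each returned section holds a reference that must be released):

    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(
        Collections.singleton(new Range<Token>(firstToken, firstToken)),
        Collections.singleton(cfs), 0L, false);
    // release the sstable references taken while computing the sections
    for (StreamSession.SSTableStreamingSections section : sections)
        section.ref.release();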

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72acbcd0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index a735657..1fb28f5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,13 +21,16 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -39,7 +42,11 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -767,6 +774,71 @@ public class SSTableRewriterTest extends SchemaLoader
         truncate(cfs);
     }
 
+    @Test
+    public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        cfs.addSSTable(writeFile(cfs, 1000));
+
+        Collection<SSTableReader> allSSTables = cfs.getSSTables();
+        assertEquals(1, allSSTables.size());
+        final Token firstToken = allSSTables.iterator().next().first.getToken();
+        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+
+        List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
+            Collections.singleton(new Range<Token>(firstToken, firstToken)),
+            Collections.singleton(cfs), 0L, false);
+        assertEquals(1, sectionsBeforeRewrite.size());
+        for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
+            section.ref.release();
+        final AtomicInteger checkCount = new AtomicInteger();
+        // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
+        INotificationConsumer consumer = new INotificationConsumer()
+                {
+                    public void handleNotification(INotification notification, Object sender)
+                    {
+                        if (notification instanceof SSTableListChangedNotification)
+                        {
+                            Collection<SSTableReader> added = ((SSTableListChangedNotification) notification).added;
+                            Collection<SSTableReader> removed = ((SSTableListChangedNotification) notification).removed;
+                            // note that we need to check if added.equals(removed) because once the compaction is done the old sstable will have
+                            // selfRef().globalCount() == 0 and we can't get the SectionsForRanges then. During incremental opening we always add and remove the same
+                            // sstable (note that the sstables are x.equals(y) but not x == y since the new one will be a new instance with a moved starting point).
+                            // In this case we must avoid trying to call getSSTableSectionsForRanges since we are in the notification
+                            // method and trying to reference an sstable with globalCount() == 0 puts it into a loop, and this blocks the tracker from removing the
+                            // unreferenced sstable.
+                            if (added.isEmpty() || !added.iterator().next().getColumnFamilyName().equals(cfs.getColumnFamilyName()) || !added.equals(removed))
+                                return;
+
+                            // at no point must the rewrite process hide
+                            // sections returned by getSSTableSectionsForRanges
+                            Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
+                            List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
+                            assertEquals(1, sections.size());
+                            for (StreamSession.SSTableStreamingSections section : sections)
+                                section.ref.release();
+                            checkCount.incrementAndGet();
+                        }
+                    }
+                };
+        cfs.getDataTracker().subscribe(consumer);
+        try
+        {
+            cfs.forceMajorCompaction();
+            // reset
+        }
+        finally
+        {
+            DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
+            cfs.getDataTracker().unsubscribe(consumer);
+        }
+        assertTrue(checkCount.get() >= 2);
+        truncate(cfs);
+    }
+
     /**
      * emulates anticompaction - writing from one source sstable to two new sstables
      *
