Author: jbellis
Date: Mon Aug 29 20:28:46 2011
New Revision: 1162988

URL: http://svn.apache.org/viewvc?rev=1162988&view=rev
Log:
fix race condition in sstable reference counting
patch by jbellis; reviewed by slebresne for CASSANDRA-3085

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1162988&r1=1162987&r2=1162988&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Aug 29 20:28:46 2011
@@ -44,7 +44,7 @@
    Thrift<->Avro conversion methods (CASSANDRA-3032)
  * Add timeouts to client request schedulers (CASSANDRA-3079)
  * Cli to use hashes rather than array of hashes for strategy options 
(CASSANDRA-3081)
- * LeveledCompactionStrategy (CASSANDRA-1608)
+ * LeveledCompactionStrategy (CASSANDRA-1608, 3085)
  * Improvements of the CLI `describe` command (CASSANDRA-2630)
  * reduce window where dropped CF sstables may not be deleted (CASSANDRA-2942)
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162988&r1=1162987&r2=1162988&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java 
Mon Aug 29 20:28:46 2011
@@ -72,15 +72,12 @@ public class CollationController
     {
         logger.debug("collectTimeOrderedData");
 
-        List<IColumnIterator> iterators;
-        ColumnFamily container;
-        while (true)
-        {
-            DataTracker.View dataview = cfs.getDataTracker().getView();
-            iterators = new ArrayList<IColumnIterator>();
-            container = ColumnFamily.create(cfs.metadata, factory, 
filter.filter.isReversed());
-            List<SSTableReader> sstables;
-            for (Memtable memtable : 
Iterables.concat(dataview.memtablesPendingFlush, 
Collections.singleton(dataview.memtable)))
+        ColumnFamily container = ColumnFamily.create(cfs.metadata, factory, 
filter.filter.isReversed());
+        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
+        ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+        try
+        {
+            for (Memtable memtable : view.memtables)
             {
                 IColumnIterator iter = 
filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator);
                 if (iter != null)
@@ -99,42 +96,33 @@ public class CollationController
             QueryFilter reducedFilter = new QueryFilter(filter.key, 
filter.path, new NamesQueryFilter(filterColumns));
 
             /* add the SSTables on disk */
-            sstables = dataview.intervalTree.search(new Interval(filter.key, 
filter.key));
-            Collections.sort(sstables, SSTable.maxTimestampComparator);
-            if (!SSTableReader.acquireReferences(sstables))
-                continue; // retry w/ new view
+            Collections.sort(view.sstables, SSTable.maxTimestampComparator);
 
-            try
+            // read sorted sstables
+            for (SSTableReader sstable : view.sstables)
             {
-                // read sorted sstables
-                for (SSTableReader sstable : sstables)
+                long currentMaxTs = sstable.getMaxTimestamp();
+                reduceNameFilter(reducedFilter, container, currentMaxTs);
+                if (((NamesQueryFilter) 
reducedFilter.filter).columns.isEmpty())
+                    break;
+
+                IColumnIterator iter = 
reducedFilter.getSSTableColumnIterator(sstable);
+                iterators.add(iter);
+                if (iter.getColumnFamily() != null)
                 {
-                    long currentMaxTs = sstable.getMaxTimestamp();
-                    reduceNameFilter(reducedFilter, container, currentMaxTs);
-                    if (((NamesQueryFilter) 
reducedFilter.filter).columns.isEmpty())
-                        break;
-
-                    IColumnIterator iter = 
reducedFilter.getSSTableColumnIterator(sstable);
-                    iterators.add(iter);
-                    if (iter.getColumnFamily() != null)
-                    {
-                        container.delete(iter.getColumnFamily());
-                        sstablesIterated++;
-                        while (iter.hasNext())
-                            container.addColumn(iter.next());
-                    }
+                    container.delete(iter.getColumnFamily());
+                    sstablesIterated++;
+                    while (iter.hasNext())
+                        container.addColumn(iter.next());
                 }
             }
-            finally
-            {
-                SSTableReader.releaseReferences(sstables);
-                for (IColumnIterator iter : iterators)
-                    FileUtils.closeQuietly(iter);
-            }
-
-            break; // sstable reference acquisition was successful
         }
-
+        finally
+        {
+            SSTableReader.releaseReferences(view.sstables);
+            for (IColumnIterator iter : iterators)
+                FileUtils.closeQuietly(iter);
+        }
 
         // we need to distinguish between "there is no data at all for this 
row" (BF will let us rebuild that efficiently)
         // and "there used to be data, but it's gone now" (we should cache the 
empty CF so we don't need to rebuild that slower)
@@ -198,12 +186,11 @@ public class CollationController
         logger.debug("collectAllData");
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
         ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, 
filter.filter.isReversed());
-        List<SSTableReader> sstables;
 
-        while (true)
+        ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+        try
         {
-            DataTracker.View dataview = cfs.getDataTracker().getView();
-            for (Memtable memtable : 
Iterables.concat(dataview.memtablesPendingFlush, 
Collections.singleton(dataview.memtable)))
+            for (Memtable memtable : view.memtables)
             {
                 IColumnIterator iter = 
filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator);
                 if (iter != null)
@@ -213,32 +200,22 @@ public class CollationController
                 }
             }
 
-            /* add the SSTables on disk */
-            sstables = dataview.intervalTree.search(new Interval(filter.key, 
filter.key));
-            if (!SSTableReader.acquireReferences(sstables))
-                continue; // retry w/ new view
-
-            try
+            for (SSTableReader sstable : view.sstables)
             {
-                for (SSTableReader sstable : sstables)
+                IColumnIterator iter = 
filter.getSSTableColumnIterator(sstable);
+                iterators.add(iter);
+                if (iter.getColumnFamily() != null)
                 {
-                    IColumnIterator iter = 
filter.getSSTableColumnIterator(sstable);
-                    iterators.add(iter);
-                    if (iter.getColumnFamily() != null)
-                    {
-                        returnCF.delete(iter.getColumnFamily());
-                        sstablesIterated++;
-                    }
+                    returnCF.delete(iter.getColumnFamily());
+                    sstablesIterated++;
                 }
             }
-            finally
-            {
-                SSTableReader.releaseReferences(sstables);
-                for (IColumnIterator iter : iterators)
-                    FileUtils.closeQuietly(iter);
-            }
-
-            break; // sstable reference acquisition was successful
+        }
+        finally
+        {
+            SSTableReader.releaseReferences(view.sstables);
+            for (IColumnIterator iter : iterators)
+                FileUtils.closeQuietly(iter);
         }
 
         // we need to distinguish between "there is no data at all for this 
row" (BF will let us rebuild that efficiently)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162988&r1=1162987&r2=1162988&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon 
Aug 29 20:28:46 2011
@@ -1270,6 +1270,48 @@ public class ColumnFamilyStore implement
         return markCurrentViewReferenced().sstables;
     }
 
+    /**
+     * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
+     * for the given @param key, according to the interval tree
+     */
+    public ViewFragment markReferenced(DecoratedKey key)
+    {
+        assert !key.isEmpty();
+        DataTracker.View view;
+        List<SSTableReader> sstables;
+        while (true)
+        {
+            view = data.getView();
+            sstables = view.intervalTree.search(new Interval(key, key));
+            if (SSTableReader.acquireReferences(sstables))
+                break;
+            // retry w/ new view
+        }
+        return new ViewFragment(sstables, 
Iterables.concat(Collections.singleton(view.memtable), 
view.memtablesPendingFlush));
+    }
+
+    /**
+     * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
+     * for rows between @param startWith and @param stopAt, inclusive, 
according to the interval tree
+     */
+    public ViewFragment markReferenced(DecoratedKey startWith, DecoratedKey 
stopAt)
+    {
+        DataTracker.View view;
+        List<SSTableReader> sstables;
+        while (true)
+        {
+            view = data.getView();
+            // startAt == minimum is ok, but stopAt == minimum is confusing 
because all IntervalTree deals with
+            // is Comparable, so it won't know to special-case that.
+            Comparable stopInTree = stopAt.isEmpty() ? view.intervalTree.max : 
stopAt;
+            sstables = view.intervalTree.search(new Interval(startWith, 
stopInTree));
+            if (SSTableReader.acquireReferences(sstables))
+                break;
+            // retry w/ new view
+        }
+        return new ViewFragment(sstables, 
Iterables.concat(Collections.singleton(view.memtable), 
view.memtablesPendingFlush));
+    }
+
     private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
ISortedColumns.Factory factory)
     {
         CollationController controller = new CollationController(this, 
factory, filter, gcBefore);
@@ -1301,25 +1343,12 @@ public class ColumnFamilyStore implement
         QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, 
superColumn, null), columnFilter);
         int gcBefore = (int)(System.currentTimeMillis() / 1000) - 
metadata.getGcGraceSeconds();
 
-        DataTracker.View currentView = markCurrentViewReferenced();
+        List<Row> rows;
+        ViewFragment view = markReferenced(startWith, stopAt);
         try
         {
-            Collection<Memtable> memtables = new ArrayList<Memtable>();
-            memtables.add(currentView.memtable);
-            memtables.addAll(currentView.memtablesPendingFlush);
-            // It is fine to aliases the View.sstables since it's an 
unmodifiable collection
-            Collection<SSTableReader> sstables = currentView.sstables;
-
-            Comparable startWithComp = startWith;
-            Comparable stopAtComp = stopAt;
-            if (startWith.token.equals(partitioner.getMinimumToken()))
-                startWithComp = currentView.intervalTree.min;
-            if (stopAt.token.equals(partitioner.getMinimumToken()))
-                stopAtComp = currentView.intervalTree.max;
-            sstables = currentView.intervalTree.search(new 
Interval(startWithComp, stopAtComp));
-
-            CloseableIterator<Row> iterator = 
RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, 
getComparator(), this);
-            List<Row> rows = new ArrayList<Row>();
+            CloseableIterator<Row> iterator = 
RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, 
stopAt, filter, getComparator(), this);
+            rows = new ArrayList<Row>();
 
             try
             {
@@ -1361,13 +1390,14 @@ public class ColumnFamilyStore implement
                     throw new IOError(e);
                 }
             }
-
-            return rows;
         }
         finally
         {
-            SSTableReader.releaseReferences(currentView.sstables);
+            // separate finally block to release references in case 
getIterator() throws
+            SSTableReader.releaseReferences(view.sstables);
         }
+
+        return rows;
     }
 
     public List<Row> search(IndexClause clause, AbstractBounds range, IFilter 
dataFilter)
@@ -1941,4 +1971,16 @@ public class ColumnFamilyStore implement
                ? ((LeveledCompactionStrategy) 
this.compactionStrategy).getLevelSize(0)
                : 0;
     }
+
+    public static class ViewFragment
+    {
+        public final List<SSTableReader> sstables;
+        public final Iterable<Memtable> memtables;
+
+        public ViewFragment(List<SSTableReader> sstables, Iterable<Memtable> 
memtables)
+        {
+            this.sstables = sstables;
+            this.memtables = memtables;
+        }
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1162988&r1=1162987&r2=1162988&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java 
Mon Aug 29 20:28:46 2011
@@ -61,14 +61,13 @@ public class RowIteratorFactory
      * @param comparator
      * @return A row iterator following all the given restrictions
      */
-    public static CloseableIterator<Row> getIterator(final 
Collection<Memtable> memtables,
+    public static CloseableIterator<Row> getIterator(final Iterable<Memtable> 
memtables,
                                           final Collection<SSTableReader> 
sstables,
                                           final DecoratedKey startWith,
                                           final DecoratedKey stopAt,
                                           final QueryFilter filter,
                                           final AbstractType comparator,
-                                          final ColumnFamilyStore cfs
-    )
+                                          final ColumnFamilyStore cfs)
     {
         // fetch data from current memtable, historical memtables, and 
SSTables in the correct order.
         final List<CloseableIterator<IColumnIterator>> iterators = new 
ArrayList<CloseableIterator<IColumnIterator>>();


Reply via email to