Author: xedin
Date: Mon Aug 15 21:01:39 2011
New Revision: 1158008

URL: http://svn.apache.org/viewvc?rev=1158008&view=rev
Log:
Add 'load new SSTables' functionality to JMX and corresponding "refresh" 
command to the nodetool
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-2991

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Aug 15 21:01:39 2011
@@ -7,7 +7,8 @@
    in a commitlog segment (CASSANDRA-3021)
  * fix cassandra.bat when CASSANDRA_HOME contains spaces (CASSANDRA-2952)
  * fix to SSTableSimpleUnsortedWriter bufferSize calculation (CASSANDRA-3027)
-
+ * add a 'load new SSTables' functionality to JMX and corresponding "refresh"
+   command to the nodetool (CASSANDRA-2991)
 
 0.8.4
  * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Mon Aug 15 21:01:39 2011
@@ -274,22 +274,10 @@ public class ColumnFamilyStore implement
         List<SSTableReader> sstables = new ArrayList<SSTableReader>();
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : 
files(table.name, columnFamilyName, false, false).entrySet())
         {
-            SSTableReader sstable;
-            try
-            {
-                sstable = SSTableReader.open(sstableFiles.getKey(), 
sstableFiles.getValue(), savedKeys, data, metadata, this.partitioner);
-            }
-            catch (FileNotFoundException ex)
-            {
-                logger.error("Missing sstable component in " + sstableFiles + 
"; skipped because of " + ex.getMessage());
-                continue;
-            }
-            catch (IOException ex)
-            {
-                logger.error("Corrupt sstable " + sstableFiles + "; skipped", 
ex);
-                continue;
-            }
-            sstables.add(sstable);
+           SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, 
data, metadata, partitioner);
+
+            if (reader != null) // if == null, logger errors where already 
fired
+                sstables.add(reader);
         }
         data.addSSTables(sstables);
 
@@ -465,7 +453,99 @@ public class ColumnFamilyStore implement
 
         return new ColumnFamilyStore(table, columnFamily, partitioner, value, 
metadata);
     }
-    
+
+    /**
+     * See #{@code StorageService.loadNewSSTables(String, String)} for more 
info
+     *
+     * @param ksName The keyspace name
+     * @param cfName The columnFamily name
+     */
+    public static synchronized void loadNewSSTables(String ksName, String 
cfName)
+    {
+        /** ks/cf existence checks will be done by open and getCFS methods for 
us */
+        Table table = Table.open(ksName);
+        table.getColumnFamilyStore(cfName).loadNewSSTables();
+    }
+
+    /**
+     * #{@inheritDoc}
+     */
+    public synchronized void loadNewSSTables()
+    {
+        logger.info("Loading new SSTables for " + table.name + "/" + 
columnFamily + "...");
+
+        // current view over ColumnFamilyStore
+        DataTracker.View view = data.getView();
+        // descriptors of currently registered SSTables
+        Set<Descriptor> currentDescriptors = new HashSet<Descriptor>();
+        // going to hold new SSTable view of the CFS containing old and new 
SSTables
+        Set<SSTableReader> sstables = new HashSet<SSTableReader>();
+        Set<DecoratedKey> savedKeys = keyCache.readSaved();
+        // get the max generation number, to prevent generation conflicts
+        int generation = 0;
+
+        for (SSTableReader reader : view.sstables)
+        {
+            sstables.add(reader); // first of all, add old SSTables
+            currentDescriptors.add(reader.descriptor);
+
+            if (reader.descriptor.generation > generation)
+                generation = reader.descriptor.generation;
+        }
+
+
+        SSTableReader reader;
+        // set to true if we have at least one new SSTable to load
+        boolean atLeastOneNew = false;
+
+        for (Map.Entry<Descriptor, Set<Component>> rawSSTable : 
files(table.name, columnFamily, false, false).entrySet())
+        {
+            Descriptor descriptor = rawSSTable.getKey();
+
+            if (currentDescriptors.contains(descriptor))
+                continue; // old (initialized) SSTable found, skipping
+
+            if (!descriptor.cfname.equals(columnFamily))
+                continue;
+
+            if (descriptor.isFromTheFuture())
+                throw new RuntimeException(String.format("Can't open sstables 
from the future! Current version %s, found file: %s",
+                                                         
Descriptor.CURRENT_VERSION,
+                                                         descriptor));
+
+            logger.info("Initializing new SSTable {}", rawSSTable);
+            reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, 
partitioner);
+
+            if (reader == null)
+                continue; // something wrong with SSTable, skipping
+
+            sstables.add(reader);
+
+            if (descriptor.generation > generation)
+                generation = descriptor.generation;
+
+            if (!atLeastOneNew) // set flag only once
+                atLeastOneNew = true;
+        }
+
+        if (!atLeastOneNew)
+        {
+            logger.info("No new SSTables where found for " + table.name + "/" 
+ columnFamily);
+            return;
+        }
+
+        logger.info("Loading new SSTable Set for " + table.name + "/" + 
columnFamily + ": " + sstables);
+        data.addSSTables(sstables); // this will call updateCacheSizes() for us
+
+        logger.info("Requesting a full secondary index re-build for " + 
table.name + "/" + columnFamily);
+        buildSecondaryIndexes(sstables, getIndexedColumns());
+
+        logger.info("Setting up new generation: " + generation);
+        fileIndexGenerator.set(generation);
+
+        logger.info("Done loading load new SSTables for " + table.name + "/" + 
columnFamily);
+    }
+
     /**
      * Removes unnecessary files from the cf directory at startup: these 
include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
@@ -2224,4 +2304,28 @@ public class ColumnFamilyStore implement
     {
         return data.getMemtablesPendingFlush();
     }
+
+    private static SSTableReader openSSTableReader(Map.Entry<Descriptor, 
Set<Component>> rawSSTable,
+                                                   Set<DecoratedKey> savedKeys,
+                                                   DataTracker tracker,
+                                                   CFMetaData metadata,
+                                                   IPartitioner partitioner)
+    {
+        SSTableReader reader = null;
+
+        try
+        {
+            reader = SSTableReader.open(rawSSTable.getKey(), 
rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner);
+        }
+        catch (FileNotFoundException ex)
+        {
+            logger.error("Missing sstable component in " + rawSSTable + "; 
skipped because of " + ex.getMessage());
+        }
+        catch (IOException ex)
+        {
+            logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex);
+        }
+
+        return reader;
+    }
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
 Mon Aug 15 21:01:39 2011
@@ -234,4 +234,10 @@ public interface ColumnFamilyStoreMBean
 
     public int getKeyCacheSavePeriodInSeconds();
     public void setKeyCacheSavePeriodInSeconds(int kcspis);
+
+    /**
+     * Scan through Keyspace/ColumnFamily's data directory
+     * determine which SSTables should be loaded and load them
+     */
+    public void loadNewSSTables();
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
 Mon Aug 15 21:01:39 2011
@@ -2615,4 +2615,12 @@ public class StorageService implements I
     {
         return AbstractCassandraDaemon.exceptions.get();
     }
+
+    /**
+     * #{@inheritDoc}
+     */
+    public void loadNewSSTables(String ksName, String cfName)
+    {
+        ColumnFamilyStore.loadNewSSTables(ksName, cfName);
+    }
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 Mon Aug 15 21:01:39 2011
@@ -321,4 +321,12 @@ public interface StorageServiceMBean
     public void setCompactionThroughputMbPerSec(int value);
 
     public void bulkLoad(String directory);
+
+    /**
+     * Load new SSTables to the given keyspace/columnFamily
+     *
+     * @param ksName The parent keyspace name
+     * @param cfName The ColumnFamily name where SSTables belong
+     */
+    public void loadNewSSTables(String ksName, String cfName);
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
 Mon Aug 15 21:01:39 2011
@@ -79,7 +79,8 @@ public class NodeCmd
         DECOMMISSION, MOVE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB,
         SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, 
NETSTATS, CFHISTOGRAMS,
         COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, 
INVALIDATEROWCACHE,
-        DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, 
SETCOMPACTIONTHROUGHPUT, GETENDPOINTS
+        DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, 
SETCOMPACTIONTHROUGHPUT, GETENDPOINTS,
+        REFRESH
     }
 
     
@@ -125,6 +126,7 @@ public class NodeCmd
         addCmdHelp(header, "invalidaterowcache [keyspace] [cfnames]", 
"Invalidate the key cache of one or more column family");
         addCmdHelp(header, "getcompactionthreshold <keyspace> <cfname>", 
"Print min and max compaction thresholds for a given column family");
         addCmdHelp(header, "cfhistograms <keyspace> <cfname>", "Print 
statistic histograms for a given column family");
+        addCmdHelp(header, "refresh <keyspace> <cf-name>", "Load newly placed 
SSTables to the system without restart.");
 
         // Three args
         addCmdHelp(header, "getendpoints <keyspace> <cf> <key>", "Print the 
end points that owns the key");
@@ -699,6 +701,11 @@ public class NodeCmd
                 nodeCmd.printEndPoints(arguments[0], arguments[1], 
arguments[2], System.out);
                 break;
 
+            case REFRESH:
+                if (arguments.length != 2) { badUse("load_new_sstables 
requires ks and cf args"); }
+                probe.loadNewSSTables(arguments[0], arguments[1]);
+                break;
+
             default :
                 throw new RuntimeException("Unreachable code.");
 

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1158008&r1=1158007&r2=1158008&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
 Mon Aug 15 21:01:39 2011
@@ -591,6 +591,11 @@ public class NodeProbe
     {
         return msProxy.getDroppedMessages();
     }
+
+    public void loadNewSSTables(String ksName, String cfName)
+    {
+        ssProxy.loadNewSSTables(ksName, cfName);
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, 
ColumnFamilyStoreMBean>>


Reply via email to