Author: xedin Date: Wed Aug 17 11:46:55 2011 New Revision: 1158642 URL: http://svn.apache.org/viewvc?rev=1158642&view=rev Log: Add 'load new SSTables' functionality to JMX and corresponding "refresh" command to the nodetool patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-2991
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1158642&r1=1158641&r2=1158642&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Aug 17 11:46:55 2011 @@ -235,22 +235,10 @@ public class ColumnFamilyStore implement List<SSTableReader> sstables = new ArrayList<SSTableReader>(); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet()) { - SSTableReader sstable; - try - { - sstable = SSTableReader.open(sstableFiles.getKey(), sstableFiles.getValue(), savedKeys, data, metadata, this.partitioner); - } - catch (FileNotFoundException ex) - { - logger.error("Missing sstable component in " + sstableFiles + "; skipped because of " + ex.getMessage()); - continue; - } - catch (IOException ex) - { - logger.error("Corrupt sstable " + sstableFiles + "; skipped", ex); - continue; - } - sstables.add(sstable); + SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner); + + if (reader != null) // if == null, logger errors where already fired + sstables.add(reader); } data.addSSTables(sstables); @@ -512,6 +500,97 @@ public class ColumnFamilyStore implement } /** + * See #{@code StorageService.loadNewSSTables(String, String)} for more info + * + * @param ksName The keyspace name + * @param cfName The columnFamily name + */ + public static synchronized void loadNewSSTables(String ksName, String cfName) + { + /** ks/cf existence checks will be done by open and getCFS methods for us */ + Table table = Table.open(ksName); + table.getColumnFamilyStore(cfName).loadNewSSTables(); + } + + /** + * #{@inheritDoc} + */ + public synchronized void loadNewSSTables() + { + logger.info("Loading new SSTables for " + table.name + "/" + columnFamily + "..."); + + // current view over ColumnFamilyStore + DataTracker.View view = data.getView(); + // descriptors of currently registered SSTables + Set<Descriptor> currentDescriptors = new HashSet<Descriptor>(); + // going to hold new SSTable view of the CFS containing old and new SSTables + Set<SSTableReader> sstables = new HashSet<SSTableReader>(); + Set<DecoratedKey> savedKeys = keyCache.readSaved(); + // get the max generation number, to prevent generation conflicts + int generation = 0; + + for (SSTableReader reader : view.sstables) + { + sstables.add(reader); // first of all, add old SSTables + currentDescriptors.add(reader.descriptor); + + if (reader.descriptor.generation > generation) + generation = reader.descriptor.generation; + } + + SSTableReader reader; + // set to true if we have at least one new SSTable to load + boolean atLeastOneNew = false; + + for (Map.Entry<Descriptor, Set<Component>> rawSSTable : files(table.name, columnFamily, false, false).entrySet()) + { + Descriptor descriptor = rawSSTable.getKey(); + + if (currentDescriptors.contains(descriptor)) + continue; // old (initialized) SSTable found, skipping + + if (!descriptor.cfname.equals(columnFamily)) + continue; + + if (descriptor.isFromTheFuture()) + throw new RuntimeException(String.format("Can't open sstables from the future! Current version %s, found file: %s", + Descriptor.CURRENT_VERSION, + descriptor)); + + logger.info("Initializing new SSTable {}", rawSSTable); + reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner); + + if (reader == null) + continue; // something wrong with SSTable, skipping + + sstables.add(reader); + + if (descriptor.generation > generation) + generation = descriptor.generation; + + if (!atLeastOneNew) // set flag only once + atLeastOneNew = true; + } + + if (!atLeastOneNew) + { + logger.info("No new SSTables where found for " + table.name + "/" + columnFamily); + return; + } + + logger.info("Loading new SSTable Set for " + table.name + "/" + columnFamily + ": " + sstables); + data.addSSTables(sstables); // this will call updateCacheSizes() for us + + logger.info("Requesting a full secondary index re-build for " + table.name + "/" + columnFamily); + indexManager.buildSecondaryIndexes(sstables, indexManager.getIndexedColumns()); + + logger.info("Setting up new generation: " + generation); + fileIndexGenerator.set(generation); + + logger.info("Done loading load new SSTables for " + table.name + "/" + columnFamily); + } + + /** * @return the name of the column family */ public String getColumnFamilyName() @@ -1892,4 +1971,28 @@ public class ColumnFamilyStore implement { return indexManager.getBuiltIndexes(); } + + private static SSTableReader openSSTableReader(Map.Entry<Descriptor, Set<Component>> rawSSTable, + Set<DecoratedKey> savedKeys, + DataTracker tracker, + CFMetaData metadata, + IPartitioner partitioner) + { + SSTableReader reader = null; + + try + { + reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner); + } + catch (FileNotFoundException ex) + { + logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage()); + } + catch (IOException ex) + { + logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex); + } + + return reader; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1158642&r1=1158641&r2=1158642&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed Aug 17 11:46:55 2011 @@ -234,4 +234,10 @@ public interface ColumnFamilyStoreMBean public int getRowCacheKeysToSave(); public void setRowCacheKeysToSave(int keysToSave); + + /** + * Scan through Keyspace/ColumnFamily's data directory + * determine which SSTables should be loaded and load them + */ + public void loadNewSSTables(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1158642&r1=1158641&r2=1158642&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 17 11:46:55 2011 @@ -2608,4 +2608,12 @@ public class StorageService implements I { SSTableDeletingTask.rescheduleFailedTasks(); } + + /** + * #{@inheritDoc} + */ + public void loadNewSSTables(String ksName, String cfName) + { + ColumnFamilyStore.loadNewSSTables(ksName, cfName); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1158642&r1=1158641&r2=1158642&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Aug 17 11:46:55 2011 @@ -323,4 +323,12 @@ public interface StorageServiceMBean public void bulkLoad(String directory); public void rescheduleFailedDeletions(); + + /** + * Load new SSTables to the given keyspace/columnFamily + * + * @param ksName The parent keyspace name + * @param cfName The ColumnFamily name where SSTables belong + */ + public void loadNewSSTables(String ksName, String cfName); } Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1158642&r1=1158641&r2=1158642&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Aug 17 11:46:55 2011 @@ -79,7 +79,8 @@ public class NodeCmd DECOMMISSION, MOVE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB, SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS, COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE, - DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT, GETENDPOINTS + DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT, GETENDPOINTS, + REFRESH } @@ -125,6 +126,7 @@ public class NodeCmd addCmdHelp(header, "invalidaterowcache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family"); addCmdHelp(header, "getcompactionthreshold <keyspace> <cfname>", "Print min and max compaction thresholds for a given column family"); addCmdHelp(header, "cfhistograms <keyspace> <cfname>", "Print statistic histograms for a given column family"); + addCmdHelp(header, "refresh <keyspace> <cf-name>", "Load newly placed SSTables to the system without restart."); // Three args addCmdHelp(header, "getendpoints <keyspace> <cf> <key>", "Print the end points that owns the key"); @@ -699,6 +701,11 @@ public class NodeCmd nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out); break; + case REFRESH: + if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); } + probe.loadNewSSTables(arguments[0], arguments[1]); + break; + default : throw new RuntimeException("Unreachable code."); Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1158642&r1=1158641&r2=1158642&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Aug 17 11:46:55 2011 @@ -591,6 +591,11 @@ public class NodeProbe { return msProxy.getDroppedMessages(); } + + public void loadNewSSTables(String ksName, String cfName) + { + ssProxy.loadNewSSTables(ksName, cfName); + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>