Hi list, has anyone ever tried to migrate a cluster from the RandomPartitioner to the Murmur3Partitioner?
We would like to do so to get a more standardized setup. I wrote a small (yet untested) utility which should be able to read SSTable files from disk and write their contents into a Cassandra cluster using Hector. The migration would of course be offline, and would only work for smaller clusters.

Any thoughts on the topic?

Kind regards,
Christian

PS: The reason for doing this is not "performance". It is to simplify operational work for the years to come. :-)
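Here is the current state of the code; it reads the files with the Cassandra internal SSTable classes and writes through a plain Hector mutator: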
import java.io.File;
import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;

import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;

/**
 * Offline migration utility: reads SSTable data files from disk and replays
 * their contents into a Cassandra cluster via Hector.
 *
 * @author Christian Spriegel
 */
public class BulkMigrator
{
    private static Cluster cluster;
    private static String keyspaceName = null;
    private static Keyspace keyspace;

    public static void main(String[] args) throws Exception
    {
        // Config.setClientMode(true);
        Config.setLoadYaml(false);

        if (args.length != 1)
        {
            System.out.println("java -jar BulkMigrator.jar <directory>");
            return;
        }

        String path = args[0];
        File dir = new File(path);
        if (!dir.exists() || !dir.isDirectory())
        {
            System.out.println("Path is not a directory: " + path);
            return;
        }

        cluster = HFactory.getOrCreateCluster("TestCluster",
                new CassandraHostConfigurator("localhost:9160"));

        FilenameFilter filter = new FilenameFilter()
        {
            @Override
            public boolean accept(File dir, String name)
            {
                if (!name.endsWith("-Data.db"))
                    return false; // ignore non-data files
                if (name.substring(0, name.length() - 8).contains("."))
                    return false; // ignore secondary indexes
                return true;
            }
        };

        for (File f : dir.listFiles(filter))
        {
            System.out.println("Found file " + f + " ... ");

            Descriptor desc = Descriptor.fromFilename(dir, f.getName()).left;
            initCF(desc.ksname);
            System.out.println("Loaded descriptor " + desc + " ...");

            // The source partitioner is recorded in the sstable metadata; set
            // it so the reader can decode the tokens.
            SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(desc).left;
            DatabaseDescriptor.setPartitioner((IPartitioner) Class.forName(sstableMetadata.partitioner).newInstance());
            System.out.println("Using partitioner " + DatabaseDescriptor.getPartitionerName());

            SSTableReader ssreader = SSTableReader.open(desc);
            int numkeys = (int) ssreader.estimatedKeys();
            System.out.println("Opened reader " + ssreader + " with " + numkeys + " keys");

            // An unbounded slice, so we iterate over every column of every row.
            SliceQueryFilter atomfilter = new SliceQueryFilter(ByteBuffer.allocate(0),
                    ByteBuffer.allocate(0), false, Integer.MAX_VALUE);
            String cfname = desc.cfname;
            SSTableScanner scanner = ssreader.getScanner(new QueryFilter(null, new QueryPath(cfname), atomfilter));

            Mutator<ByteBuffer> mutator = HFactory.createMutator(keyspace, ByteBufferSerializer.get());

            int keyi = 0;
            long bufsize = 0L; // rough estimate of the pending mutation size
            while (scanner.hasNext())
            {
                OnDiskAtomIterator odai = scanner.next();
                ByteBuffer rowkey = odai.getKey().key.duplicate();
                keyi++;

                while (odai.hasNext())
                {
                    OnDiskAtom atom = odai.next();
                    long coltimestamp = atom.minTimestamp();
                    ByteBuffer colname = atom.name().duplicate();

                    // The subclass checks must come before the plain Column
                    // check, since they all extend Column.
                    if (atom instanceof CounterColumn)
                    {
                        // Counters are replayed as increments of the on-disk total.
                        CounterColumn col = (CounterColumn) atom;
                        long val = col.total();
                        HCounterColumn<ByteBuffer> colObj = HFactory.createCounterColumn(colname, val,
                                ByteBufferSerializer.get());
                        mutator.addCounter(rowkey, cfname, colObj);
                        bufsize += 1000;
                    }
                    else if (atom instanceof DeletedColumn)
                    {
                        DeletedColumn col = (DeletedColumn) atom;
                        long delTime = col.getMarkedForDeleteAt();
                        if (delTime != coltimestamp)
                            throw new IllegalStateException("WTF: " + delTime + " != " + coltimestamp);
                        mutator.addDeletion(rowkey, cfname, colname, ByteBufferSerializer.get(), delTime);
                        bufsize += 100;
                    }
                    else if (atom instanceof ExpiringColumn)
                    {
                        ExpiringColumn col = (ExpiringColumn) atom;
                        ByteBuffer val = col.value().duplicate();
                        // getLocalDeletionTime() is the absolute expiration time in
                        // seconds, so the remaining TTL is expiry minus now (at least 1s).
                        int delTime = col.getLocalDeletionTime();
                        int ttl = Math.max(1, delTime - (int) (System.currentTimeMillis() / 1000));
                        HColumn<ByteBuffer, ByteBuffer> colObj = HFactory.createColumn(colname, val, coltimestamp,
                                ttl, ByteBufferSerializer.get(), ByteBufferSerializer.get());
                        mutator.addInsertion(rowkey, cfname, colObj);
                        bufsize += 100 + val.capacity();
                    }
                    else if (atom instanceof Column)
                    {
                        Column col = (Column) atom;
                        ByteBuffer val = col.value().duplicate();
                        HColumn<ByteBuffer, ByteBuffer> colObj = HFactory.createColumn(colname, val, coltimestamp,
                                ByteBufferSerializer.get(), ByteBufferSerializer.get());
                        mutator.addInsertion(rowkey, cfname, colObj);
                        bufsize += 100 + val.capacity();
                    }
                    else
                    {
                        throw new IllegalStateException("Unknown atom type: " + atom.getClass());
                    }

                    // Flush roughly every 512KB and print the progress.
                    if (bufsize > 1024L * 512L)
                    {
                        System.out.print("\b\b\b\b" + (int) (((float) keyi / numkeys) * 100) + "%");
                        mutator.execute();
                        bufsize = 0L;
                    }
                }
            }
            mutator.execute();
            System.out.println("\b\b\b\bdone.\n");
        }
    }

    /**
     * Loads the keyspace schema from the target cluster into the local Schema
     * instance, so that the SSTable reader can interpret the files.
     */
    private static void initCF(String ksname) throws Exception
    {
        if (BulkMigrator.keyspaceName != null && BulkMigrator.keyspaceName.equals(ksname))
            return;

        BulkMigrator.keyspace = HFactory.createKeyspace(ksname, cluster);
        BulkMigrator.keyspaceName = ksname;

        KeyspaceDefinition ksDef = cluster.describeKeyspace(ksname);
        List<CFMetaData> cfMetaDatas = new LinkedList<CFMetaData>();
        for (ColumnFamilyDefinition cfDef : ksDef.getCfDefs())
        {
            CFMetaData cfMetaData = CFMetaData.fromThrift(((ThriftCfDef) cfDef).toThrift());
            cfMetaDatas.add(cfMetaData);
        }
        KSMetaData ksm = KSMetaData.fromThrift(((ThriftKsDef) ksDef).toThrift(),
                cfMetaDatas.toArray(new CFMetaData[0]));
        Schema.instance.setTableDefinition(ksm);
    }
}
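To give an idea of the intended use: the jar is pointed at a directory of -Data.db files, for example a snapshot copied off one of the source nodes, and run once per source node (the path below is only an illustration):

    java -jar BulkMigrator.jar /var/lib/cassandra/data/MyKeyspace/MyColumnFamily/snapshots/migration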