Hi list,
has anyone ever tried to migrate a cluster from the RandomPartitioner to the Murmur3Partitioner?
We would like to do so to get a more standardized setup. I wrote a small
(yet untested) utility that should be able to read SSTable files from
disk and write their contents into a Cassandra cluster using Hector. The
migration would of course be offline and would only be practical for smaller clusters.
Any thoughts on the topic?
Kind regards,
Christian
PS: The reason for doing this is not "performance"; it is to simplify
operational work for the years to come. :-)
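
Below is the utility as it currently stands; since it is untested, please treat it as a rough sketch: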
import java.io.File;
import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
/**
 * Offline migration utility: reads SSTable files from disk and writes their
 * contents into a Cassandra cluster through Hector.
 *
 * @author Christian Spriegel
 */
public class BulkMigrator
{
    private static Cluster cluster;
    private static String keyspaceName = null;
    private static Keyspace keyspace;

    public static void main(String[] args) throws Exception
    {
        // Config.setClientMode(true);
        Config.setLoadYaml(false);

        if (args.length != 1)
        {
            System.out.println("java -jar BulkMigrator.jar <directory>");
            return;
        }
        String path = args[0];
        File dir = new File(path);
        if (!dir.exists() || !dir.isDirectory())
        {
            System.out.println("Path is not a directory: " + path);
            return;
        }

        cluster = HFactory.getOrCreateCluster("TestCluster", new CassandraHostConfigurator("localhost:9160"));
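
        // SSTable data files are named <keyspace>-<cf>-<version>-<generation>-Data.db;
        // secondary index SSTables have a "." in the cf component, which is why a
        // dot left over after stripping "-Data.db" means the file can be skipped.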
        FilenameFilter filter = new FilenameFilter()
        {
            @Override
            public boolean accept(File dir, String name)
            {
                if (!name.endsWith("-Data.db"))
                    return false; // ignore non-data files
                if (name.substring(0, name.length() - 8).contains("."))
                    return false; // ignore secondary indexes
                return true;
            }
        };
        for (File f : dir.listFiles(filter))
        {
            System.out.println("Found file " + f + " ...");

            Descriptor desc = Descriptor.fromFilename(dir, f.getName()).left;
            initCF(desc.ksname);
            System.out.println("Loaded descriptor " + desc + " ...");

            // Read the source partitioner out of the SSTable metadata, so the
            // reader decorates keys the same way the file was written.
            SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(desc).left;
            DatabaseDescriptor.setPartitioner((IPartitioner) Class.forName(sstableMetadata.partitioner).newInstance());
            System.out.println("Using partitioner " + DatabaseDescriptor.getPartitionerName());

            SSTableReader ssreader = SSTableReader.open(desc);
            int numkeys = (int) ssreader.estimatedKeys();
            System.out.println("Opened reader " + ssreader + " with " + numkeys + " keys");

            // Empty start/finish bounds select every column of each row.
            SliceQueryFilter atomfilter = new SliceQueryFilter(ByteBuffer.allocate(0), ByteBuffer.allocate(0), false, Integer.MAX_VALUE);
            String cfname = desc.cfname;
            SSTableScanner scanner = ssreader.getScanner(new QueryFilter(null, new QueryPath(cfname), atomfilter));

            Mutator<ByteBuffer> mutator = HFactory.createMutator(keyspace, ByteBufferSerializer.get());
            int keyi = 0;
            long bufsize = 0L;
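
            // Walk every row in the SSTable; each OnDiskAtomIterator yields the
            // row key plus that row's columns ("atoms") in on-disk order.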
            while (scanner.hasNext())
            {
                OnDiskAtomIterator rowIter = scanner.next();
                ByteBuffer rowkey = rowIter.getKey().key.duplicate();
                keyi++;

                while (rowIter.hasNext())
                {
                    OnDiskAtom atom = rowIter.next();
                    long coltimestamp = atom.minTimestamp();
                    ByteBuffer colname = atom.name().duplicate();
                    //System.out.println("# " + StringSerializer.get().fromByteBuffer(rowIter.getKey().key.duplicate()) + " / " + StringSerializer.get().fromByteBuffer(atom.name()));
                    if (atom instanceof CounterColumn)
                    {
                        CounterColumn col = (CounterColumn) atom;
                        long val = col.total();
                        // Note: addCounter writes an increment, so this assumes each
                        // counter appears in only one source SSTable (otherwise the
                        // value would be added more than once).
                        HCounterColumn<ByteBuffer> colObj = HFactory.createCounterColumn(colname, val, ByteBufferSerializer.get());
                        mutator.addCounter(rowkey, cfname, colObj);
                        bufsize += 1000;
                    }
                    else if (atom instanceof DeletedColumn)
                    {
                        DeletedColumn col = (DeletedColumn) atom;
                        long delTime = col.getMarkedForDeleteAt();
                        if (delTime != coltimestamp)
                            throw new IllegalStateException("Deletion timestamp mismatch: " + delTime + " != " + coltimestamp);
                        mutator.addDeletion(rowkey, cfname, colname, ByteBufferSerializer.get(), delTime);
                        bufsize += 100;
                    }
                    else if (atom instanceof ExpiringColumn)
                    {
                        ExpiringColumn col = (ExpiringColumn) atom;
                        ByteBuffer val = col.value().duplicate();
                        int expiresAt = col.getLocalDeletionTime();
                        // getLocalDeletionTime() is the point in time at which the column
                        // expires, so the remaining TTL is the expiration time minus now;
                        // clamp to 1s so columns that expired in the meantime are still
                        // written and then age out almost immediately.
                        int ttl = Math.max(1, expiresAt - (int) (System.currentTimeMillis() / 1000));
                        HColumn<ByteBuffer, ByteBuffer> colObj = HFactory.createColumn(colname, val, coltimestamp, ttl, ByteBufferSerializer.get(), ByteBufferSerializer.get());
                        mutator.addInsertion(rowkey, cfname, colObj);
                        bufsize += 100 + val.capacity();
                    }
                    else if (atom instanceof Column)
                    {
                        Column col = (Column) atom;
                        ByteBuffer val = col.value().duplicate();
                        HColumn<ByteBuffer, ByteBuffer> colObj = HFactory.createColumn(colname, val, coltimestamp, ByteBufferSerializer.get(), ByteBufferSerializer.get());
                        mutator.addInsertion(rowkey, cfname, colObj);
                        bufsize += 100 + val.capacity();
                    }
                    else
                        // RangeTombstone atoms are not handled yet and would end up here.
                        throw new IllegalStateException("Unknown atom type: " + atom.getClass());
                    // Flush roughly every 512KB of estimated mutation size and
                    // print a crude progress percentage.
                    if (bufsize > 1024L * 512L)
                    {
                        System.out.print("\b\b\b\b" + (int) (((float) keyi / numkeys) * 100) + "%");
                        mutator.execute();
                        bufsize = 0L;
                    }
                }
            }
            scanner.close();

            mutator.execute(); // flush whatever is left over for this file
            System.out.println("\b\b\b\bdone.\n");
        }
    }
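
    /**
     * Points Hector at the given keyspace and registers the keyspace schema
     * with the local Schema instance, so that the SSTable reading code can
     * resolve column family metadata. No-op if the keyspace is unchanged.
     */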
    private static void initCF(String ksname) throws Exception
    {
        if (BulkMigrator.keyspaceName != null && BulkMigrator.keyspaceName.equals(ksname))
            return;

        BulkMigrator.keyspace = HFactory.createKeyspace(ksname, cluster);
        BulkMigrator.keyspaceName = ksname;

        KeyspaceDefinition ksDef = cluster.describeKeyspace(ksname);
        List<CFMetaData> cfMetaDatas = new LinkedList<CFMetaData>();
        for (ColumnFamilyDefinition cfDef : ksDef.getCfDefs())
        {
            CFMetaData cfMetaData = CFMetaData.fromThrift(((ThriftCfDef) cfDef).toThrift());
            cfMetaDatas.add(cfMetaData);
        }
        KSMetaData ksm = KSMetaData.fromThrift(((ThriftKsDef) ksDef).toThrift(), cfMetaDatas.toArray(new CFMetaData[0]));
        Schema.instance.setTableDefinition(ksm);
    }
}
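
To sanity-check the migrated data, I would read a few rows back through Hector and
compare column counts against the source. A rough sketch, untested like the rest:
"MyCf" and someRowKey are placeholders, and it needs the
me.prettyprint.hector.api.query.SliceQuery, me.prettyprint.hector.api.query.QueryResult
and me.prettyprint.hector.api.beans.ColumnSlice imports:

        // Hypothetical spot check: slice one migrated row and count its columns.
        SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = HFactory.createSliceQuery(
                keyspace, ByteBufferSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
        q.setColumnFamily("MyCf");
        q.setKey(someRowKey);
        q.setRange(ByteBuffer.allocate(0), ByteBuffer.allocate(0), false, Integer.MAX_VALUE);
        QueryResult<ColumnSlice<ByteBuffer, ByteBuffer>> result = q.execute();
        System.out.println("Row has " + result.get().getColumns().size() + " columns after migration");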