Hi list,

has anyone ever tried to migrate a cluster from Random to Murmur?

We would like to do so, to have a more standardized setup. I wrote a small
(yet untested) utility, which should be able to read SSTable files from
disk and write them into a cassandra cluster using Hector. This migration
would be offline of course and would only work for smaller clusters.

Any thoughts on the topic?

kind regards,
Christian

PS: The reason for doing so are not "performance". It is to simplify
operational stuff for the years to come. :-)
import java.io.File;
import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;

/**
 * 
 * @author Christian Spriegel
 *
 */
public class BulkMigrator
{
    private static Cluster     cluster;
    private static String      keyspaceName = null;
    private static Keyspace    keyspace;

    public static void main(String[] args) throws Exception
    {
//        Config.setClientMode(true);
        Config.setLoadYaml(false);
        if(args.length != 1)
        {
            System.out.println("java -jar BulkMigrator.jar <directory>");
            return;
        }
        
        String path = args[0];
        File dir = new File(path);
        if(!dir.exists() ||  !dir.isDirectory() )
        {
            System.out.println("Path is not a directory: "+path);
            return;
        }

        cluster = HFactory.getOrCreateCluster("TestCluster", new CassandraHostConfigurator("localhost:9160"));
        FilenameFilter filter = new FilenameFilter()
        {
            @Override
            public boolean accept(File dir, String name)
            {
                if(!name.endsWith("-Data.db"))
                    return false; // ignore non data files
                if(name.substring(0,name.length()-8) .contains("."))
                    return false; // ignore secondary indexes
                return true;
            }
        };
        for(File f : dir.listFiles(filter))
        {
            System.out.println ("Found file "+f +" ... ");
            Descriptor desc = Descriptor.fromFilename(dir, f.getName()).left;
            initCF(desc.ksname);
            System.out.println("Loaded descriptor "+desc+" ...");
            SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(desc).left;
            DatabaseDescriptor.setPartitioner((IPartitioner)(Class.forName(sstableMetadata.partitioner).newInstance()));
            System.out.println("Using partitioner "+DatabaseDescriptor.getPartitionerName());
            SSTableReader ssreader = SSTableReader.open(desc);
            int numkeys = (int)(ssreader.estimatedKeys());
            System.out.println("Opened reader "+ssreader+" with "+numkeys+" keys");
            
            SliceQueryFilter atomfilter = new SliceQueryFilter(ByteBuffer.allocate(0), ByteBuffer.allocate(0), false, Integer.MAX_VALUE);
            String cfname = desc.cfname;
            SSTableScanner rrar = ssreader.getScanner(new QueryFilter(null, new QueryPath(cfname), atomfilter));
            
            Mutator<ByteBuffer> mutator = HFactory.createMutator(keyspace, ByteBufferSerializer.get());
            int keyi = 0;
            long bufsize = 0L;
            while(rrar.hasNext())
            {
                OnDiskAtomIterator odai = rrar.next();
                ByteBuffer rowkey = odai.getKey().key.duplicate();
                keyi++;
                
                while(odai.hasNext())
                {
                    OnDiskAtom atom = odai.next();
                    long coltimestamp = atom.minTimestamp();
                    ByteBuffer colname = atom.name().duplicate();
                    //System.out.println("# "+StringSerializer.get().fromByteBuffer(odai.getKey().key.duplicate()) +" / "+StringSerializer.get().fromByteBuffer(atom.name()) );
                    
                    if(atom instanceof CounterColumn)
                    {
                        CounterColumn col = (CounterColumn) atom;
                        long val = col.total();
                        HCounterColumn colObj = HFactory.createCounterColumn(colname, val, ByteBufferSerializer.get());
                        mutator.addCounter(rowkey, cfname, colObj);
                        bufsize += 1000;
                    }
                    else if(atom instanceof DeletedColumn)
                    {
                        DeletedColumn col = (DeletedColumn) atom;
                        long delTime = col.getMarkedForDeleteAt();
                        if(delTime != coltimestamp)
                            throw new IllegalStateException("WTF: "+delTime +"!="+ coltimestamp);
                        mutator.addDeletion(rowkey, cfname, colname, ByteBufferSerializer.get(), delTime);
                        bufsize += 100;
                    }
                    else if(atom instanceof ExpiringColumn)
                    {
                        ExpiringColumn col = (ExpiringColumn) atom;
                        ByteBuffer val = col.value().duplicate();
                        int delTime = col.getLocalDeletionTime();
                        int ttl = Math.max(1, (int)((System.currentTimeMillis()/1000) - delTime));
                        HColumn<ByteBuffer, ByteBuffer> colObj = HFactory.createColumn(colname, val, coltimestamp, ttl, ByteBufferSerializer.get(), ByteBufferSerializer.get());
                        mutator.addInsertion(rowkey, cfname, colObj);
                        bufsize += 100+val.capacity();
                    }
                    else if(atom instanceof Column)
                    {
                        Column col = (Column) atom;
                        ByteBuffer val = col.value().duplicate();
                        HColumn<ByteBuffer, ByteBuffer> colObj = HFactory.createColumn(colname, val, coltimestamp, ByteBufferSerializer.get(), ByteBufferSerializer.get());
                        mutator.addInsertion(rowkey, cfname, colObj);
                        bufsize += 100+val.capacity();
                    }
                    else
                        throw new IllegalStateException("Unknown atom type: "+atom.getClass());
                    
                    if(bufsize > 1024L*512L)
                    {
                        System.out.print("\b\b\b\b"+(int)(((float)keyi/numkeys)*100)+"%");
                        mutator.execute();
                        bufsize = 0L;
                    }
                }
            }
            mutator.execute();
            System.out.println("\b\b\b\bdone.\n");
        }
    }

    private static void initCF(String ksname) throws Exception
    {
        if (BulkMigrator.keyspaceName != null && BulkMigrator.keyspaceName.equals(ksname))
            return;

        BulkMigrator.keyspace = HFactory.createKeyspace(ksname, cluster);
        BulkMigrator.keyspaceName = ksname;

        KeyspaceDefinition ksDef = cluster.describeKeyspace(ksname);
        List<CFMetaData> cfMetaDatas = new LinkedList<CFMetaData>();
        for (ColumnFamilyDefinition cfDef : ksDef.getCfDefs())
        {
            CFMetaData cfMetaData = CFMetaData.fromThrift(((ThriftCfDef) cfDef).toThrift());
            cfMetaDatas.add(cfMetaData);
        }
        KSMetaData ksm = KSMetaData.fromThrift(((ThriftKsDef) ksDef).toThrift(), cfMetaDatas.toArray(new CFMetaData[0]));
        Schema.instance.setTableDefinition(ksm);
    }

}

Reply via email to