Hi Mick,

attached is the very simple MR job, that deletes expired URL from my
test Cassandra DB. The keyspace looks like this:

Keyspace: Test:
  Replication Strategy: org.apache.cassandra.locator.SimpleStrategy
    Replication Factor: 2
  Column Families:
    ColumnFamily: Url2
      Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type
      Row cache size / save period: 0.0/0
      Key cache size / save period: 200000.0/3600
      Memtable thresholds: 4.7015625/1003/60
      GC grace seconds: 864000
      Compaction min/max thresholds: 4/32
      Read repair chance: 1.0
      Built indexes: []

In the CF the key is URL and inside there are some data. My MR job
needs just "expire_date" which is int64 timestamp. For now I store it
as a string because I use Python and C++ to manipulate the data as
well.

For the MR Job to run you need a patch I did. You can find it here:
https://issues.apache.org/jira/browse/CASSANDRA-2014

The atttached file contains the working version with cloned key in
reduce() method. My other aproache was:
[code]
context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
Collections.singletonList(getMutation(key)));
[/code]
Which produce junk keys.

Best regards,
Patrik
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.*;

import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.avro.Deletion;
import org.apache.cassandra.avro.SliceRange;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ContextExpirator extends Configured implements Tool
{
    static final String KEYSPACE = "Test";
    static final String COLUMN_FAMILY = "Url2";
    static final String OUTPUT_COLUMN_FAMILY = "Url2";
    static final String COLUMN_VALUE = "expire_date";

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new ContextExpirator(), args);
        System.exit(0);
    }

    public static class UrlFilterMapper
            extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, NullWritable>
    {
        private final static NullWritable nic = NullWritable.get();
        private ByteBuffer sourceColumn;
        private static long now;

        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException
        {
            sourceColumn = ByteBuffer.wrap(COLUMN_VALUE.getBytes());
            now = System.currentTimeMillis() / 1000; // convert from ms
        }

        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
            throws IOException, InterruptedException
        {
            IColumn column = columns.get(sourceColumn);
            if (column == null) {
                return;
            }

            Text tKey = new Text(ByteBufferUtil.string(key));
            Long value = Long.decode(ByteBufferUtil.string(column.value()));

            if(now > value) {
                context.write(tKey, nic);
            }
        }
    }

    public static class RemoveUrlReducer
            extends Reducer<Text, NullWritable, ByteBuffer, List<Mutation>>
    {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException
        {
            ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));
            context.write(bbKey, Collections.singletonList(getMutation()));
        }

        private static Mutation getMutation()
        {
            Deletion d = new Deletion();
            d.timestamp = System.currentTimeMillis();

            Mutation m = new Mutation();
            m.deletion = d;

            return m;
        }
    }

    public int run(String[] args) throws Exception
    {
        Job job = new Job(getConf(), "context_expitator");
        job.setJarByClass(ContextExpirator.class);

        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);

        job.setMapperClass(UrlFilterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(RemoveUrlReducer.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);

        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);

        // cassandra details
        ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
        SlicePredicate predicate =
            new SlicePredicate().setColumn_names(Arrays.asList(ByteBuffer.wrap(COLUMN_VALUE.getBytes())));
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        job.waitForCompletion(true);
        return 0;
    }
}

Reply via email to