Hi Mick, attached is a very simple MR job that deletes expired URLs from my test Cassandra DB. The keyspace looks like this:
Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.SimpleStrategy Replication Factor: 2 Column Families: ColumnFamily: Url2 Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type Row cache size / save period: 0.0/0 Key cache size / save period: 200000.0/3600 Memtable thresholds: 4.7015625/1003/60 GC grace seconds: 864000 Compaction min/max thresholds: 4/32 Read repair chance: 1.0 Built indexes: [] In the CF the key is the URL, and inside each row there is some data. My MR job needs just "expire_date", which is an int64 timestamp. For now I store it as a string because I use Python and C++ to manipulate the data as well. For the MR job to run you need a patch I made. You can find it here: https://issues.apache.org/jira/browse/CASSANDRA-2014 The attached file contains the working version, with a cloned key in the reduce() method. My other approach was: [code] context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()), Collections.singletonList(getMutation(key))); [/code] which produces junk keys. Best regards, Patrik
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.LongBuffer; import java.util.*; import org.apache.cassandra.avro.Mutation; import org.apache.cassandra.avro.Deletion; import org.apache.cassandra.avro.SliceRange; import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.hadoop.ColumnFamilyInputFormat; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ContextExpirator extends Configured implements Tool { static final String KEYSPACE = "Test"; static final String COLUMN_FAMILY = "Url2"; static final String OUTPUT_COLUMN_FAMILY = "Url2"; static final String COLUMN_VALUE = "expire_date"; public static void main(String[] args) throws Exception { // Let ToolRunner handle generic command-line options ToolRunner.run(new Configuration(), new ContextExpirator(), args); System.exit(0); } public static class UrlFilterMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, NullWritable> { private final static NullWritable nic = NullWritable.get(); private ByteBuffer sourceColumn; private static long now; protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { sourceColumn = ByteBuffer.wrap(COLUMN_VALUE.getBytes()); now = System.currentTimeMillis() / 1000; // convert from ms } public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException { IColumn 
column = columns.get(sourceColumn); if (column == null) { return; } Text tKey = new Text(ByteBufferUtil.string(key)); Long value = Long.decode(ByteBufferUtil.string(column.value())); if(now > value) { context.write(tKey, nic); } } } public static class RemoveUrlReducer extends Reducer<Text, NullWritable, ByteBuffer, List<Mutation>> { public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength())); context.write(bbKey, Collections.singletonList(getMutation())); } private static Mutation getMutation() { Deletion d = new Deletion(); d.timestamp = System.currentTimeMillis(); Mutation m = new Mutation(); m.deletion = d; return m; } } public int run(String[] args) throws Exception { Job job = new Job(getConf(), "context_expitator"); job.setJarByClass(ContextExpirator.class); job.setInputFormatClass(ColumnFamilyInputFormat.class); ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); job.setMapperClass(UrlFilterMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(RemoveUrlReducer.class); job.setOutputKeyClass(ByteBuffer.class); job.setOutputValueClass(List.class); job.setOutputFormatClass(ColumnFamilyOutputFormat.class); ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY); // cassandra details ConfigHelper.setRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInitialAddress(job.getConfiguration(), "127.0.0.1"); ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBuffer.wrap(COLUMN_VALUE.getBytes()))); ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate); job.waitForCompletion(true); return 0; } }