Hi,

I try to run the org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver.
My input look like (userid, itemid) as follows:
...
122641863,5060057723326
123441107,9789020282948
...

How can I transform my input to the format that DistributedLanczosSolver needs (rows = users, columns=items, elements=number of items/user)?

I tried to write a MapReduce Job with Mapper<Object, Text, IntWritable, IntWritable>
that maps the row to userid as key and itemid as value
and a Reducer<IntWritable,IntWritable,IntWritable,SequentialAccessSparseVector> that instantiates a SequentialAccessSparseVector with itemid as key and itemid as index and sum(itemid) as value.

I am getting this exception with the attached code:

2010-06-29 09:04:59,172 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
java.lang.NullPointerException
at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
        at 
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:759)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:487)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:575)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)


Can you suggest any other way?

Regards,
Laszlo

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.SequentialAccessSparseVector;

public class UserItemMatrix {
        public static class TokenizerMapper  extends Mapper<Object, Text, 
IntWritable, IntWritable>{
                private final static String DELIMITER = ",";

                public void map(Object key, Text value, Context context) throws 
IOException, InterruptedException {
                        String[] values = DELIMITER.split(value.toString());
                        IntWritable userId = new 
IntWritable(Integer.parseInt(values[0]));
                        IntWritable itemId = new 
IntWritable(Integer.parseInt(values[1]));
                        context.write(userId, itemId);
                }
        }

        public static class ItemReducer extends 
Reducer<IntWritable,IntWritable,IntWritable,SequentialAccessSparseVector> {
                
                private SequentialAccessSparseVector vector = new 
SequentialAccessSparseVector();
                
                public void reduce(IntWritable key, Iterable<IntWritable> 
values, Context context) throws IOException, InterruptedException {
                        Map<Integer,Integer> cooccurence = new 
HashMap<Integer,Integer>();
                        for (IntWritable val : values) {
                                int num = cooccurence.get(val.get());
                                num++;
                                cooccurence.put(val.get(), num);
                        }
                        for(Map.Entry<Integer, Integer> entry : 
cooccurence.entrySet()) {
                                vector.set(entry.getKey(), entry.getValue());
                        }
                        context.write(key, vector);
                }
        }

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf, 
args).getRemainingArgs();
                if (otherArgs.length != 2) {
                        System.err.println("Usage: User Item cooccurence matrix 
<in> <out>");
                        System.exit(2);
                }
                
                Job job = new Job(conf, "User Item cooccurence matrix");
                job.setJarByClass(UserItemMatrix.class);
                job.setMapperClass(TokenizerMapper.class);
                job.setCombinerClass(ItemReducer.class);
                job.setReducerClass(ItemReducer.class);
                job.setOutputKeyClass(IntWritable.class);
                job.setOutputValueClass(SequentialAccessSparseVector.class);
                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
}

Reply via email to