Hello Hadoop users,

I am trying to implement a MapReduce K-Means algorithm using Hadoop. The problem I have is that the job never enters the map and reduce classes. I'm running the application from IntelliJ IDEA, not via the hadoop binary.


The rest of the email is a sample of my code. If someone can spot something that could help, that would be greatly appreciated.

Thanks in advance.

Here is my driver code:

Job job = Job.getInstance(conf);
job.setJobName("kmeans");
job.setJarByClass(KMeans.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setMapperClass(KMeansMapper.class);
job.setReducerClass(KMeansReducer.class);
job.setMapOutputKeyClass(PointVector.class);
job.setMapOutputValueClass(PointVector.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
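
Since I'm launching from the IDE rather than with the hadoop binary, I suspect I may need to force the local job runner and actually check the job's exit status. Below is a sketch of what I mean; the two conf settings are my assumption, not something I've confirmed:

Configuration conf = new Configuration();
// assumption: force the in-process local runner so the job executes inside IntelliJ
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf);
// ... same job setup as above ...
boolean ok = job.waitForCompletion(true); // 'true' prints map/reduce progress to the console
System.exit(ok ? 0 : 1); // false means the job failed, possibly before any task ran

If waitForCompletion returns false, that might explain why the map and reduce classes never seem to run.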

And below are my map and reduce classes:

public class KMeansMapper extends Mapper<LongWritable, Text, PointVector, PointVector> {

    private int clusters;
    private List<ImmutableTriple<Integer, String, PointVector>> centers;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("Inside setup");
        this.clusters = Integer.valueOf(context.getConfiguration().get("clusters"));
        this.centers = new ArrayList<>();
        // load the current centers before any map() call runs
        BufferedReader br = new BufferedReader(new FileReader("/home/denis/centers"));
        for (int i = 0; i < clusters; i++) {
            centers.add(DocumentRecordParser.parse(br.readLine()));
        }
        br.close();
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        PointVector line = DocumentRecordParser.returnPointVector(value.toString());
        System.out.println("Inside map!");
        double minDist = Double.MAX_VALUE;
        double dist;
        PointVector index = null;
        EuclideanDistance ed = new EuclideanDistance();
        // emit each point keyed by its nearest center
        for (ImmutableTriple<Integer, String, PointVector> c : centers) {
            dist = ed.compute(line.points(), c.right.points());
            if (dist < minDist) {
                minDist = dist;
                index = c.right;
            }
        }
        context.write(index, line);
    }
}
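
I know the hard-coded /home/denis/centers path only works while everything runs on my local machine. If this ever moves to a real cluster, I assume I'd need to ship the file with the job via the distributed cache, roughly like this (untested sketch; the HDFS path is made up):

// driver side: ship the centers file with the job; the '#centers' fragment
// links it as "centers" in each task's working directory
job.addCacheFile(new URI("hdfs:///user/denis/centers#centers"));

// mapper setup: read the linked local copy instead of an absolute path
BufferedReader br = new BufferedReader(new FileReader("centers"));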

public class KMeansReducer extends Reducer<PointVector, PointVector, Text, Text> {

    private double min_dist = Double.MAX_VALUE;

    @Override
    public void reduce(PointVector center, Iterable<PointVector> points, Context context) throws IOException, InterruptedException {
        EuclideanDistance measure = new EuclideanDistance();
        double distance = 0.0;
        int numOfPoints = 0;
        PointVector newCenter = null;
        double[] sums = new double[center.size()];
        for (PointVector p : points) {
            distance += measure.compute(center.points(), p.points());
            if (distance < min_dist) {
                min_dist = distance;
                newCenter = p;
            }
            numOfPoints++;
            sums = MathArrays.ebeAdd(p.points(), sums);
        }
        // the new center is the component-wise mean of the cluster's points
        for (int i = 0; i < sums.length; i++) {
            sums[i] = sums[i] / numOfPoints;
        }
        System.out.println("Old center " + center + " new center: " + newCenter);
        context.write(new Text(newCenter.toString()), new Text(new PointVector(sums).toString()));
    }
}

Last but not least, my custom data structure class PointVector:

public class PointVector implements WritableComparable<PointVector> {

    /** Keeps the tf-idf values of the terms of a document. */
    private Vector<Double> data = new Vector<>();

    public PointVector(double[] values) {
        this.data = new Vector<>(values.length);
        this.data.addAll(Doubles.asList(values));
    }

    public PointVector(List<Double> values) {
        this.data = new Vector<>(values.size());
        this.data.addAll(values);
    }

    public PointVector(String[] values) {
        this.data = new Vector<>(values.length);
        for (String s : values) {
            this.data.add(Double.valueOf(s));
        }
    }

    public PointVector() {
        this.data = new Vector<>();
    }

    public double[] points() {
        return Doubles.toArray(data);
    }

    /** Subtract the values of the PointVector passed as argument from this vector. */
    public PointVector sub(PointVector subtracted) {
        int N = this.data.size();
        double[] vals = new double[N];
        for (int i = 0; i < N; i++) {
            vals[i] = this.data.get(i) - subtracted.get(i);
        }
        return new PointVector(vals);
    }

    public PointVector add(PointVector vec) {
        int N = this.data.size();
        double[] vals = new double[N];
        for (int i = 0; i < N; i++) {
            vals[i] = this.data.get(i) + vec.get(i);
        }
        return new PointVector(vals);
    }

    /** Compute the dot product of this vector with the one passed as argument. */
    public double dotProduct(PointVector vector) {
        int N = this.data.size();
        double sum = 0.0;
        for (int i = 0; i < N; i++) {
            sum += this.data.get(i) * vector.get(i);
        }
        return sum;
    }

    @Override
    public int compareTo(PointVector pointVector) {
        // map output keys are sorted and grouped by compareTo; always returning 0
        // would make every key "equal" and collapse all points into one reduce group
        int n = Math.min(size(), pointVector.size());
        for (int i = 0; i < n; i++) {
            int cmp = Double.compare(this.data.get(i), pointVector.get(i));
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(size(), pointVector.size());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(data.size());
        for (double d : data) {
            dataOutput.writeDouble(d);
        }
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // read the size of the vector, then every value written by write();
        // leaving the values unread breaks deserialization between map and reduce
        int s = dataInput.readInt();
        this.data = new Vector<>(s);
        for (int i = 0; i < s; i++) {
            this.data.add(dataInput.readDouble());
        }
    }

    @Override
    public int hashCode() {
        // the default HashPartitioner routes keys by hashCode,
        // so it must be consistent with compareTo/equals
        return data.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PointVector && compareTo((PointVector) o) == 0;
    }

    public Double get(int i) {
        return this.data.get(i);
    }

    public int size() {
        return this.data.size();
    }

    @Override
    public String toString() {
        if (data.isEmpty()) {
            return "[]";
        }
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        for (double d : data) {
            sb.append(d);
            sb.append(", ");
        }
        final int pos = sb.lastIndexOf(",");
        sb.delete(pos, pos + 1);
        sb.append("]");
        return sb.toString();
    }
}
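
To sanity-check the Writable implementation, a small round-trip test along these lines should verify that write() and readFields() are symmetric (my own snippet, not part of the job):

import java.io.*;

// serialize a vector and read it back, the way the shuffle would
PointVector original = new PointVector(new double[]{1.0, 2.0, 3.0});
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
original.write(new DataOutputStream(bytes));

PointVector copy = new PointVector();
copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
System.out.println(original + " -> " + copy); // both should print the same vector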
