The imports are from the mapreduce package. Also, I use Maven for dependencies, and the Hadoop version is 2.7.1.

On 09/18/2016 09:11 PM, Dieter De Witte wrote:
Maybe also add the list of imports you are using; they differ between Hadoop versions, and mixing them might cause counterintuitive behaviour...

Kind Regards,

Dieter De Witte
Big Data Scientist
iMinds - Data Science Lab <http://www.datasciencelab.ugent.be/>
Ghent University

2016-09-18 19:54 GMT+02:00 Denis Mone <monede...@gmail.com>:

    Hello and thanks for your time.

    What I mean is that I have set breakpoints in the map and reduce
    functions of my code, and the breakpoints are never hit when the
    program runs (hence the title "code does not enter class", which
    is not that informative really).

    As for the jobs, there is only one, the k-means job, and it
    executes correctly with no exception thrown.
    Here
    <https://gist.github.com/Denis1990/9f42f73ae126ea47008ee907da64ac6a>
    is the output of the job.
    The driver class is this
    <https://gist.github.com/Denis1990/2978c92567efa17060bb618f66d02ce0>

    On 09/18/2016 02:27 AM, daemeon reiydelle wrote:
    What do you mean by "does not enter ... class(es)"?

    Does the log show that the scheduler ever accepts the job? (You
    may have to turn logging up.) Are other jobs submitted to the
    same class under your user scheduled and executed? Which
    scheduler are you using, and what is the definition of the
    scheduler class? Is the job getting to a container? Please give
    us a complete history of the steps you are taking.

    Daemeon C.M. Reiydelle
    USA (+1) 415.501.0198
    London (+44) (0) 20 8144 9872

    On Sat, Sep 17, 2016 at 3:56 AM, Denis Mone <monede...@gmail.com> wrote:

        Hello hadoop users.

        I am trying to implement a MapReduce KMeans algorithm using
        Hadoop. The problem I have is that the code does not enter
        the map and reduce classes. I'm running the application from
        IntelliJ IDEA, not with the hadoop binary.

        The rest of the email is a sample of my code. If someone can
        see something that could help, that would be greatly appreciated.

        Thanks in advance.

        Here is my driver code:

        Job job = Job.getInstance(conf);
        job.setJobName("kmeans");
        job.setJarByClass(KMeans.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);
        job.setMapOutputKeyClass(PointVector.class);
        job.setMapOutputValueClass(PointVector.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.waitForCompletion(true);

        And below are my map and reduce classes:

        public class KMeansMapper extends Mapper<LongWritable, Text, PointVector, PointVector> {

            private int clusters;
            private List<ImmutableTriple<Integer, String, PointVector>> centers;

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                System.out.println("Inside setup");
                this.clusters = Integer.valueOf(context.getConfiguration().get("clusters"));
                this.centers = new ArrayList<>();
                BufferedReader br = new BufferedReader(new FileReader("/home/denis/centers"));
                for (int i = 0; i < clusters; i++) {
                    centers.add(DocumentRecordParser.parse(br.readLine()));
                }
                br.close();
            }

            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                PointVector line = DocumentRecordParser.returnPointVector(value.toString());
                System.out.println("Inside map!");
                double minDist = Double.MAX_VALUE;
                double dist;
                PointVector index = null;
                EuclideanDistance ed = new EuclideanDistance();
                for (ImmutableTriple<Integer, String, PointVector> c : centers) {
                    dist = ed.compute(line.points(), c.right.points());
                    if (dist < minDist) {
                        minDist = dist;
                        index = c.right;
                    }
                }

                context.write(index, line);
            }
        }
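
The assignment step in the mapper above (pick the center with the smallest Euclidean distance to the point) can be checked outside Hadoop. The following is a minimal plain-Java sketch of that step, not the poster's code: the class name `NearestCenterSketch` and its methods are hypothetical, and plain `double[]` arrays stand in for `PointVector`.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the assignment step; no Hadoop types are involved. */
class NearestCenterSketch {

    /** Euclidean distance between two vectors of equal length. */
    static double euclidean(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch");
        }
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    /** Returns the index of the center closest to the point, or -1 if there are no centers. */
    static int nearest(double[] point, List<double[]> centers) {
        int best = -1;
        double minDist = Double.MAX_VALUE;
        for (int i = 0; i < centers.size(); i++) {
            double dist = euclidean(point, centers.get(i));
            if (dist < minDist) {
                minDist = dist;
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<double[]> centers =
                Arrays.asList(new double[] {0.0, 0.0}, new double[] {10.0, 10.0});
        // The point (9, 8) is closer to the second center, so this prints 1.
        System.out.println(nearest(new double[] {9.0, 8.0}, centers));
    }
}
```

Returning -1 for an empty center list makes the "no centers loaded" case explicit; in the mapper as posted, the same situation would leave `index` as null when `context.write` is called.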

        public class KMeansReducer extends Reducer<PointVector, PointVector, Text, Text> {
            private double min_dist = Double.MAX_VALUE;

            @Override
            public void reduce(PointVector center, Iterable<PointVector> points, Context context) throws IOException, InterruptedException {
                EuclideanDistance measure = new EuclideanDistance();
                double distance = 0.0;
                int numOfPoints = 0;
                double diff = 0.0;
                PointVector newCenter = null;
                double[] sums = new double[center.size()];
                for (PointVector p : points) {
                    distance += measure.compute(center.points(), p.points());
                    if (distance < min_dist) {
                        min_dist = distance;
                        newCenter = p;
                    }
                    numOfPoints++;
                    sums = MathArrays.ebeAdd(p.points(), sums);
                }
                for (int i = 0; i < sums.length; i++) {
                    sums[i] = sums[i] / numOfPoints;
                }
                System.out.println("Old center " + center + " new center: " + newCenter);
                context.write(new Text(newCenter.toString()), new Text(new PointVector(sums).toString()));
            }
        }
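
The reducer above accumulates a per-dimension sum of the points assigned to a center and divides by the number of points. As a rough illustration of that update step on its own, here is a plain-Java sketch; `CentroidSketch` and `mean` are hypothetical names, and `double[]` again stands in for `PointVector`.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the centroid update: the component-wise mean of a cluster. */
class CentroidSketch {

    /** Returns the component-wise mean; all points must have the same dimension. */
    static double[] mean(List<double[]> points) {
        if (points.isEmpty()) {
            throw new IllegalArgumentException("no points in cluster");
        }
        double[] sums = new double[points.get(0).length];
        for (double[] p : points) {
            if (p.length != sums.length) {
                throw new IllegalArgumentException("dimension mismatch");
            }
            for (int i = 0; i < sums.length; i++) {
                sums[i] += p[i];
            }
        }
        for (int i = 0; i < sums.length; i++) {
            sums[i] /= points.size();
        }
        return sums;
    }

    public static void main(String[] args) {
        List<double[]> cluster =
                Arrays.asList(new double[] {1.0, 2.0}, new double[] {3.0, 6.0});
        // Prints [2.0, 4.0]
        System.out.println(Arrays.toString(mean(cluster)));
    }
}
```

The explicit empty-cluster check avoids a division by zero, which the reducer does not need to guard against because reduce is only invoked for keys that have at least one value.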

        Last but not least, my custom data structure class PointVector:

        public class PointVector implements WritableComparable<PointVector> {
            /**
             * Keep the tfIdf values of the terms of a document
             */
            private Vector<Double> data = new Vector<>();

            public PointVector(double[] values) {
                this.data = new Vector<>(values.length);
                this.data.addAll(Doubles.asList(values));
            }

            public PointVector(List<Double> values) {
                this.data = new Vector<>(values.size());
                this.data.addAll(values);
            }

            public PointVector(String[] values) {
                this.data = new Vector<>(values.length);
                for (String s : values) {
                    this.data.add(Double.valueOf(s));
                }
            }

            public PointVector() {
                this.data = new Vector<>();
            }

            public double[] points() {
                return Doubles.toArray(data);
            }

            /**
             * Subtract the values of this vector from the PointVector passed as argument
             * @param subtracted
             * @return
             */
            public PointVector sub(PointVector subtracted) {
                int N = this.data.size();
                double[] vals = new double[N];
                for (int i = 0; i < N; i++) {
                    vals[i] = this.data.get(i) - subtracted.get(i);
                }
                return new PointVector(vals);
            }

            public PointVector add(PointVector vec) {
                int N = this.data.size();
                double[] vals = new double[N];
                for (int i = 0; i < N; i++) {
                    vals[i] = this.data.get(i) + vec.get(i);
                }
                return new PointVector(vals);
            }

            /**
             * Compute the dot product of this vector with the one passed as argument
             * @param vector
             * @return
             */
            public double dotProduct(PointVector vector) {
                int N = this.data.size();
                double sum = 0.0;
                for (int i = 0; i < N; i++) {
                    sum += this.data.get(i) * vector.get(i);
                }
                return sum;
            }

            @Override
            public int compareTo(PointVector pointVector) {
                return 0;
            }

            @Override
            public void write(DataOutput dataOutput) throws IOException {
                dataOutput.writeInt(data.size());
                for (double d : data) {
                    dataOutput.writeDouble(d);
                }
            }

            @Override
            public void readFields(DataInput dataInput) throws IOException {
                int s = dataInput.readInt(); // read the size of the vector
            }

            public Double get(int i) {
                return this.data.get(i);
            }

            public int size() {
                return this.data.size();
            }

            @Override
            public String toString() {
                if (data.isEmpty()) {
                    return "[]";
                }
                StringBuilder sb = new StringBuilder();
                sb.append("[");
                for (double d : data) {
                    sb.append(d);
                    sb.append(", ");
                }
                final int pos = sb.lastIndexOf(",");
                sb.delete(pos, pos + 1);
                sb.append("]");
                return sb.toString();
            }
        }
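
Two details of the class as posted may be worth checking, whether or not they relate to the breakpoint question. First, `write` emits the size followed by every value, but `readFields` reads only the size, so a deserialized instance would come back empty. Second, `compareTo` always returns 0; as far as I know, map output keys are sorted and grouped through that comparison by default, so every key would be treated as equal. The sketch below shows a symmetric write/read pair and an element-wise comparison using only `java.io`. `VectorRecord` and `roundTrip` are hypothetical stand-ins that mirror the method shapes of a Hadoop `WritableComparable` without depending on Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for PointVector with a symmetric write/readFields pair. */
class VectorRecord implements Comparable<VectorRecord> {
    private final List<Double> data = new ArrayList<>();

    VectorRecord() {}

    VectorRecord(double... values) {
        for (double v : values) {
            data.add(v);
        }
    }

    /** Writes the size, then each value. */
    void write(DataOutput out) throws IOException {
        out.writeInt(data.size());
        for (double d : data) {
            out.writeDouble(d);
        }
    }

    /** Reads back exactly what write produced, replacing any previous state. */
    void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        data.clear(); // instances may be reused, so drop old values first
        for (int i = 0; i < size; i++) {
            data.add(in.readDouble());
        }
    }

    /** Element-wise comparison; a shorter vector sorts first when it is a prefix of the other. */
    @Override
    public int compareTo(VectorRecord other) {
        int n = Math.min(data.size(), other.data.size());
        for (int i = 0; i < n; i++) {
            int c = Double.compare(data.get(i), other.data.get(i));
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(data.size(), other.data.size());
    }

    int size() {
        return data.size();
    }

    double get(int i) {
        return data.get(i);
    }

    /** Serializes a record to bytes and reads it back into a fresh instance. */
    static VectorRecord roundTrip(VectorRecord in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));
            VectorRecord copy = new VectorRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        VectorRecord copy = roundTrip(new VectorRecord(1.5, -2.0, 3.25));
        System.out.println(copy.size() + " values, first = " + copy.get(0));
    }
}
```

A round-trip check like `roundTrip` is a cheap way to test a custom key or value type in a unit test before running a job, since serialization problems otherwise only show up between the map and reduce phases.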
