A side point for Hadoop experts: a comparator is used for sorting in the shuffle. If a comparator always returns -1 for unequal objects, then sorting will take longer than it should, because a certain number of items will be compared more than once.

Is this true?
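
(A quick aside on why that matters, sketched outside the thread: a comparator that returns -1 for every unequal pair reports both compare(a, b) < 0 and compare(b, a) < 0, which violates the Comparable contract, so a sort built on it has no consistent order to work with and equal keys are not guaranteed to land next to each other. A minimal plain-Java illustration:)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class BrokenComparatorDemo {
    public static void main(String[] args) {
        List<String> keys = new ArrayList<String>(Arrays.asList("a", "b", "c", "d", "e", "a"));
        // Like the compareTo discussed below: 0 for equal, -1 for anything else.
        // This breaks the contract sgn(compare(x, y)) == -sgn(compare(y, x)).
        Collections.sort(keys, new Comparator<String>() {
            public int compare(String x, String y) {
                return x.equals(y) ? 0 : -1;
            }
        });
        // With a broken order the two equal "a" keys may not end up adjacent,
        // which is why only consecutive duplicates get grouped in the reducer.
        System.out.println(keys);
    }
}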

On 06/05/2013 04:10 PM, Max Lebedev wrote:

I’ve taken your advice and made a wrapper class which implements WritableComparable. Thank you very much for your help. I believe everything is working fine on that front. I used Google’s Gson for the comparison.


public int compareTo(Object o) {
    JsonElement o1 = PARSER.parse(this.json.toString());
    JsonElement o2 = PARSER.parse(o.toString());
    if (o2.equals(o1))
        return 0;
    else
        return -1;
}
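
(A minimal sketch, not Max's actual code, of a compareTo that defines a total order consistent with equals. It assumes flat JSON objects like the sample records below, that o.toString() returns the raw JSON as in the snippet above, and the same PARSER constant; required imports are java.util.Map/SortedMap/TreeMap and com.google.gson.JsonElement/JsonObject. The idea is to compare a canonical string whose members are sorted by name, so member order in the input no longer matters:)

public int compareTo(Object o) {
    return canonicalForm(this.json.toString()).compareTo(canonicalForm(o.toString()));
}

// Flatten a JSON object into a string whose members are sorted by name.
private static String canonicalForm(String rawJson) {
    JsonObject obj = PARSER.parse(rawJson).getAsJsonObject();
    SortedMap<String, String> members = new TreeMap<String, String>();
    for (Map.Entry<String, JsonElement> e : obj.entrySet()) {
        members.put(e.getKey(), e.getValue().toString());
    }
    return members.toString();
}

(With the default HashPartitioner, hashCode() decides which reducer a key goes to, so it should be derived from the same canonical form; nested objects would need the canonicalisation applied recursively.)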


The problem I have now is that only consecutive duplicates are detected. Given 6 lines:

{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}

{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}

{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}

{"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}

{"ts":1368758947.291035, "source":"sdk","isSecure":false,"version":2,"debug":false}

{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}


I get back 1, 3, 4, and 6. I should be getting only 1, 3, and 4, since 6 is exactly equal to 1. If I switch lines 5 and 6, the original line 5 is no longer filtered out (I get 1, 3, 4, 5, 6). I’ve noticed that the compareTo method is called a total of 13 times; I assume that for all 6 keys to be compared against each other, 15 comparisons would be needed. Am I missing something here? I’ve tested compareTo manually and lines 1 and 6 are interpreted as equal. My MapReduce code currently looks like this:


class DupFilter {

    private static final Gson GSON = new Gson();
    private static final JsonParser PARSER = new JsonParser();

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, JSONWrapper, IntWritable> {

        public void map(LongWritable key, Text value,
                        OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter)
                throws IOException {
            JsonElement je = PARSER.parse(value.toString());
            JSONWrapper jow = new JSONWrapper(value.toString());
            IntWritable one = new IntWritable(1);
            output.collect(jow, one);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {

        public void reduce(JSONWrapper jow, Iterator<IntWritable> values,
                           OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext())
                sum += values.next().get();
            output.collect(jow, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DupFilter.class);
        conf.setJobName("dupfilter");
        conf.setOutputKeyClass(JSONWrapper.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
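
(The JSONWrapper class itself isn't shown in the thread. For the job above, the key type also needs Writable serialization and a hashCode consistent with compareTo, because the default HashPartitioner routes keys by hashCode and all duplicates must reach the same reducer. A rough sketch, with field and constructor shapes guessed from the snippets above and canonicalForm as in the earlier sketch; imports: java.io.DataInput/DataOutput/IOException, org.apache.hadoop.io.Text, org.apache.hadoop.io.WritableComparable.)

public static class JSONWrapper implements WritableComparable<JSONWrapper> {

    private Text json = new Text();

    public JSONWrapper() {}                              // no-arg constructor for Hadoop
    public JSONWrapper(String raw) { json = new Text(raw); }

    public void write(DataOutput out) throws IOException { json.write(out); }
    public void readFields(DataInput in) throws IOException { json.readFields(in); }

    public int compareTo(JSONWrapper o) {
        return canonicalForm(json.toString()).compareTo(canonicalForm(o.json.toString()));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof JSONWrapper && compareTo((JSONWrapper) o) == 0;
    }

    @Override
    public int hashCode() {
        // Must agree with compareTo/equals so duplicates land on the same reducer.
        return canonicalForm(json.toString()).hashCode();
    }

    @Override
    public String toString() { return json.toString(); }
}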

Thanks,

Max Lebedev



On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee <[email protected]> wrote:

    I agree with Shahab: you have to ensure that the keys are
    WritableComparable and the values are Writable in order to be used in MR.

    You can have a WritableComparable implementation wrapping the
    actual JSON object.

    Thanks,
    Rahul


    On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield <[email protected]> wrote:

        Hello,

        On 4 Jun 2013, at 23:49, Max Lebedev <[email protected]> wrote:

        Hi. I've been trying to use JSONObjects to identify duplicates
        among JSON strings. The duplicate strings contain the same data,
        but not necessarily in the same order. For example, the following
        two lines should be identified as duplicates (and filtered):

        
{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false
        
{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}


        Can you not use the timestamp as the key? Then you would have your
        mapper emit the following kv pair:

        output.collect(ts, value);

        And you would have a straightforward reducer that can dedup
        based on the timestamps.
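
        (A rough sketch, not from the thread, of what that suggestion could
        look like with the same old API; the class names are invented for
        illustration, and the imports match the DupFilter code above plus
        java.util.HashSet/Set and org.apache.hadoop.io.NullWritable. Gson's
        JsonElement.equals ignores member order, so reordered duplicates
        collapse inside each timestamp group.)

        public static class TsMap extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, Text> {

            private static final JsonParser PARSER = new JsonParser();

            public void map(LongWritable key, Text value,
                            OutputCollector<Text, Text> output, Reporter reporter)
                    throws IOException {
                // Use the ts field as the shuffle key, as suggested above.
                String ts = PARSER.parse(value.toString())
                                  .getAsJsonObject().get("ts").getAsString();
                output.collect(new Text(ts), value);
            }
        }

        public static class TsDedupReduce extends MapReduceBase
                implements Reducer<Text, Text, NullWritable, Text> {

            private static final JsonParser PARSER = new JsonParser();

            public void reduce(Text ts, Iterator<Text> lines,
                               OutputCollector<NullWritable, Text> output, Reporter reporter)
                    throws IOException {
                // Emit each distinct record once per timestamp group.
                Set<JsonElement> seen = new HashSet<JsonElement>();
                while (lines.hasNext()) {
                    Text line = lines.next();
                    if (seen.add(PARSER.parse(line.toString()))) {
                        output.collect(NullWritable.get(), new Text(line));
                    }
                }
            }
        }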

        If the above doesn't work for you, I would look at the Jackson
        library for handling JSON in Java. Its approach of mapping JSON to
        Java beans is clean from a code point of view and comes with lots
        of nice features.
        http://stackoverflow.com/a/2255893
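
        (A tiny illustration of the Jackson route, assuming Jackson 2.x:
        readTree returns a JsonNode whose equals() ignores member order,
        so reordered records compare equal.)

        import com.fasterxml.jackson.databind.JsonNode;
        import com.fasterxml.jackson.databind.ObjectMapper;

        public class JacksonEqualityDemo {
            public static void main(String[] args) throws Exception {
                ObjectMapper mapper = new ObjectMapper();
                JsonNode a = mapper.readTree(
                        "{\"ts\":1368758947.291035,\"isSecure\":true,\"version\":2}");
                JsonNode b = mapper.readTree(
                        "{\"version\":2,\"ts\":1368758947.291035,\"isSecure\":true}");
                System.out.println(a.equals(b)); // true: member order does not matter
            }
        }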

        P.S. In your code you are using the older MapReduce API; I
        would look at using the newer APIs in the
        org.apache.hadoop.mapreduce package.
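
        (For reference, a sketch of the ts-keyed mapper above rewritten
        against the newer org.apache.hadoop.mapreduce API: Context replaces
        OutputCollector/Reporter, and map() can throw InterruptedException.)

        import java.io.IOException;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;
        import com.google.gson.JsonParser;

        public class TsMapNewApi extends Mapper<LongWritable, Text, Text, Text> {

            private static final JsonParser PARSER = new JsonParser();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String ts = PARSER.parse(value.toString())
                                  .getAsJsonObject().get("ts").getAsString();
                context.write(new Text(ts), value);
            }
        }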

        Mischa

        This is the code:

        class DupFilter {

            public static class Map extends MapReduceBase
                    implements Mapper<LongWritable, Text, JSONObject, Text> {

                public void map(LongWritable key, Text value,
                                OutputCollector<JSONObject, Text> output, Reporter reporter)
                        throws IOException {
                    JSONObject jo = null;
                    try {
                        jo = new JSONObject(value.toString());
                    } catch (JSONException e) {
                        e.printStackTrace();
                    }
                    output.collect(jo, value);
                }
            }

            public static class Reduce extends MapReduceBase
                    implements Reducer<JSONObject, Text, NullWritable, Text> {

                public void reduce(JSONObject jo, Iterator<Text> lines,
                                   OutputCollector<NullWritable, Text> output, Reporter reporter)
                        throws IOException {
                    output.collect(null, lines.next());
                }
            }

            public static void main(String[] args) throws Exception {
                JobConf conf = new JobConf(DupFilter.class);
                conf.setOutputKeyClass(JSONObject.class);
                conf.setOutputValueClass(Text.class);
                conf.setMapperClass(Map.class);
                conf.setReducerClass(Reduce.class);
                conf.setInputFormat(TextInputFormat.class);
                conf.setOutputFormat(TextOutputFormat.class);
                FileInputFormat.setInputPaths(conf, new Path(args[0]));
                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
                JobClient.runJob(conf);
            }
        }

        I get the following error:

        java.lang.ClassCastException: class org.json.JSONObject
                at java.lang.Class.asSubclass(Class.java:3027)
                at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
                at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
                at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
                at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
                at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)

        It looks like it has something to do with
        conf.setOutputKeyClass(). Am I doing something wrong here?


        Thanks,

        Max Lebedev


        _______________________________
        Mischa Tuffield PhD
        http://mmt.me.uk/
        @mischat







