I’ve taken your advice and made a wrapper class which implements
WritableComparable. Thank you very much for your help. I believe
everything is working fine on that front. I used Google’s Gson for the
comparison.
public int compareTo(Object o) {
    JsonElement o1 = PARSER.parse(this.json.toString());
    JsonElement o2 = PARSER.parse(o.toString());
    if (o2.equals(o1))
        return 0;
    else
        return -1;
}
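(Editor's note: a comparator that only ever returns 0 or -1 reports both a < b and b < a for distinct keys, so a sort can place equal keys apart from each other. A self-contained sketch of a consistent alternative is below: canonicalize each record to a sorted key/value form before comparing. The flat-JSON toy parser here is illustrative only, not Gson, and assumes no nesting and no commas or colons inside values, which holds for the sample lines above.)

```java
import java.util.TreeMap;

// Illustrative only: canonicalize each record to a sorted key->value map,
// then compare the canonical strings. Equal records compare as 0 whatever
// the field order; distinct records get a stable, antisymmetric ordering.
class CanonicalCompare {
    // Toy flat-JSON parser: assumes no nesting and no commas or colons
    // inside string values.
    static TreeMap<String, String> parseFlat(String json) {
        TreeMap<String, String> m = new TreeMap<>();
        String body = json.trim().replaceAll("^\\{", "").replaceAll("\\}$", "");
        for (String pair : body.split(",")) {
            String[] kv = pair.split(":", 2);
            m.put(kv[0].trim(), kv[1].trim());
        }
        return m;
    }

    // TreeMap iterates in sorted key order, so toString() is canonical:
    // identical content yields identical strings regardless of input order.
    public static int compareJson(String a, String b) {
        return parseFlat(a).toString().compareTo(parseFlat(b).toString());
    }
}
```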
The problem I have now is that only consecutive duplicates are
detected. Given 6 lines:
{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}
{"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}
{"ts":1368758947.291035,"source":"sdk","isSecure":false,"version":2,"debug":false}
{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
I get back lines 1, 3, 4, and 6. I should be getting only 1, 3, and 4,
as line 6 is exactly equal to line 1. If I switch lines 5 and 6, the
original line 5 is no longer filtered (I get 1, 3, 4, 5, 6). I’ve
noticed that the compareTo method is called a total of 13 times. I
assume that in order for all 6 of the keys to be compared, 15
comparisons need to be made. Am I missing something here? I’ve tested
compareTo manually, and lines 1 and 6 are interpreted as equal. My
MapReduce code currently looks like this:
class DupFilter {
    private static final Gson GSON = new Gson();
    private static final JsonParser PARSER = new JsonParser();

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, JSONWrapper, IntWritable> {
        public void map(LongWritable key, Text value,
                OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter)
                throws IOException {
            JSONWrapper jow = new JSONWrapper(value.toString());
            IntWritable one = new IntWritable(1);
            output.collect(jow, one);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {
        public void reduce(JSONWrapper jow, Iterator<IntWritable> values,
                OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext())
                sum += values.next().get();
            output.collect(jow, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DupFilter.class);
        conf.setJobName("dupfilter");
        conf.setOutputKeyClass(JSONWrapper.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
Thanks,
Max Lebedev
On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee
<[email protected]> wrote:
I agree with Shahab — you have to ensure that the keys are
WritableComparable and the values are Writable in order to be used in MR.
You can have a WritableComparable implementation wrapping the
actual JSON object.
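(Editor's note: a skeleton of such a wrapper is sketched below. The real class would declare `implements org.apache.hadoop.io.WritableComparable<JSONWrapper>` and add a contract-respecting compareTo; only the serialization core is shown here, via plain java.io, and the class name is illustrative.)

```java
import java.io.*;

// Illustrative skeleton of a wrapper around the JSON string. In the real
// job this would implement WritableComparable<JSONWrapper>; only the
// Writable serialization core is shown.
class JSONWrapperCore {
    private String json = "";

    public JSONWrapperCore() {}                  // Hadoop requires a no-arg ctor
    public JSONWrapperCore(String json) { this.json = json; }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(json);                      // serialized during the shuffle
    }

    public void readFields(DataInput in) throws IOException {
        json = in.readUTF();                     // rebuilt on the reduce side
    }

    @Override
    public String toString() { return json; }
}
```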
Thanks,
Rahul
On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield
<[email protected]> wrote:
Hello,
On 4 Jun 2013, at 23:49, Max Lebedev <[email protected]> wrote:
Hi. I've been trying to use JSONObjects to identify
duplicates in JSON strings.
The duplicate strings contain the same data, but not
necessarily in the same order. For example, the following two
lines should be identified as duplicates (and filtered):
{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
Can you not use the timestamp as a URI and emit them as URIs?
Then you would have your mapper emit the following k/v:
output.collect(ts, value);
And you would have a straightforward reducer that can dedup
based on the timestamps.
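(Editor's note: a self-contained sketch of pulling the timestamp out of a line to use as the grouping key is below; the regex and helper name are assumptions, and in the real job the mapper would emit the result as a Text key.)

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper: extract the "ts" field so it can serve as the
// map output key, grouping candidate duplicates in one reduce call.
class TsKey {
    private static final Pattern TS = Pattern.compile("\"ts\"\\s*:\\s*([0-9.]+)");

    // Returns the ts value, or null when the line has no ts field.
    public static String tsOf(String line) {
        Matcher m = TS.matcher(line);
        return m.find() ? m.group(1) : null;
    }
}
```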
If the above doesn't work for you, I would look at the Jackson
library for mangling JSON in Java. Its method of using Java
beans for JSON is clean from a code point of view, and it comes
with lots of nice features.
http://stackoverflow.com/a/2255893
P.S. In your code you are using the older MapReduce API; I
would look at using the newer APIs in the
org.apache.hadoop.mapreduce package.
Mischa
This is the code:
class DupFilter {
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, JSONObject, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<JSONObject, Text> output, Reporter reporter)
                throws IOException {
            JSONObject jo = null;
            try {
                jo = new JSONObject(value.toString());
            } catch (JSONException e) {
                e.printStackTrace();
            }
            output.collect(jo, value);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<JSONObject, Text, NullWritable, Text> {
        public void reduce(JSONObject jo, Iterator<Text> lines,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(null, lines.next());
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DupFilter.class);
        conf.setOutputKeyClass(JSONObject.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
I get the following error:
java.lang.ClassCastException: class org.json.JSONObject
    at java.lang.Class.asSubclass(Class.java:3027)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
It looks like it has something to do with
conf.setOutputKeyClass(). Am I doing something wrong here?
Thanks,
Max Lebedev
_______________________________
Mischa Tuffield PhD
http://mmt.me.uk/
@mischat