Hi, I have a strange question related to my secondary sort implementation in
an MR job. I need to support secondary sort in one of my MR jobs, so I
implemented a custom WritableComparable like the following:
    public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
        String type; long id1; String id2; String id3; String id4;
        long timestamp1; long timestamp2;
    }
Then I implemented the following methods for this class:
    public int compareTo(MyPartitionKey o);  // sorts on all attributes listed above; the last 2 timestamps descending
    public int hashCode();                   // hash code generated from all attributes above
    public boolean equals(Object o);         // uses all attributes in the equality check
    public void write(DataOutput out);       // serializes all attributes listed above
    public void readFields(DataInput in);    // deserializes all attributes listed above
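The write/readFields pair above only works if both methods handle the fields in exactly the same order, and equals/hashCode should agree on the deserialized copy. Here is a minimal standalone sketch of that contract, assuming non-null String fields. KeySerializationSketch and its nested Key are hypothetical stand-ins for MyPartitionKey that use plain java.io DataOutput/DataInput instead of implementing Hadoop's WritableComparable, so they compile with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

public class KeySerializationSketch {
    // Hypothetical stand-in for MyPartitionKey: same fields, but no Hadoop dependency.
    // String fields are assumed non-null (writeUTF throws on null).
    static final class Key {
        String type; long id1; String id2; String id3; String id4;
        long timestamp1; long timestamp2;

        // Serializes every attribute in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeUTF(type);
            out.writeLong(id1);
            out.writeUTF(id2);
            out.writeUTF(id3);
            out.writeUTF(id4);
            out.writeLong(timestamp1);
            out.writeLong(timestamp2);
        }

        // Must read the attributes back in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            type = in.readUTF();
            id1 = in.readLong();
            id2 = in.readUTF();
            id3 = in.readUTF();
            id4 = in.readUTF();
            timestamp1 = in.readLong();
            timestamp2 = in.readLong();
        }

        // equals and hashCode both use all attributes, so equal keys hash alike.
        @Override public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return id1 == k.id1 && timestamp1 == k.timestamp1 && timestamp2 == k.timestamp2
                && type.equals(k.type) && id2.equals(k.id2)
                && id3.equals(k.id3) && id4.equals(k.id4);
        }

        @Override public int hashCode() {
            return Objects.hash(type, id1, id2, id3, id4, timestamp1, timestamp2);
        }
    }

    // Writes the key to a byte array and reads it back into a fresh instance.
    static Key roundTrip(Key key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            Key copy = new Key();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Key k = new Key();
        k.type = "click"; k.id1 = 42L; k.id2 = "a"; k.id3 = "b"; k.id4 = "c";
        k.timestamp1 = 1_000L; k.timestamp2 = 2_000L;
        Key copy = roundTrip(k);
        System.out.println(copy.equals(k) && copy.hashCode() == k.hashCode()); // true
    }
}
```

A round trip through a byte array is a quick way to check that readFields mirrors write before the key ever reaches a real job.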
For partitioning and grouping of my keys, I want the following logic: depending
on the type, the data is partitioned either by the year or by the day of timestamp1.
For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then
in reverse order by (timestamp1, timestamp2).
I implemented my KeyComparator using the sort order above, and my Partitioner
and GroupComparator using the partitioning logic above.
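The sort order described above (ascending on type and the four ids, then descending on the two timestamps) can be sketched as a plain compareTo. SortOrderSketch.Key is a hypothetical Hadoop-free stand-in for MyPartitionKey; in the real job the same logic would presumably live in compareTo() and in the KeyComparator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortOrderSketch {
    // Hypothetical plain-Java stand-in for MyPartitionKey, showing only the ordering logic.
    static final class Key implements Comparable<Key> {
        final String type; final long id1; final String id2; final String id3; final String id4;
        final long timestamp1; final long timestamp2;

        Key(String type, long id1, String id2, String id3, String id4,
            long timestamp1, long timestamp2) {
            this.type = type; this.id1 = id1; this.id2 = id2; this.id3 = id3; this.id4 = id4;
            this.timestamp1 = timestamp1; this.timestamp2 = timestamp2;
        }

        // Ascending on (type, id1, id2, id3, id4), then descending on (timestamp1, timestamp2).
        @Override public int compareTo(Key o) {
            int cmp = type.compareTo(o.type);
            if (cmp != 0) return cmp;
            cmp = Long.compare(id1, o.id1);
            if (cmp != 0) return cmp;
            cmp = id2.compareTo(o.id2);
            if (cmp != 0) return cmp;
            cmp = id3.compareTo(o.id3);
            if (cmp != 0) return cmp;
            cmp = id4.compareTo(o.id4);
            if (cmp != 0) return cmp;
            // Arguments swapped so that larger (newer) timestamps sort first.
            cmp = Long.compare(o.timestamp1, timestamp1);
            if (cmp != 0) return cmp;
            return Long.compare(o.timestamp2, timestamp2);
        }

        @Override public String toString() {
            return type + "/" + id1 + "/" + timestamp1 + "/" + timestamp2;
        }
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("B", 1, "x", "y", "z", 100, 5));
        keys.add(new Key("A", 1, "x", "y", "z", 100, 5));
        keys.add(new Key("A", 1, "x", "y", "z", 200, 5));
        keys.add(new Key("A", 1, "x", "y", "z", 200, 9));
        Collections.sort(keys);
        System.out.println(keys); // [A/1/200/9, A/1/200/5, A/1/100/5, B/1/100/5]
    }
}
```

Swapping the arguments of Long.compare gives the descending order directly, rather than subtracting timestamps, which can overflow.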
Here is the pseudo code of the Partitioner and GroupComparator:
    public class MyPartitioner implements Partitioner<MyPartitionKey, Value> {
        @Override
        public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
            int hashCode = key.getActivityType().name().hashCode();
            StringBuilder sb = new StringBuilder();
            for (String subPartitionValue : key.getPartitionValue()) {
                sb.append(subPartitionValue);
            }
            // Mask the sign bit rather than Math.abs(): Math.abs(Integer.MIN_VALUE) is negative.
            return ((hashCode * 127 + sb.toString().hashCode()) & Integer.MAX_VALUE) % numPartitions;
        }

        @Override
        public void configure(JobConf job) { }
    }
// The key's getPartitionValue method returns a String array of either {YYYY}
// or {YYYY, MM, DD}, derived from timestamp1.
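The getPartitionValue contract described above, and the way the Partitioner combines it with the type, can be sketched in plain Java. This is a minimal sketch under stated assumptions: timestamp1 is epoch milliseconds bucketed in UTC, a hypothetical boolean daily flag stands in for the per-type yearly/daily decision, and the type is passed as a String rather than through getActivityType().

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Locale;

public class PartitionValueSketch {
    // Hypothetical helper: {YYYY} for a yearly type, {YYYY, MM, DD} for a daily type.
    // Assumes timestamp1 is epoch milliseconds, bucketed in UTC.
    static String[] partitionValue(long timestamp1Millis, boolean daily) {
        ZonedDateTime t = Instant.ofEpochMilli(timestamp1Millis).atZone(ZoneOffset.UTC);
        String yyyy = String.format(Locale.ROOT, "%04d", t.getYear());
        if (!daily) {
            return new String[] {yyyy};
        }
        return new String[] {
            yyyy,
            String.format(Locale.ROOT, "%02d", t.getMonthValue()),
            String.format(Locale.ROOT, "%02d", t.getDayOfMonth())
        };
    }

    // Same combination as MyPartitioner (type hash * 127 + hash of the joined values),
    // but masks the sign bit, since Math.abs(Integer.MIN_VALUE) is still negative.
    static int partition(String type, String[] partitionValue, int numPartitions) {
        int h = type.hashCode() * 127 + String.join("", partitionValue).hashCode();
        return (h & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        long ts = ZonedDateTime.of(2014, 3, 5, 12, 0, 0, 0, ZoneOffset.UTC)
            .toInstant().toEpochMilli();
        System.out.println(Arrays.toString(partitionValue(ts, true)));  // [2014, 03, 05]
        System.out.println(Arrays.toString(partitionValue(ts, false))); // [2014]
        System.out.println(partition("click", partitionValue(ts, true), 10));
    }
}
```

Zero-padding the month and day keeps the concatenated value unambiguous (for example, 2014 + 1 + 11 and 2014 + 11 + 1 would otherwise both become "2014111").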
For the GroupComparator:
    public static class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            super(MyPartitionKey.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            // Different type: send to a different group.
            int cmp = key1.type.compareTo(key2.type);
            if (cmp != 0) return cmp;
            // The same type should have a partition value array of the same length.
            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();
            assert partitionValue1.length == partitionValue2.length;
            StringBuilder sb1 = new StringBuilder();
            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {
                sb1.append(subValue);
            }
            for (String subValue : partitionValue2) {
                sb2.append(subValue);
            }
            return sb1.toString().compareTo(sb2.toString());
        }
    }
Now, here is the strange problem I don't understand. I tested my MR job with
test data that I know contains 7 types of data: 3 of them partitioned yearly
and 4 of them partitioned daily. For each of the 4 daily types, the test data
covers 2 days. So I expected the reducer input group count to be 11
(4 x 2 + 3 = 11). In fact, if I don't use the custom MyPartitionKey and just use
Text as the key type, with "type + YYYY" for the yearly datasets and
"type + YYYYMMDD" for the daily datasets, the reducer does get 11 input groups.
But I have to support secondary sort, and to my surprise, at runtime the MR job
generates 51792 input groups for the reducer. This doesn't make sense to me.
If I change the MyGroupComparator compare() method to compare only the type,
like the following:
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        MyPartitionKey key1 = (MyPartitionKey) w1;
        MyPartitionKey key2 = (MyPartitionKey) w2;
        return key1.type.compareTo(key2.type);
    }
then the MR job generates 7 input groups for the reducer, which is what I
expect. But as soon as I add the comparison of the YYYY, MM, or DD values parsed
out of timestamp1, the input group count becomes very large.
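The grouping behaviour can be explored with a small standalone simulation. It assumes, as in Hadoop's reduce-side iteration, that the reducer sees keys in sort-comparator order and starts a new input group whenever the grouping comparator returns nonzero for the current key and the previous one. The Key class (one type, one id, and a day number standing in for the YYYYMMDD bucket) and the toy data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GroupingSimulation {
    // Hypothetical simplified key: a type, one id, and the day bucket of timestamp1.
    static final class Key {
        final String type; final long id; final int day;
        Key(String type, long id, int day) { this.type = type; this.id = id; this.day = day; }
    }

    // Counts input groups the way the reduce side iterates: keys arrive in sort order,
    // and a new group starts whenever the grouping comparator says the current key
    // differs from the previous one.
    static int countGroups(List<Key> keys, Comparator<Key> sortOrder, Comparator<Key> grouping) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(sortOrder);
        int groups = 0;
        Key previous = null;
        for (Key k : sorted) {
            if (previous == null || grouping.compare(previous, k) != 0) groups++;
            previous = k;
        }
        return groups;
    }

    // Toy data: one type, ids 1..3, each with records on day 1 and day 2.
    static List<Key> sampleKeys() {
        List<Key> keys = new ArrayList<>();
        for (long id = 1; id <= 3; id++)
            for (int day = 1; day <= 2; day++)
                keys.add(new Key("A", id, day));
        return keys;
    }

    // Sort order modelled on the post: type, then id, then day descending.
    static final Comparator<Key> BY_TYPE_ID_DAY_DESC =
        Comparator.comparing((Key k) -> k.type)
            .thenComparingLong(k -> k.id)
            .thenComparing(Comparator.comparingInt((Key k) -> k.day).reversed());

    // For comparison only: day placed before id in the sort order.
    static final Comparator<Key> BY_TYPE_DAY_DESC_ID =
        Comparator.comparing((Key k) -> k.type)
            .thenComparing(Comparator.comparingInt((Key k) -> k.day).reversed())
            .thenComparingLong(k -> k.id);

    static final Comparator<Key> GROUP_BY_TYPE = Comparator.comparing((Key k) -> k.type);
    static final Comparator<Key> GROUP_BY_TYPE_DAY =
        Comparator.comparing((Key k) -> k.type).thenComparingInt(k -> k.day);

    public static void main(String[] args) {
        List<Key> keys = sampleKeys();
        System.out.println(countGroups(keys, BY_TYPE_ID_DAY_DESC, GROUP_BY_TYPE));     // 1
        System.out.println(countGroups(keys, BY_TYPE_ID_DAY_DESC, GROUP_BY_TYPE_DAY)); // 6
        System.out.println(countGroups(keys, BY_TYPE_DAY_DESC_ID, GROUP_BY_TYPE_DAY)); // 2
    }
}
```

With this toy data, grouping by type alone yields 1 group, while grouping by (type, day) under the same sort order yields 6, because adjacent keys alternate between the two days as the id changes. Putting the day ahead of the id in the sort order, shown only for comparison, brings the count back to 2.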
My guess is that id1, id2, id3, and id4 make the input group count large,
because the test data contains many unique (id1, id2, id3, id4) combinations.
But they are NOT part of my GroupComparator implementation. Why, in this case,
is the input group count for the reducer so high? And in this case the MR job
won't do what I want, as the same group of data is NOT being sent to the same
reducer. Here is a summary of my questions:
1) My understanding is that the GroupComparator is the only class that controls
the input groups of the reducer. Is that correct?
2) If so, in my case above, I know MyGroupComparator yields 11 unique values on
my test data. Why are 51792 input groups generated? This big number must come
from (id1, id2, id3, id4), but these ids are not used in MyGroupComparator, so
why do they affect the reducer input group count?
3) If I only use type in my GroupComparator, I get the correct 7 input groups
for the reducer. So in this case, it correctly ignores all the other data
contained in the MyPartitionKey class. Why? Does the order of the attributes
make any difference? I don't think so, but I cannot explain the above result.
If you have any ideas, or if my implementation has a problem, please let me know.
Thanks
Yong