Did you override the partitioner as well?
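
For reference, with the old mapred API a secondary sort usually needs all
three pieces registered on the JobConf; a minimal sketch (the driver class
and comparator names are assumed from your description):

    JobConf conf = new JobConf(MyJob.class);
    conf.setMapOutputKeyClass(MyPartitionKey.class);
    // decides which reducer a key goes to
    conf.setPartitionerClass(MyPartitioner.class);
    // full sort order within each partition
    conf.setOutputKeyComparatorClass(MyKeyComparator.class);
    // decides which consecutive keys share one reduce() call
    conf.setOutputValueGroupingComparator(MyGroupComparator.class);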

2013/10/29 java8964 java8964 <java8...@hotmail.com>

> Hi, I have a strange question related to my secondary sort implementation
> in an MR job.
> I need to support secondary sort in one of my MR jobs, so I implemented a
> custom WritableComparable like the following:
>
> public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
>     String type;
>     long id1;
>     String id2;
>     String id3;
>     String id4;
>     long timestamp1;
>     long timestamp2;
> }
>
> Then I implemented the following methods for this class:
>
> public int compareTo(MyPartitionKey other); // sort the data based on all
> attributes listed above, with the last 2 timestamps sorted descending
> public int hashCode(); // generate the hashcode using all attributes above
> public boolean equals(Object o); // use all the attributes for the equals check
> public void write(DataOutput out); // serialize all the attributes listed
> above
> public void readFields(DataInput in); // deserialize all the attributes
> listed above
>
> For partitioning and grouping of my keys, I want the following logic:
> based on the type, the data should be partitioned either by year or by day
> of timestamp1.
>
> For the sort order, I want the data sorted by (type, id1, id2, id3, id4),
> then reverse sorted by (timestamp1, timestamp2).
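>
> Roughly, my compareTo() implements that order like this (simplified
> sketch, assuming none of the fields are null):
>
> public int compareTo(MyPartitionKey other) {
>     int cmp = type.compareTo(other.type);
>     if (cmp != 0) return cmp;
>     cmp = Long.compare(id1, other.id1);
>     if (cmp != 0) return cmp;
>     cmp = id2.compareTo(other.id2);
>     if (cmp != 0) return cmp;
>     cmp = id3.compareTo(other.id3);
>     if (cmp != 0) return cmp;
>     cmp = id4.compareTo(other.id4);
>     if (cmp != 0) return cmp;
>     // timestamps sort descending, so compare the arguments in reverse
>     cmp = Long.compare(other.timestamp1, timestamp1);
>     if (cmp != 0) return cmp;
>     return Long.compare(other.timestamp2, timestamp2);
> }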
>
> I implemented my KeyComparator using the sort order above, and my
> Partitioner and GroupComparator based on the partition/grouping logic above.
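>
> The KeyComparator itself is just a thin wrapper that delegates to
> compareTo(); roughly:
>
> public static class MyKeyComparator extends WritableComparator {
>     protected MyKeyComparator() {
>         super(MyPartitionKey.class, true);
>     }
>
>     @Override
>     public int compare(WritableComparable w1, WritableComparable w2) {
>         return ((MyPartitionKey) w1).compareTo((MyPartitionKey) w2);
>     }
> }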
>
> Here is the pseudo code of the Partitioner and GroupComparator:
>
> public class MyPartitioner implements Partitioner<MyPartitionKey, Value> {
>     @Override
>     public int getPartition(MyPartitionKey key, Value value,
>                             int numPartitions) {
>         int hashCode = key.getActivityType().name().hashCode();
>         StringBuilder sb = new StringBuilder();
>         for (String subPartitionValue : key.getPartitionValue()) {
>             sb.append(subPartitionValue);
>         }
>         // hash of (type, partition value) mapped into [0, numPartitions)
>         return Math.abs(hashCode * 127 + sb.toString().hashCode())
>                 % numPartitions;
>     }
>
>     @Override
>     public void configure(JobConf job) {
>         // nothing to configure
>     }
> }
>
> // The key's getPartitionValue() method returns an array of strings: either
> // {YYYY} or {YYYY, MM, DD}, derived from timestamp1 depending on the type.
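>
> // Roughly, that method looks like this (sketch; isYearlyPartitioned() is a
> // stand-in for whatever decides yearly vs. daily for a given type):
> public String[] getPartitionValue() {
>     Calendar cal = Calendar.getInstance();
>     cal.setTimeInMillis(timestamp1);
>     String yyyy = String.valueOf(cal.get(Calendar.YEAR));
>     if (isYearlyPartitioned(type)) {   // hypothetical helper
>         return new String[] { yyyy };
>     }
>     String mm = String.format("%02d", cal.get(Calendar.MONTH) + 1);
>     String dd = String.format("%02d", cal.get(Calendar.DAY_OF_MONTH));
>     return new String[] { yyyy, mm, dd };
> }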
>
> For GroupComparator:
>
>     public static class MyGroupComparator extends WritableComparator {
>         protected MyGroupComparator() {
>             super(MyPartitionKey.class, true);
>         }
>
>         @Override
>         public int compare(WritableComparable w1, WritableComparable w2) {
>             MyPartitionKey key1 = (MyPartitionKey) w1;
>             MyPartitionKey key2 = (MyPartitionKey) w2;
>             int cmp = key1.type.compareTo(key2.type);
>             // different type, send to different group
>             if (cmp != 0)
>                 return cmp;
>
>             // for the same type, should have the same partition value
> array length
>             String[] partitionValue1 = key1.getPartitionValue();
>             String[] partitionValue2 = key2.getPartitionValue();
>             assert partitionValue1.length == partitionValue2.length;
>             StringBuilder sb1 = new StringBuilder();
>             StringBuilder sb2 = new StringBuilder();
>             for (String subValue : partitionValue1) {
>                 sb1.append(subValue);
>             }
>             for (String subValue : partitionValue2) {
>                 sb2.append(subValue);
>             }
>             return sb1.toString().compareTo(sb2.toString());
>         }
>     }
>
> Now, here is the strange problem I don't understand. I tested my MR job,
> and I know the test data has 7 types of data: 3 of them are partitioned
> yearly and 4 of them are partitioned daily. For the 4 types partitioned
> daily, there are 2 days of data for each type. So I expected the reducer's
> input group count to be 11, i.e. 4 x 2 + 3 = 11. In fact, if I don't use
> this custom MyPartitionKey and just use Text as the key type, with
> "type + YYYY" for the yearly datasets and "type + YYYYMMDD" for the daily
> datasets, there are exactly 11 input groups for the reducer. But I have to
> support secondary sort. To my surprise, at runtime the MR job generates
> 51792 input groups for the reducer. This doesn't make sense.
>
> If I change the MyGroupComparator compare() method to only compare the
> type, like the following:
>
>         @Override
>         public int compare(WritableComparable w1, WritableComparable w2) {
>             MyPartitionKey key1 = (MyPartitionKey) w1;
>             MyPartitionKey key2 = (MyPartitionKey) w2;
>             return key1.type.compareTo(key2.type);
>         }
>
> then the MR job generates 7 input groups for the reducer, which is what I
> expect. But as soon as I add the comparison of the YYYY/MM/DD values parsed
> out of timestamp1, the input group count becomes very large.
>
> What I think is that maybe id1, id2, id3, and id4 make the input group
> count large, because the test data contains many unique combinations of
> (id1, id2, id3, id4). But those fields are NOT part of my GroupComparator
> implementation, so why is the reducer's input group count so high in this
> case? And in this case the MR job doesn't do what I want, since the same
> group of data is NOT sent to the same reduce call. Here is a summary of my
> questions:
>
> 1) My understanding is that the GroupComparator is the only class that
> controls the input groups of the reducer; is that correct?
> 2) If so, in my case above, I know MyGroupComparator will return 11 unique
> values for my test data. Why are 51792 input groups generated? This big
> number must come from (id1, id2, id3, id4), but these ids are not used in
> MyGroupComparator, so why do they affect the reducer's input group count?
> 3) If I only use type in my GroupComparator, I get the correct 7 input
> groups for the reducer. So in that case it correctly ignores all the other
> data contained in the MyPartitionKey class. Why? Does the order of the
> attributes make any difference? I don't think so, but I cannot explain the
> above result.
>
> If you have any ideas, or if you see any problem in my implementation,
> please let me know.
>
> Thanks
>
> Yong
>
