See these for more details on how to write your own custom Partitioner (even if they are a bit outdated, they still give you the basic idea of what you need to do):

http://hadooptutorial.wikispaces.com/Custom+partitioner
https://developer.yahoo.com/hadoop/tutorial/module5.html#partitioning
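Here is a minimal sketch of the shape those tutorials describe, assuming the new `org.apache.hadoop.mapreduce` API. To let it compile without Hadoop, it declares a stand-in `Partitioner` abstract class with the same `getPartition(KEY, VALUE, int)` signature. In a real job you would import `org.apache.hadoop.mapreduce.Partitioner` and drop the stand-in. `WordPartitioner` is a hypothetical name, and its body uses the same hashing formula as Hadoop's `HashPartitioner`.

```java
// Stand-in for org.apache.hadoop.mapreduce.Partitioner, declared here only so
// the sketch compiles without Hadoop. In a real job, import the Hadoop class.
abstract class Partitioner<KEY, VALUE> {
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

// Hypothetical custom partitioner. Because it *extends* Partitioner, its class
// literal satisfies a parameter of type Class<? extends Partitioner>.
class WordPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        // Mask the sign bit so negative hash codes still map into
        // [0, numPartitions); the value is ignored, only the key decides.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

With Hadoop on the classpath, such a class is registered with `job.setPartitionerClass(WordPartitioner.class);`, and `numPartitions` is the number of reduce tasks set with `job.setNumReduceTasks(...)`. A Mapper subclass cannot be passed there, because it does not extend Partitioner.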
Regards,
Shahab

On Wed, Apr 1, 2015 at 11:03 AM, Shahab Yunus <[email protected]> wrote:

> As the error tells you, you cannot use a class as a Partitioner if it does
> not satisfy the interface requirements of the partitioning mechanism. You
> need to set as the Partitioner a class that extends or implements the
> Partitioner contract.
>
> Regards,
> Shahab
>
> On Wed, Apr 1, 2015 at 10:54 AM, xeonmailinglist-gmail <
> [email protected]> wrote:
>
>> Hi,
>>
>> I have created a Mapper class [3] that filters out key-value pairs that
>> go to a specific partition. When I set the partitioner class in my code [1],
>> I get the error in [2], and I don't understand why this is happening. Any
>> help fixing this?
>>
>> [1]
>>
>> Configuration conf = cj.getConfiguration();
>> cj.setPartitionerClass(MyFilterMapper.class);
>>
>> [2]
>>
>> The method setPartitionerClass(Class<? extends Partitioner>) in the type Job
>> is not applicable for the arguments (Class<JobExecution.MyFilterMapper>)
>>
>> [3]
>>
>> public static class MyFilterMapper
>>         extends Mapper<Object, Text, Text, IntWritable> {
>>
>>     private Text word = new Text();
>>     private IntWritable rvalue = new IntWritable();
>>
>>     public static final String REDUCE_TASK_REEXECUTE =
>>             "mapreduce.reduce.task.reexecute";
>>     public static final int NULL_REDUCE_TASK = -1;
>>
>>     private Class<? extends Partitioner<?, ?>> partitionerClass;
>>     private org.apache.hadoop.mapreduce.Partitioner<Object, Text>
>>             partitionerInstance;
>>
>>     public void map(Object key, Text value, Context context
>>             ) throws IOException, InterruptedException {
>>         Configuration conf = context.getConfiguration();
>>         partitionerInstance = new MyHashPartitioner<Object, Text>();
>>
>>         int[] task_reexecute = conf.getInts(REDUCE_TASK_REEXECUTE);
>>         int nr_reduce_tasks = conf.getInt("mapreduce.job.reduces", 0);
>>         System.out.println("Tasks reexecute: " + task_reexecute +
>>                 " NRREDUCETASKS: " + nr_reduce_tasks);
>>         StringTokenizer itr = new StringTokenizer(value.toString());
>>         while (itr.hasMoreTokens()) {
>>             String wword = itr.nextToken();
>>             Integer rrvalue = Integer.valueOf(itr.nextToken());
>>             int partition = partitionerInstance.getPartition(wword,
>>                     value, nr_reduce_tasks);
>>
>>             if (contains(partition, task_reexecute)) {
>>                 System.out.println("Partition Consumed: " + partition +
>>                         " - key: " + key.toString() + " word: " + wword +
>>                         " value - " + value.toString());
>>                 System.out.println("Partition Consumed: " + partition +
>>                         " - word: " + wword + " value - ");// + rrvalue);
>>
>>                 word.set(wword);
>>                 rvalue.set(rrvalue);
>>                 context.write(word, rvalue);
>>             }
>>         }
>>     }
>>
>>     public boolean contains(int partition, int[] set) {
>>         for (int i = 0; i < set.length; i++) {
>>             if (partition == set[i])
>>                 return true;
>>         }
>>
>>         return false;
>>     }
>> }
>>
>> --
>> Thanks,
>>
>
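The filtering step that MyFilterMapper performs in [3] works like this: compute each word's partition, then keep the word only if that partition is in the re-execute list. The sketch below shows that step without Hadoop types. `PartitionFilter`, `partitionFor` and `keep` are hypothetical names. `partitionFor` assumes MyHashPartitioner behaves like Hadoop's `HashPartitioner`, which the thread does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the filter logic of MyFilterMapper.map().
final class PartitionFilter {
    private PartitionFilter() {}

    // Assumed to match MyHashPartitioner: same formula as Hadoop's HashPartitioner.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Same check as contains() in [3], written with an enhanced for loop.
    static boolean contains(int partition, int[] set) {
        for (int p : set) {
            if (p == partition) return true;
        }
        return false;
    }

    // Keeps only the words whose partition belongs to a re-executed reduce task.
    static List<String> keep(String[] words, int numReduceTasks, int[] reexecute) {
        List<String> kept = new ArrayList<>();
        for (String w : words) {
            if (contains(partitionFor(w, numReduceTasks), reexecute)) {
                kept.add(w);
            }
        }
        return kept;
    }
}
```

In the job setup, MyFilterMapper then stays registered as the mapper with `cj.setMapperClass(MyFilterMapper.class)`. Only a Partitioner subclass, such as the MyHashPartitioner the mapper already instantiates, can be passed to `cj.setPartitionerClass(...)`. One caveat applies if the filter must agree with the real shuffle. The mapper emits the key as a Hadoop `Text`, and `Text.hashCode()` is computed over its UTF-8 bytes rather than by `String.hashCode()`. Hashing the `String` token can therefore give a different partition than the one the emitted key actually lands in.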
