Adding the source code for more clarity.

The problem statement is simple:

PartitionFileMapper: takes an input file with tab-separated values V, P
and emits (V, -1#P).

ALFileMapper: takes an input file with tab-separated values V, EL
and emits (V, EL#-1).

In the reducer I want to emit (V, EL#P).
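
A minimal sketch of such a reducer (the actual PALReducer referenced in the
driver below is not attached here, so this is only an assumption about its
logic; the handling of unmatched keys in particular is guessed):

package in.dream_lab.hadoopPipeline.cc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PALReducer extends Reducer<LongWritable, Text, Text, Text> {

	@Override
	protected void reduce(LongWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {

		String partitionId = null;   // P from the "-1#P" value (PartitionFileMapper)
		String adjacencyList = null; // EL from the "EL#-1" value (ALFileMapper)

		for (Text value : values) {
			String[] parts = value.toString().split("#");
			if ("-1".equals(parts[0])) {
				partitionId = parts[1];
			} else {
				adjacencyList = parts[0];
			}
		}

		// Emit (V, EL#P) only when both input files contributed a value for this key
		if (partitionId != null && adjacencyList != null) {
			context.write(new Text(key.toString()), new Text(adjacencyList + "#" + partitionId));
		}
	}
}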

Thanks
Ravikant

On Mon, Jun 29, 2015 at 11:04 AM, Ravikant Dindokar <[email protected]
> wrote:

> By custom key, did you mean some custom class as the key? If so, then no.
>
> I have two map methods, each taking a different file as input, and both map
> methods emit a *LongWritable* key type. But in the stdout of the container file
> I can see the following,
>
> with the key & value separated by ':':
>
> ./container_1435326857837_0036_01_000102/stdout:Reduce:*391*:-1#11
> ./container_1435326857837_0036_01_000102/stdout:Reduce:*391*
> :3278620528725786624:5352454#-1
>
> For key 391 the reducer is called twice: once for the value from the first map
> and once for the value from the other map.
>
> In the map method I parse the string from the input file as a long variable and
> then emit it as a LongWritable.
>
> Is there something I am missing when I use MultipleInputs
> (org.apache.hadoop.mapreduce.lib.input.MultipleInputs)?
>
> Thanks
> Ravikant
>
> On Mon, Jun 29, 2015 at 9:22 AM, Harshit Mathur <[email protected]>
> wrote:
>
>> As per MapReduce, it is not possible that two different reducers will
>> get the same key.
>> I think you have created some custom key type? If that is the case, then
>> there is probably some issue with the comparator.
>>
>> On Mon, Jun 29, 2015 at 12:40 AM, Ravikant Dindokar <
>> [email protected]> wrote:
>>
>>> Hi Hadoop user,
>>>
>>> I have two map classes processing two different input files. Both map
>>> functions emit the same key/value format.
>>>
>>> But the reducer is called twice for the same key: once for the value from the
>>> first map and once for the value from the other map.
>>>
>>> I am printing the (key, value) pairs in the reducer:
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:391:-1#11
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:391:3278620528725786624:5352454#-1
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:591:3278620528725852160:4194699#-1
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:591:-1#13
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:2391:-1#19
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:2391:3278620528725917696:5283986#-1
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:3291:3278620528725983232:4973087#-1
>>>
>>> Both maps emit a LongWritable key and a Text value.
>>>
>>>
>>> Any idea why this is happening?
>>> Is there any way to get the hash values generated by Hadoop for the keys
>>> emitted by the mapper? (See the partitioner sketch after the quoted thread.)
>>>
>>> Thanks
>>> Ravikant
>>>
>>
>>
>>
>> --
>> Harshit Mathur
>>
>
>
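
Regarding the question quoted above about the hash values Hadoop generates for
keys: unless a custom partitioner is configured, Hadoop uses
org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, which sends a key to
reducer (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The mapping can
be reproduced outside the job with a standalone sketch like this (the class
name and the key value 391 are chosen only for illustration):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionCheck {
	public static void main(String[] args) {
		// Same reducer count as job2.setNumReduceTasks(100) in the driver below
		int numReduceTasks = 100;
		HashPartitioner<LongWritable, Text> partitioner =
				new HashPartitioner<LongWritable, Text>();
		LongWritable key = new LongWritable(391L);
		// Equivalent to (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
		System.out.println("Key " + key + " goes to reducer "
				+ partitioner.getPartition(key, new Text(), numReduceTasks));
	}
}
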
package in.dream_lab.hadoopPipeline.cc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Job ID : 2
 * Job Name : VP_AP_to_PAL
 * Job Description: Concatenate partition Id with vertex adjacency list
 * Map Input File: VP, EL
 * Map Input Format :V_id, [P_id]  V_src, [<E_id,V_sink>+]
 * Map Emit :V_id, [-1, P_id]    V_src, [V_sink, -1]
 * Reducer Emit: V_id, P_id, <E_id, V_sink>+
 * Reducer Output File :PAL
 * Note :Separator between P_id, <E_id, V_sink>+ is "#"
 * 
 */

public class ALFileMapper extends Mapper<Object, Text, LongWritable, Text> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {

		// Input line: V_src <TAB> adjacency list (<E_id,V_sink>+)
		String line = value.toString();
		String[] strs = line.trim().split("\\s+");

		long vertexId = Long.parseLong(strs[0]);
		String mapValue = strs[1] + "#-1";

		// Debug print: appears in the container stdout as "AL: <key>:<value>"
		System.out.println("AL: " + vertexId + ":" + mapValue);

		context.write(new LongWritable(vertexId), new Text(mapValue));
	}
}
package in.dream_lab.hadoopPipeline.cc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Job ID : 2
 * Job Name : VP_AP_to_PAL
 * Job Description: Concatenate partition Id with vertex adjacency list
 * Map Input File: VP, EL
 * Map Input Format :V_id, [P_id]  V_src, [<E_id,V_sink>+]
 * Map Emit :V_id, [-1, P_id]    V_src, [V_sink, -1]
 * Reducer Emit: V_id, P_id, <E_id, V_sink>+
 * Reducer Output File :PAL
 * Note :Separator between P_id, <E_id, V_sink>+ is "#"
 * 
 */

public class PartitionFileMapper extends Mapper<Object, Text, LongWritable, Text> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {

		// Input line: V_id <TAB> P_id
		String line = value.toString();
		String[] strs = line.trim().split("\\s+");

		long vertexId = Long.parseLong(strs[0]);
		long partitionId = Long.parseLong(strs[1]);

		String mapValue = "-1#" + partitionId;

		// Debug print: appears in the container stdout as "PART: <fields>:<key>:<value>"
		System.out.println("PART: " + strs[0] + ":" + strs[1] + ":" + vertexId + ":" + mapValue);

		context.write(new LongWritable(vertexId), new Text(mapValue));
	}
}
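
To make the expected data flow concrete: based on the Reduce log lines quoted
above, for vertex 391 the reducer should receive both "-1#11" (from
PartitionFileMapper) and "3278620528725786624:5352454#-1" (from ALFileMapper)
in a single reduce call and combine them into "3278620528725786624:5352454#11".
Instead, as the logs show, the reducer is invoked twice for key 391, once per
value.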
package in.dream_lab.hadoopPipeline.cc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartitionInputReader {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		/* Job 1 (MakeGraphUndirected) is kept here commented out; only job2 below is run.
		Job job1 = Job.getInstance(conf, "conf1");
		job1.setNumReduceTasks(100);
		job1.setJarByClass(MakeGraphUndirectedMapper.class);
		job1.setMapperClass(MakeGraphUndirectedMapper.class);
		job1.setReducerClass(MakeGraphUndirectedReducer.class);
		job1.setOutputKeyClass(LongWritable.class);
		job1.setOutputValueClass(LongWritable.class);
		job1.setMapOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job1, new Path(args[0]));
		FileOutputFormat.setOutputPath(job1, new Path(args[1]));
		System.exit(job1.waitForCompletion(true) ? 0 : 1);
		*/

		Job job2 = Job.getInstance(conf, "conf2");
		job2.setNumReduceTasks(100);
		job2.setJarByClass(PartitionFileMapper.class);

		// args[0]: VP file (V_id <TAB> P_id), args[1]: EL file (V_src <TAB> adjacency list)
		MultipleInputs.addInputPath(job2, new Path(args[0]), TextInputFormat.class, PartitionFileMapper.class);
		MultipleInputs.addInputPath(job2, new Path(args[1]), TextInputFormat.class, ALFileMapper.class);

		job2.setMapOutputKeyClass(LongWritable.class);
		job2.setMapOutputValueClass(Text.class);

		job2.setReducerClass(PALReducer.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);

		// args[2]: output directory for the PAL file
		FileOutputFormat.setOutputPath(job2, new Path(args[2]));

		System.exit(job2.waitForCompletion(true) ? 0 : 1);
	}
}
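
For reference, the job can then be launched along these lines (the jar name and
HDFS paths here are placeholders, not the actual ones used):

hadoop jar hadoopPipeline.jar in.dream_lab.hadoopPipeline.cc.PartitionInputReader \
    /input/VP /input/EL /output/PAL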
