Adding the source code for more clarity.
The problem statement is simple:
PartitionFileMapper: takes an input file with tab-separated values V, P
and emits (V, -1#P).
ALFileMapper: takes an input file with tab-separated values V, EL
and emits (V, E#-1).
In the reducer I want to emit
(V, E#P).
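For reference, a minimal sketch of the merge I want in the reducer (illustration only; the actual PALReducer is not pasted below, and this assumes each key sees exactly one "-1#P" value and one "E#-1" value):

package in.dream_lab.hadoopPipeline.cc;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Sketch only: merges the two map outputs for a vertex,
 * "-1#P" (from PartitionFileMapper) and "E#-1" (from ALFileMapper),
 * into a single "E#P" value. Output key/value are Text to match the
 * driver's setOutputKeyClass/setOutputValueClass settings.
 */
public class PALReducerSketch extends Reducer<LongWritable, Text, Text, Text> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String partitionId = null;
        String adjacencyList = null;
        for (Text value : values) {
            // Each value is "<left>#<right>" where exactly one side is "-1"
            String[] parts = value.toString().split("#", 2);
            if (!"-1".equals(parts[0])) {
                adjacencyList = parts[0]; // came as "E#-1"
            }
            if (!"-1".equals(parts[1])) {
                partitionId = parts[1];   // came as "-1#P"
            }
        }
        if (adjacencyList != null && partitionId != null) {
            context.write(new Text(key.toString()), new Text(adjacencyList + "#" + partitionId));
        }
    }
}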
Thanks
Ravikant
On Mon, Jun 29, 2015 at 11:04 AM, Ravikant Dindokar <[email protected]> wrote:
> By custom key, did you mean some custom class object? Then no.
>
> I have two map methods, each taking a different file as input, and both map
> methods emit a *LongWritable* key. But in the stdout of the container file
> I can see,
>
> key & value separated by ':'
>
> ./container_1435326857837_0036_01_000102/stdout:Reduce:*391*:-1#11
> ./container_1435326857837_0036_01_000102/stdout:Reduce:*391*:3278620528725786624:5352454#-1
>
> for key 391 the reducer is called twice: once for the value from the first map
> and once for the value from the other map.
>
> In the map method I parse the string from the input file as a long and then
> emit it as a LongWritable.
>
> Is there something I am missing when I use MultipleInputs
> (org.apache.hadoop.mapreduce.lib.input.MultipleInputs)?
>
> Thanks
> Ravikant
>
> On Mon, Jun 29, 2015 at 9:22 AM, Harshit Mathur <[email protected]>
> wrote:
>
>> As per MapReduce, it is not possible that two different reducers will
>> get the same key.
>> Have you created some custom key type? If that is the case, then
>> there may be an issue with the comparator.
>>
>> On Mon, Jun 29, 2015 at 12:40 AM, Ravikant Dindokar <[email protected]> wrote:
>>
>>> Hi Hadoop user,
>>>
>>> I have two map classes processing two different input files. Both map
>>> functions emit the same key/value format.
>>>
>>> But the reducer is called twice for the same key: once for the value from
>>> the first map and once for the value from the other map.
>>>
>>> I am printing (key, value) pairs in the reducer:
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:391:-1#11
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:391:3278620528725786624:5352454#-1
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:591:3278620528725852160:4194699#-1
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:591:-1#13
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:2391:-1#19
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:2391:3278620528725917696:5283986#-1
>>>
>>> ./container_1435326857837_0036_01_000102/stdout:Reduce:3291:3278620528725983232:4973087#-1
>>>
>>> Both maps emit a LongWritable key and a Text value.
>>>
>>>
>>> Any idea why this is happening?
>>> Is there any way to get the hash values generated by Hadoop for the keys
>>> emitted by the mapper?
>>>
>>> Thanks
>>> Ravikant
>>>
>>
>>
>>
>> --
>> Harshit Mathur
>>
>
>
package in.dream_lab.hadoopPipeline.cc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* Job ID : 2
* Job Name : VP_AP_to_PAL
* Job Description: Concatenate partition Id with vertex adjacency list
* Map Input File: VP, EL
* Map Input Format :V_id, [P_id] V_src, [<E_id,V_sink>+]
* Map Emit :V_id, [-1, P_id] V_src, [V_sink, -1]
* Reducer Emit: V_id, P_id, <E_id, V_sink>+
* Reducer Output File :PAL
* Note :Separator between P_id, <E_id, V_sink>+ is "#"
*
*/
public class ALFileMapper extends Mapper<Object, Text, LongWritable, Text> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// Input line: V_src <E_id,V_sink>+ (whitespace separated)
String line = value.toString();
String[] strs = line.trim().split("\\s+");
long vertexId = Long.parseLong(strs[0]);
// Emit adjacency list with the partition still unknown: "EL#-1"
String mapValue = strs[1] + "#-1";
System.out.println("AL: " + vertexId + ":" + mapValue);
context.write(new LongWritable(vertexId), new Text(mapValue));
}
}

package in.dream_lab.hadoopPipeline.cc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* Job ID : 2
* Job Name : VP_AP_to_PAL
* Job Description: Concatenate partition Id with vertex adjacency list
* Map Input File: VP, EL
* Map Input Format :V_id, [P_id] V_src, [<E_id,V_sink>+]
* Map Emit :V_id, [-1, P_id] V_src, [V_sink, -1]
* Reducer Emit: V_id, P_id, <E_id, V_sink>+
* Reducer Output File :PAL
* Note :Separator between P_id, <E_id, V_sink>+ is "#"
*
*/
public class PartitionFileMapper extends Mapper<Object, Text, LongWritable, Text> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// Input line: V_id P_id (whitespace separated)
String line = value.toString();
String[] strs = line.trim().split("\\s+");
long vertexId = Long.parseLong(strs[0]);
long partitionId = Long.parseLong(strs[1]);
// Emit partition id with the adjacency list still unknown: "-1#P_id"
String mapValue = "-1#" + partitionId;
System.out.println("PART: " + strs[0] + ":" + strs[1] + ":" + vertexId + ":" + mapValue);
context.write(new LongWritable(vertexId), new Text(mapValue));
}
}
package in.dream_lab.hadoopPipeline.cc;
import in.dream_lab.hadoopPipeline.cc.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class PartitionInputReader {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
/* Job job1 = Job.getInstance(conf, "conf1");
job1.setNumReduceTasks(100);
job1.setJarByClass(MakeGraphUndirectedMapper.class);
job1.setMapperClass(MakeGraphUndirectedMapper.class);
job1.setReducerClass(MakeGraphUndirectedReducer.class);
job1.setOutputKeyClass(LongWritable.class);
job1.setOutputValueClass(LongWritable.class);
job1.setMapOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
System.exit(job1.waitForCompletion(true) ? 0 : 1);*/
Job job2 = Job.getInstance(conf, "conf2");
job2.setNumReduceTasks(100);
job2.setJarByClass(PartitionFileMapper.class);
// args[0]: VP (partition) file, args[1]: EL (adjacency list) file, args[2]: output directory
MultipleInputs.addInputPath(job2, new Path(args[0]), TextInputFormat.class, PartitionFileMapper.class);
MultipleInputs.addInputPath(job2, new Path(args[1]), TextInputFormat.class, ALFileMapper.class);
job2.setMapOutputKeyClass(LongWritable.class);
job2.setMapOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
job2.setReducerClass(PALReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}
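
Regarding the earlier question about getting the hash values Hadoop generates for mapper keys: unless a custom partitioner is set, the reduce task a (key, value) pair goes to is (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, computed by the default HashPartitioner. A small standalone sketch that reproduces this calculation for a given key (the class name KeyPartitionProbe and the sample key/value are only for illustration):

package in.dream_lab.hadoopPipeline.cc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/*
 * Sketch only: prints which reduce partition a mapper key would be sent to
 * under the default HashPartitioner, i.e.
 * (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
 */
public class KeyPartitionProbe {
    public static void main(String[] args) {
        int numReduceTasks = 100; // same as job2.setNumReduceTasks(100)
        HashPartitioner<LongWritable, Text> partitioner = new HashPartitioner<LongWritable, Text>();
        LongWritable key = new LongWritable(391L);          // sample key from the stdout above
        Text value = new Text("-1#11");                     // value is ignored by HashPartitioner
        int partition = partitioner.getPartition(key, value, numReduceTasks);
        System.out.println("key " + key + " -> reduce partition " + partition);
    }
}

Since equal LongWritable keys produce equal hash codes, both map outputs for key 391 should at least land on the same reduce task.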