Hi Hadoop users,

I have a graph data file in the form of an edge list:

<Source Vertex_id> <Sink Vertex_id>

I want to assign each edge a unique ID. In the map function I emit
(key, value) pairs as (<Source Vertex_id>, <Sink Vertex_id>).

In the reducer, for each value, I generate a unique ID from a combination of
a static count variable and the task ID
(context.getTaskAttemptID().getTaskID().getId()):

edgeId = (localcount << 16) | (taskId << 55);

With this I am able to generate unique IDs.
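For concreteness, here is a minimal standalone sketch of that bit layout and of
how the two fields can be unpacked again (the bounds, taskId < 256 and
localcount < 2^39 so the fields never overlap, are assumptions on my part and
are not enforced anywhere in the code):

public class EdgeIdLayoutDemo {

    // Pack the per-task counter and the reduce task ID into one long,
    // exactly as in the reducer below.
    static long encode(long localcount, long taskId) {
        return (localcount << 16) | (taskId << 55);
    }

    // Recover the task ID (bits 55..62, assuming taskId < 256).
    static long taskIdOf(long edgeId) {
        return (edgeId >>> 55) & 0xFFL;
    }

    // Recover the counter (bits 16..54, assuming localcount < 2^39).
    static long localCountOf(long edgeId) {
        return (edgeId >>> 16) & ((1L << 39) - 1);
    }

    public static void main(String[] args) {
        long id = encode(42L, 3L);
        System.out.println(taskIdOf(id));     // prints 3
        System.out.println(localCountOf(id)); // prints 42
    }
}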
My question is: if a reducer fails, will this still work? What exactly happens
when a reducer fails and is re-executed?

Please find attached the source code for the mapper and reducer.

Thanks,
Ravikant
package in.dream_lab.hadoopPipeline.cc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
 * Job ID : 1
 * Job Name : EL_to_UEL
 * Job Description : Convert directed to undirected graph
 * Map Input File : EL (SNAP file)
 * Map Input Format : V_src, V_sink
 * Map Emit : V_src, [V_sink] && V_sink, [V_src]
 * Reducer Emit : V_src, V_sink && V_sink, V_src
 * Reducer Output File : UEL
 * Note : Remove duplicates
 */
public class MakeGraphUndirectedMapper
        extends Mapper<Object, Text, LongWritable, LongWritable> {

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Skip SNAP comment lines that start with '#'.
        if (!line.startsWith("#")) {
            String[] strs = line.trim().split("\\s+");
            long sinkId = Long.parseLong(strs[1]);
            long sourceId = Long.parseLong(strs[0]);
            // Emit the edge in its original direction ...
            context.write(new LongWritable(sourceId), new LongWritable(sinkId));
            /*
             * ... and in the reverse direction.
             * Comment out the following line to keep the graph directed.
             */
            context.write(new LongWritable(sinkId), new LongWritable(sourceId));
        }
    }
}
package in.dream_lab.hadoopPipeline.cc;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
 * Job ID : 1
 * Job Name : EL_to_UEL
 * Job Description : Convert directed to undirected graph
 * Map Input File : EL (SNAP file)
 * Map Input Format : V_src, V_sink
 * Map Emit : V_src, [V_sink] && V_sink, [V_src]
 * Reducer Emit : V_src, V_sink && V_sink, V_src
 * Reducer Output File : UEL
 * Note : Remove duplicates
 * EdgeId : first 8 bits : partitionId, next 40 bits : local count, last 16 bits : reserved for future use (instance data)
 */
public class MakeGraphUndirectedReducer
        extends Reducer<LongWritable, LongWritable, LongWritable, Text> {

    // Shared counter; static, so it lives per reducer-task JVM and is
    // incremented across all reduce() calls of that task.
    public static long localcount = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        /*
         * Commented-out experiment: derive an ID from the job environment.
         *
         * final String jobId = System.getenv("mapred_job_id");
         * String[] strs = jobId.split("_");
         * String reducerStr = strs[strs.length - 1];
         * long reducerId = Long.parseLong(reducerStr);
         * // System.out.println("TEST : TASKId :" + context.getTaskAttemptID().getTaskID().getId()
         * //         + " reducerID :" + reducerStr);
         */
        LongWritable sourceId = key;

        // Collect the adjacency list in a set to drop duplicate edges.
        Set<Long> adjlist = new HashSet<Long>();
        for (LongWritable v : values) {
            adjlist.add(v.get());
        }

        long taskId = context.getTaskAttemptID().getTaskID().getId();
        long edgeId;
        String edgeSinkPair = "";
        StringBuilder sb = new StringBuilder();
        String prefix = "";
        for (Long sinkId : adjlist) {
            localcount++;
            // Pack the per-task counter and the task ID into one 64-bit edge ID.
            edgeId = (localcount << 16) | (taskId << 55);
            sb.append(prefix);
            prefix = ",";
            edgeSinkPair = edgeId + ":" + sinkId;
            sb.append(edgeSinkPair);
        }
        context.write(sourceId, new Text(sb.toString()));
    }
}
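For reference, a minimal driver sketch that would wire these two classes into a
job (the driver class name, job name, and the args[0]/args[1] paths are
illustrative assumptions on my part, not part of the attachment):

package in.dream_lab.hadoopPipeline.cc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MakeGraphUndirectedDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "EL_to_UEL");
        job.setJarByClass(MakeGraphUndirectedDriver.class);

        job.setMapperClass(MakeGraphUndirectedMapper.class);
        job.setReducerClass(MakeGraphUndirectedReducer.class);

        // Mapper emits (LongWritable, LongWritable); reducer emits (LongWritable, Text).
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // EL input
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // UEL output

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}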