Hi Hadoop user,

I have a graph data file in the form of an edge list:
<Source Vertex_id> <Sink Vertex_id>

I want to assign each edge a unique ID. In the map function I emit
(key, value) pairs as (<Source Vertex_id>, <Sink Vertex_id>).

In the reducer, for each value, I use a combination of a static count
variable and the task id (context.getTaskAttemptID().getTaskID().getId()) to
generate a unique ID:

edgeId = (localcount << 16) | (taskId << 55);

I am able to generate unique IDs.
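
For example, with made-up numbers (just a sketch, assuming taskId fits in 8
bits and localcount stays below 2^39 so the bit ranges never overlap):

public class EdgeIdSketch {
	public static void main(String[] args) {
		long taskId = 3L;       // example reduce task index
		long localcount = 42L;  // example running count inside that reducer
		long edgeId = (localcount << 16) | (taskId << 55);
		// 42 << 16 = 2752512 and 3 << 55 = 108086391056891904; the bit
		// ranges do not overlap, so OR-ing them simply combines the two.
		System.out.println(edgeId); // prints 108086391059644416
	}
}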

My question is: if a reducer fails, will this still work?

What exactly happens when a reducer fails and its task is re-executed?
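
(For reference, here is a small sketch, not part of the attached code, of the
two IDs involved. My understanding is that the attempt number changes on each
retry while the task number identifies the reduce partition and stays the
same, and that is exactly what I want to confirm.)

import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class TaskIdSketch {
	// Hypothetical helper, not in the attached code: shows which parts of
	// the attempt ID the reducer's edge numbering depends on.
	static long edgeIdPrefix(TaskAttemptContext context) {
		TaskAttemptID attemptId = context.getTaskAttemptID();
		int attemptNumber = attemptId.getId();          // 0, 1, 2, ... per attempt
		int taskNumber = attemptId.getTaskID().getId(); // reduce task (partition) index
		System.out.println("task " + taskNumber + ", attempt " + attemptNumber);
		return ((long) taskNumber) << 55;               // high bits of every edge ID
	}
}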

Please find attached the source code for the mapper & reducer.

Thanks
Ravikant
package in.dream_lab.hadoopPipeline.cc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;


/*
 * Job ID : 1
 * Job Name : EL_to_UEL
 * Job Description: Convert directed to undirected graph
 * Map Input File: EL (SNAP file)
 * Map Input Format :V_src, V_sink
 * Map Emit :V_src, [V_sink]  && V_sink, [V_src]
 * Reducer Emit: V_src, V_sink && V_sink, V_src
 * Reducer Output File :UEL
 * Note :Remove duplicates
 * 
 */

public class MakeGraphUndirectedMapper extends Mapper<Object, Text, LongWritable, LongWritable> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {

		String line = value.toString();
		// Skip SNAP comment lines.
		if (!line.startsWith("#")) {
			String[] strs = line.trim().split("\\s+");
			long sourceId = Long.parseLong(strs[0]);
			long sinkId = Long.parseLong(strs[1]);

			// Emit the edge in both directions so the graph becomes undirected.
			context.write(new LongWritable(sourceId), new LongWritable(sinkId));
			/*
			 * Comment out the following line to keep the graph directed.
			 */
			context.write(new LongWritable(sinkId), new LongWritable(sourceId));
		}
	}

}
package in.dream_lab.hadoopPipeline.cc;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.JobID;
/*
 * Job ID : 1
 * Job Name : EL_to_UEL
 * Job Description: Convert directed to undirected graph
 * Map Input File: EL (SNAP file)
 * Map Input Format :V_src, V_sink
 * Map Emit :V_src, [V_sink]  && V_sink, [V_src]
 * Reducer Emit: V_src, V_sink && V_sink, V_src
 * Reducer Output File :UEL
 * Note :Remove duplicates
 * EdgeId : first  8 bits : partitionId, next  40bits : local count , last 16 bits : left for future use(instance data)
 */

public class MakeGraphUndirectedReducer extends Reducer<LongWritable, LongWritable, LongWritable, Text> {

	// Running count, static so it is shared across all reduce() calls in this task's JVM.
	public static long localcount = 0L;

	@Override
	protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {

		/*
		 * Earlier attempt at deriving a reducer id from the environment:
		 *
		 * final String jobId = System.getenv("mapred_job_id");
		 * String[] strs = jobId.split("_");
		 * String reducerStr = strs[strs.length - 1];
		 * long reducerId = Long.parseLong(reducerStr);
		 * // System.out.println("TEST : TASKId :" + context.getTaskAttemptID().getTaskID().getId()
		 * //     + " reducerID :" + reducerStr);
		 */

		LongWritable sourceId = key;

		// Collect the sink ids into a set to remove duplicate edges.
		Set<Long> adjlist = new HashSet<Long>();
		for (LongWritable v : values) {
			adjlist.add(v.get());
		}

		// Task id of this reduce task (not the attempt id).
		long taskId = context.getTaskAttemptID().getTaskID().getId();

		long edgeId;
		String edgeSinkPair = "";
		StringBuilder sb = new StringBuilder();
		String prefix = "";

		for (Long sinkId : adjlist) {

			localcount++;

			// Pack the task id into the high bits and the local count above the low 16 bits.
			edgeId = (localcount << 16) | (taskId << 55);

			sb.append(prefix);
			prefix = ",";

			edgeSinkPair = edgeId + ":" + sinkId;
			sb.append(edgeSinkPair);
		}

		context.write(sourceId, new Text(sb.toString()));
	}

}
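
For completeness, here is a minimal driver sketch showing how the two classes
above could be wired into a job (this is not part of the attachment; the class
name and the use of command-line arguments for the input/output paths are
placeholders):

package in.dream_lab.hadoopPipeline.cc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MakeGraphUndirectedDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "EL_to_UEL");
		job.setJarByClass(MakeGraphUndirectedDriver.class);
		job.setMapperClass(MakeGraphUndirectedMapper.class);
		job.setReducerClass(MakeGraphUndirectedReducer.class);
		// Map output types differ from the final (reducer) output types.
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(LongWritable.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));   // EL input
		FileOutputFormat.setOutputPath(job, new Path(args[1])); // UEL output
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}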
