Please try this:
for (DoubleArrayWritable avalue : values) {
Writable[] value = avalue.get();
// DoubleWritable[] value = new DoubleWritable[6];
// for(int k=0;k<6;k++){
// value[k] = DoubleWritable(wvalue[k]);
// }
//parse accordingly
if (Double.parseDouble(value[1].toString()) != 0) {
total_records_Temp = total_records_Temp + 1;
sumvalueTemp = sumvalueTemp + Double.parseDouble(value[0].toString());
}
if (Double.parseDouble(value[3].toString()) != 0) {
total_records_Dewpoint = total_records_Dewpoint + 1;
sumvalueDewpoint = sumvalueDewpoint +
Double.parseDouble(value[2].toString());
}
if (Double.parseDouble(value[5].toString()) != 0) {
total_records_Windspeed = total_records_Windspeed + 1;
sumvalueWindspeed = sumvalueWindspeed +
Double.parseDouble(value[4].toString());
}
}
Attaching the code.


-- 
*Thanks & Regards *


*Unmesha Sreeveni U.B*
*Hadoop, Bigdata Developer*
*Centre for Cyber Security | Amrita Vishwa Vidyapeetham*
http://www.unmeshasreeveni.blogspot.in/
//cc MaxTemperature Application to find the maximum temperature in the weather dataset
//vv MaxTemperature
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;

/**
 * Driver for the "AverageTempValues" MapReduce job.
 *
 * <p>Reads weather records from the input path (args[0]), runs
 * {@link NewMapper} / {@link NewReducer}, and writes per-key averages to the
 * output path (args[1]). Any pre-existing output directory is deleted so the
 * job can be re-run with the same arguments.
 */
public class MapReduce {

	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			// Fixed: message previously said "MaxTemperature", which is not
			// this driver's class name.
			System.err.println("Usage: MapReduce <input path> <output path>");
			System.exit(-1);
		}

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Job job = Job.getInstance(conf, "AverageTempValues");
		// Required so the job jar containing mapper/reducer classes is
		// shipped to the cluster.
		job.setJarByClass(MapReduce.class);

		// Delete the output directory if it exists, so the same output path
		// can be reused across runs (Hadoop fails if it already exists).
		Path dest = new Path(args[1]);
		if (fs.exists(dest)) {
			fs.delete(dest, true);
		}
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setNumReduceTasks(2);

		job.setMapperClass(NewMapper.class);
		job.setReducerClass(NewReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleArrayWritable.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
// ^^ MaxTemperature
// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that parses one whitespace-delimited weather record per line.
 *
 * <p>Emits key = year + section-of-day code + month (section derived from the
 * hour field after the '_' in token 2), and value = a 6-element
 * {@link DoubleArrayWritable}: [temp, tempValid, dewpoint, dewpointValid,
 * windspeed, windspeedValid], where each *Valid flag is 0 when the reading is
 * the 999.9 missing-data sentinel, else 1.
 */
public class NewMapper extends
		Mapper<LongWritable, Text, Text, DoubleArrayWritable> {

	/** Sentinel used in the dataset for a missing reading. */
	private static final double MISSING = 999.9;

	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		// Split directly instead of copying into a fixed 1000-slot buffer,
		// which could overflow (ArrayIndexOutOfBoundsException) on long lines.
		String[] fields = value.toString().split("\\s+");

		// Token 2 looks like "<yyyyMMdd...>_<hour>"; Year/Month are taken by
		// fixed offsets as in the original record layout.
		// NOTE(review): substring(5, 6) yields a single character — confirm
		// against the input format that the month is really one digit here.
		String val = fields[2];
		String year = val.substring(0, 4);
		String month = val.substring(5, 6);
		String[] section = val.split("_");

		// Map the hour (0-23) to a quarter-of-day code; unknown hours fall
		// through to "0".
		String sectionString = "0";
		if (section[1].matches("^(0|1|2|3|4|5)$")) {
			sectionString = "4";
		} else if (section[1].matches("^(6|7|8|9|10|11)$")) {
			sectionString = "1";
		} else if (section[1].matches("^(12|13|14|15|16|17)$")) {
			sectionString = "2";
		} else if (section[1].matches("^(18|19|20|21|22|23)$")) {
			sectionString = "3";
		}

		// BUG FIX: "new DoubleWritable[6]" creates an array of nulls, so the
		// original array[0].set(...) threw NullPointerException. Each slot
		// must be populated with a DoubleWritable instance first.
		DoubleWritable[] array = new DoubleWritable[6];
		for (int j = 0; j < array.length; j++) {
			array[j] = new DoubleWritable();
		}
		array[0].set(Double.parseDouble(fields[3]));   // temperature
		array[2].set(Double.parseDouble(fields[4]));   // dewpoint
		array[4].set(Double.parseDouble(fields[12]));  // windspeed

		// Slot j+1 flags whether reading j is valid (1) or missing (0).
		for (int j = 0; j < 6; j = j + 2) {
			array[j + 1].set(MISSING == array[j].get() ? 0 : 1);
		}

		DoubleArrayWritable output = new DoubleArrayWritable();
		output.set(array);
		context.write(new Text(year + sectionString + month), output);
	}
}
//cc MaxTemperatureReducer Reducer for maximum temperature example
//vv MaxTemperatureReducer
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that averages the valid readings emitted by {@link NewMapper}.
 *
 * <p>Each input value is a 6-element array [temp, tempValid, dewpoint,
 * dewpointValid, windspeed, windspeedValid]; readings whose validity flag is 0
 * are excluded from the average. Emits a 3-element array
 * [avgTemp, avgDewpoint, avgWindspeed] per key; a metric with no valid
 * readings keeps the Integer.MIN_VALUE sentinel.
 */
public class NewReducer extends
		Reducer<Text, DoubleArrayWritable, Text, DoubleArrayWritable> {

	@Override
	public void reduce(Text key, Iterable<DoubleArrayWritable> values,
			Context context) throws IOException, InterruptedException {
		double sumTemp = 0;
		double sumDewpoint = 0;
		double sumWindspeed = 0;
		double countTemp = 0;
		double countDewpoint = 0;
		double countWindspeed = 0;
		// Sentinels reported when a metric has zero valid readings.
		double avgTemp = Integer.MIN_VALUE;
		double avgDewpoint = Integer.MIN_VALUE;
		double avgWindspeed = Integer.MIN_VALUE;

		for (DoubleArrayWritable avalue : values) {
			Writable[] value = avalue.get();
			// Odd slots are validity flags; only sum readings flagged valid.
			if (Double.parseDouble(value[1].toString()) != 0) {
				countTemp++;
				sumTemp += Double.parseDouble(value[0].toString());
			}
			if (Double.parseDouble(value[3].toString()) != 0) {
				countDewpoint++;
				sumDewpoint += Double.parseDouble(value[2].toString());
			}
			if (Double.parseDouble(value[5].toString()) != 0) {
				countWindspeed++;
				sumWindspeed += Double.parseDouble(value[4].toString());
			}
		}

		// BUG FIX: the original divided unconditionally, producing NaN when a
		// count was 0 and making the MIN_VALUE sentinel initialization dead
		// code. Guard each division so the sentinel survives.
		if (countTemp > 0) {
			avgTemp = sumTemp / countTemp;
		}
		if (countDewpoint > 0) {
			avgDewpoint = sumDewpoint / countDewpoint;
		}
		if (countWindspeed > 0) {
			avgWindspeed = sumWindspeed / countWindspeed;
		}

		// BUG FIX: "new DoubleWritable[3]" creates an array of nulls, so the
		// original temp[0].set(...) threw NullPointerException. Construct the
		// elements before setting them.
		DoubleWritable[] temp = new DoubleWritable[] {
				new DoubleWritable(avgTemp),
				new DoubleWritable(avgDewpoint),
				new DoubleWritable(avgWindspeed)
		};

		DoubleArrayWritable output = new DoubleArrayWritable();
		output.set(temp);
		context.write(key, output);
	}

}
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@hadoop.apache.org
For additional commands, e-mail: user-h...@hadoop.apache.org

Reply via email to