Please try this: for (DoubleArrayWritable avalue : values) { Writable[] value = avalue.get(); // DoubleWritable[] value = new DoubleWritable[6]; // for(int k=0;k<6;k++){ // value[k] = DoubleWritable(wvalue[k]); // } //parse accordingly if (Double.parseDouble(value[1].toString()) != 0) { total_records_Temp = total_records_Temp + 1; sumvalueTemp = sumvalueTemp + Double.parseDouble(value[0].toString()); } if (Double.parseDouble(value[3].toString()) != 0) { total_records_Dewpoint = total_records_Dewpoint + 1; sumvalueDewpoint = sumvalueDewpoint + Double.parseDouble(value[2].toString()); } if (Double.parseDouble(value[5].toString()) != 0) { total_records_Windspeed = total_records_Windspeed + 1; sumvalueWindspeed = sumvalueWindspeed + Double.parseDouble(value[4].toString()); } } I am attaching the full code below.
-- *Thanks & Regards * *Unmesha Sreeveni U.B* *Hadoop, Bigdata Developer* *Centre for Cyber Security | Amrita Vishwa Vidyapeetham* http://www.unmeshasreeveni.blogspot.in/
//cc MaxTemperature Application to find the maximum temperature in the weather dataset //vv MaxTemperature import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.conf.Configuration; public class MapReduce { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err .println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } /* * Job job = new Job(); job.setJarByClass(MaxTemperature.class); * job.setJobName("Max temperature"); */ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf, "AverageTempValues"); /* * Deleting output directory for reuseing same dir */ Path dest = new Path(args[1]); if(fs.exists(dest)){ fs.delete(dest, true); } FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setNumReduceTasks(2); job.setMapperClass(NewMapper.class); job.setReducerClass(NewReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleArrayWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } } // ^^ MaxTemperature
// cc MaxTemperatureMapper Mapper for maximum temperature example // vv MaxTemperatureMapper import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class NewMapper extends Mapper<LongWritable, Text, Text, DoubleArrayWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String Str = value.toString(); String[] Mylist = new String[1000]; int i = 0; for (String retval : Str.split("\\s+")) { System.out.println(retval); Mylist[i++] = retval; } String Val = Mylist[2]; String Year = Val.substring(0, 4); String Month = Val.substring(5, 6); String[] Section = Val.split("_"); String section_string = "0"; if (Section[1].matches("^(0|1|2|3|4|5)$")) { section_string = "4"; } else if (Section[1].matches("^(6|7|8|9|10|11)$")) { section_string = "1"; } else if (Section[1].matches("^(12|13|14|15|16|17)$")) { section_string = "2"; } else if (Section[1].matches("^(18|19|20|21|22|23)$")) { section_string = "3"; } DoubleWritable[] array = new DoubleWritable[6]; DoubleArrayWritable output = new DoubleArrayWritable(); array[0].set(Double.parseDouble(Mylist[3])); array[2].set(Double.parseDouble(Mylist[4])); array[4].set(Double.parseDouble(Mylist[12])); for (int j = 0; j < 6; j = j + 2) { if (999.9 == array[j].get()) { array[j + 1].set(0); } else { array[j + 1].set(1); } } output.set(array); context.write(new Text(Year + section_string + Month), output); } }
//cc MaxTemperatureReducer Reducer for maximum temperature example //vv MaxTemperatureReducer import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class NewReducer extends Reducer<Text, DoubleArrayWritable, Text, DoubleArrayWritable> { @Override public void reduce(Text key, Iterable<DoubleArrayWritable> values, Context context) throws IOException, InterruptedException { double sumvalueTemp = 0; double sumvalueDewpoint = 0; double sumvalueWindspeed = 0; double total_records_Temp = 0; double total_records_Dewpoint = 0; double total_records_Windspeed = 0; double average_Temp = Integer.MIN_VALUE; double average_Dewpoint = Integer.MIN_VALUE; double average_Windspeed = Integer.MIN_VALUE; DoubleWritable[] temp = new DoubleWritable[3]; DoubleArrayWritable output = new DoubleArrayWritable(); for (DoubleArrayWritable avalue : values) { Writable[] value = avalue.get(); // DoubleWritable[] value = new DoubleWritable[6]; // for(int k=0;k<6;k++){ // value[k] = DoubleWritable(wvalue[k]); // } //parse accordingly if (Double.parseDouble(value[1].toString()) != 0) { total_records_Temp = total_records_Temp + 1; sumvalueTemp = sumvalueTemp + Double.parseDouble(value[0].toString()); } if (Double.parseDouble(value[3].toString()) != 0) { total_records_Dewpoint = total_records_Dewpoint + 1; sumvalueDewpoint = sumvalueDewpoint + Double.parseDouble(value[2].toString()); } if (Double.parseDouble(value[5].toString()) != 0) { total_records_Windspeed = total_records_Windspeed + 1; sumvalueWindspeed = sumvalueWindspeed + Double.parseDouble(value[4].toString()); } } /* * if (value[1].get()!=0) { total_records_Temp = total_records_Temp + 1; * sumvalueTemp = sumvalueTemp + value[0].get(); } if * (value[3].get()!=0) { total_records_Dewpoint = total_records_Dewpoint * + 1; sumvalueDewpoint = sumvalueDewpoint + value[2].get(); } if * (value[5].get()!=0) { 
total_records_Windspeed = * total_records_Windspeed + 1; sumvalueWindspeed = sumvalueWindspeed + * value[4].get(); } } */ average_Temp = sumvalueTemp / total_records_Temp; average_Dewpoint = sumvalueDewpoint / total_records_Dewpoint; average_Windspeed = sumvalueWindspeed / total_records_Windspeed; temp[0].set(average_Temp); temp[1].set(average_Dewpoint); temp[2].set(average_Windspeed); output.set(temp); context.write(key, output); } }
--------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@hadoop.apache.org For additional commands, e-mail: user-h...@hadoop.apache.org