Please try the following; the full code is attached below:
for (DoubleArrayWritable avalue : values) {
Writable[] value = avalue.get();
// DoubleWritable[] value = new DoubleWritable[6];
// for(int k=0;k<6;k++){
// value[k] = DoubleWritable(wvalue[k]);
// }
//parse accordingly
if (Double.parseDouble(value[1].toString()) != 0) {
total_records_Temp = total_records_Temp + 1;
sumvalueTemp = sumvalueTemp + Double.parseDouble(value[0].toString());
}
if (Double.parseDouble(value[3].toString()) != 0) {
total_records_Dewpoint = total_records_Dewpoint + 1;
sumvalueDewpoint = sumvalueDewpoint +
Double.parseDouble(value[2].toString());
}
if (Double.parseDouble(value[5].toString()) != 0) {
total_records_Windspeed = total_records_Windspeed + 1;
sumvalueWindspeed = sumvalueWindspeed +
Double.parseDouble(value[4].toString());
}
}
I am attaching the complete code below.
--
*Thanks & Regards *
*Unmesha Sreeveni U.B*
*Hadoop, Bigdata Developer*
*Centre for Cyber Security | Amrita Vishwa Vidyapeetham*
http://www.unmeshasreeveni.blogspot.in/
//cc MaxTemperature Application to find the maximum temperature in the weather dataset
//vv MaxTemperature
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
public class MapReduce {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
/*
* Job job = new Job(); job.setJarByClass(MaxTemperature.class);
* job.setJobName("Max temperature");
*/
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf, "AverageTempValues");
/*
* Deleting output directory for reuseing same dir
*/
Path dest = new Path(args[1]);
if(fs.exists(dest)){
fs.delete(dest, true);
}
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(2);
job.setMapperClass(NewMapper.class);
job.setReducerClass(NewReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleArrayWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// ^^ MaxTemperature
// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NewMapper extends
Mapper<LongWritable, Text, Text, DoubleArrayWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String Str = value.toString();
String[] Mylist = new String[1000];
int i = 0;
for (String retval : Str.split("\\s+")) {
System.out.println(retval);
Mylist[i++] = retval;
}
String Val = Mylist[2];
String Year = Val.substring(0, 4);
String Month = Val.substring(5, 6);
String[] Section = Val.split("_");
String section_string = "0";
if (Section[1].matches("^(0|1|2|3|4|5)$")) {
section_string = "4";
} else if (Section[1].matches("^(6|7|8|9|10|11)$")) {
section_string = "1";
} else if (Section[1].matches("^(12|13|14|15|16|17)$")) {
section_string = "2";
} else if (Section[1].matches("^(18|19|20|21|22|23)$")) {
section_string = "3";
}
DoubleWritable[] array = new DoubleWritable[6];
DoubleArrayWritable output = new DoubleArrayWritable();
array[0].set(Double.parseDouble(Mylist[3]));
array[2].set(Double.parseDouble(Mylist[4]));
array[4].set(Double.parseDouble(Mylist[12]));
for (int j = 0; j < 6; j = j + 2) {
if (999.9 == array[j].get()) {
array[j + 1].set(0);
} else {
array[j + 1].set(1);
}
}
output.set(array);
context.write(new Text(Year + section_string + Month), output);
}
}
//cc MaxTemperatureReducer Reducer for maximum temperature example
//vv MaxTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class NewReducer extends
Reducer<Text, DoubleArrayWritable, Text, DoubleArrayWritable> {
@Override
public void reduce(Text key, Iterable<DoubleArrayWritable> values,
Context context) throws IOException, InterruptedException {
double sumvalueTemp = 0;
double sumvalueDewpoint = 0;
double sumvalueWindspeed = 0;
double total_records_Temp = 0;
double total_records_Dewpoint = 0;
double total_records_Windspeed = 0;
double average_Temp = Integer.MIN_VALUE;
double average_Dewpoint = Integer.MIN_VALUE;
double average_Windspeed = Integer.MIN_VALUE;
DoubleWritable[] temp = new DoubleWritable[3];
DoubleArrayWritable output = new DoubleArrayWritable();
for (DoubleArrayWritable avalue : values) {
Writable[] value = avalue.get();
// DoubleWritable[] value = new DoubleWritable[6];
// for(int k=0;k<6;k++){
// value[k] = DoubleWritable(wvalue[k]);
// }
//parse accordingly
if (Double.parseDouble(value[1].toString()) != 0) {
total_records_Temp = total_records_Temp + 1;
sumvalueTemp = sumvalueTemp + Double.parseDouble(value[0].toString());
}
if (Double.parseDouble(value[3].toString()) != 0) {
total_records_Dewpoint = total_records_Dewpoint + 1;
sumvalueDewpoint = sumvalueDewpoint + Double.parseDouble(value[2].toString());
}
if (Double.parseDouble(value[5].toString()) != 0) {
total_records_Windspeed = total_records_Windspeed + 1;
sumvalueWindspeed = sumvalueWindspeed + Double.parseDouble(value[4].toString());
}
}
/*
* if (value[1].get()!=0) { total_records_Temp = total_records_Temp + 1;
* sumvalueTemp = sumvalueTemp + value[0].get(); } if
* (value[3].get()!=0) { total_records_Dewpoint = total_records_Dewpoint
* + 1; sumvalueDewpoint = sumvalueDewpoint + value[2].get(); } if
* (value[5].get()!=0) { total_records_Windspeed =
* total_records_Windspeed + 1; sumvalueWindspeed = sumvalueWindspeed +
* value[4].get(); } }
*/
average_Temp = sumvalueTemp / total_records_Temp;
average_Dewpoint = sumvalueDewpoint / total_records_Dewpoint;
average_Windspeed = sumvalueWindspeed / total_records_Windspeed;
temp[0].set(average_Temp);
temp[1].set(average_Dewpoint);
temp[2].set(average_Windspeed);
output.set(temp);
context.write(key, output);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]