Hello,
Does anybody have a working example of DBOutputFormat that connects to
the database server (MySQL) and then writes a record to a table?
I tried following the instructions at
"http://www.cloudera.com/blog/2009/03/database-access-with-hadoop/", as
shown below, but I was getting an IOException.
It would be great if anyone could send me an example for Hadoop 0.20.2; the
one below is for an earlier version.
<!-- Runner Class -->
public class EmployeeDBRunner {
public static void main(String[] args) {
Configuration configuration = new Configuration();
JobConf jobConf = new JobConf(configuration,
EmployeeDBRunner.class);
DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/mydatabase","myuser", "mypass");
String[] fields = { "employee_id", "name" };
DBOutputFormat.setOutput(jobConf, "employees", fields);
JobConf conf = new JobConf(EmployeeDBRunner.class);
conf.setJobName("Employee");
FileInputFormat.addInputPath(conf, new Path(args[0])); //set
input as file
conf.setMapperClass(TokenMapper.class);
conf.setReducerClass(DBReducer.class);
conf.setOutputFormat(DBOutputFormat.class); //set output as
DBOF to output data to a table.
// <Text, IntWritable>
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(IntWritable.class);
// <MyRecord,NullWritable>
conf.setOutputKeyClass(MyRecord.class);
conf.setOutputValueClass(NullWritable.class);
try {
JobClient.runJob(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
}
<!-- Mapper -->
public class TokenMapper extends MapReduceBase implements
Mapper<Object, Text, Text, IntWritable> {
IntWritable single = new IntWritable(1);
public void map(Object arg0, Text line,
OutputCollector<Text, IntWritable> collector, Reporter arg3)
throws IOException {
StringTokenizer stk = new StringTokenizer(line.toString());
while (stk.hasMoreTokens()) {
Text token = new Text(stk.nextToken());
collector.collect(token, single);
}
}
}
<!-- Reducer class-->
public class DBReducer extends MapReduceBase implements
org.apache.hadoop.mapred.Reducer<Text, IntWritable,
MyRecord,NullWritable> {
NullWritable n = NullWritable.get();
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<MyRecord,NullWritable> output, Reporter
reporter)
throws IOException {
long sum = 0;
for (; values.hasNext();) {
values.next();
sum++;
}
MyRecord mRecord = new MyRecord(sum, key.toString());
System.out.println(mRecord.getName());
output.collect(mRecord,n);
}
}