Hello,

Does anybody have a working example of DBOutputFormat that connects to the DB server (MySQL) and then writes a record to a table?

I tried following the instructions at "http://www.cloudera.com/blog/2009/03/database-access-with-hadoop/" as shown below, but I was getting an IOException.

It would be great if anyone could send me an example for Hadoop 0.20.2. The one below is for an earlier version.

<!-- Runner Class -->

/**
 * Job driver: word-counts a text file and writes the (count, word) results
 * into the MySQL table {@code employees} via {@link DBOutputFormat}.
 *
 * BUG FIX: the original code configured the database connection and output
 * table on one {@code JobConf} ({@code jobConf}) but then built and submitted
 * a SECOND, freshly-created {@code JobConf} ({@code conf}). The submitted job
 * therefore carried no JDBC settings at all — the likely cause of the
 * reported IOException. All configuration now goes onto the single JobConf
 * that is actually submitted.
 */
public class EmployeeDBRunner {
    public static void main(String[] args) {
        // One JobConf, configured end-to-end and then submitted.
        JobConf conf = new JobConf(new Configuration(), EmployeeDBRunner.class);
        conf.setJobName("Employee");

        // JDBC driver + connection info must be set on the SAME JobConf
        // that is passed to JobClient.runJob().
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/mydatabase", "myuser", "mypass");

        // Target table and the columns each emitted MyRecord maps to.
        String[] fields = { "employee_id", "name" };
        DBOutputFormat.setOutput(conf, "employees", fields);

        FileInputFormat.addInputPath(conf, new Path(args[0])); // set input as file
        conf.setMapperClass(TokenMapper.class);
        conf.setReducerClass(DBReducer.class);
        conf.setOutputFormat(DBOutputFormat.class); // set output as DBOF to output data to a table.

        // Intermediate (map-side) key/value types: <Text, IntWritable>
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        // Final (reduce-side) key/value types: <MyRecord, NullWritable>
        conf.setOutputKeyClass(MyRecord.class);
        conf.setOutputValueClass(NullWritable.class);

        try {
            JobClient.runJob(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

<!-- Mapper -->
/**
 * Mapper: splits each input line on whitespace and emits
 * {@code (word, 1)} for every token found.
 */
public class TokenMapper extends MapReduceBase implements
        Mapper<Object, Text, Text, IntWritable> {
    // Reused constant "1" emitted once per token.
    IntWritable single = new IntWritable(1);

    public void map(Object arg0, Text line,
            OutputCollector<Text, IntWritable> collector, Reporter arg3)
            throws IOException {
        // Whitespace-tokenize the line and collect each word with a count of 1.
        StringTokenizer tokenizer = new StringTokenizer(line.toString());
        for (; tokenizer.hasMoreTokens(); ) {
            String word = tokenizer.nextToken();
            collector.collect(new Text(word), single);
        }
    }
}

<!-- Reducer class-->
/**
 * Reducer: totals the counts for each word and emits a {@code MyRecord}
 * (count, word) keyed row for DBOutputFormat to insert.
 *
 * BUG FIX: the original loop counted the NUMBER of values ({@code sum++})
 * instead of summing their contents. That only coincidentally works when
 * every map output is exactly 1 and no combiner runs; configuring a combiner
 * would silently produce wrong totals. We now accumulate
 * {@code values.next().get()}. The stray {@code System.out.println} debug
 * line was also removed from the hot path.
 */
public class DBReducer extends MapReduceBase implements
org.apache.hadoop.mapred.Reducer<Text, IntWritable, MyRecord,NullWritable> {
    NullWritable n = NullWritable.get();
    public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<MyRecord,NullWritable> output, Reporter reporter)
            throws IOException {
        long sum = 0;
        // Sum the partial counts rather than merely counting them,
        // so the job stays correct if a combiner is ever configured.
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(new MyRecord(sum, key.toString()), n);
    }
}


Reply via email to