HConnectionManager needed some modifications in order to make it work; it's not just a matter of backporting that job.
J-D On Wed, Apr 13, 2011 at 7:27 AM, Manuel de Ferran <[email protected]> wrote: > Greetings, > > I'm trying to backport CopyTable to HBase 0.20.6. > In other words, the challenge is to write a job that would copy data from > one HTable on cluster A to another HTable on cluster B. > > I'm able to copy HTable to another HTable on the same cluster, but I can not > find a way to point to the second cluster on the reduce phase. > > Here is the code : > > package org.myorg.sandbox; > > import java.io.IOException; > > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.conf.Configured; > import org.apache.hadoop.hbase.HBaseConfiguration; > import org.apache.hadoop.hbase.client.Put; > import org.apache.hadoop.hbase.client.Result; > import org.apache.hadoop.hbase.client.Scan; > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; > import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; > import org.apache.hadoop.hbase.mapreduce.TableMapper; > import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; > import org.apache.hadoop.hbase.util.Bytes; > import org.apache.hadoop.io.IntWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.io.Writable; > import org.apache.hadoop.mapreduce.Job; > import org.apache.hadoop.util.Tool; > import org.apache.hadoop.util.ToolRunner; > > public class CopyHTable extends Configured implements Tool { > static class CopyMapper > extends TableMapper<ImmutableBytesWritable, Put> { > @Override > public void map(ImmutableBytesWritable row, Result result, Context > context) > throws IOException { > > byte[] family = Bytes.toBytes("data"); > byte[] qual01 = Bytes.toBytes("year"); > byte[] qual02 = Bytes.toBytes("temperature"); > byte[] key = result.getRow(); > byte[] val01 = result.getValue(family, qual01); > byte[] val02 = result.getValue(family, qual02); > > Put put = new Put(key); > put.add(family, qual01, val01); > put.add(family, qual02, val02); > > try { > context.write(new 
ImmutableBytesWritable(row), put); > } catch (InterruptedException e) { > e.printStackTrace(); > } > } > } > > @Override > public int run(String[] args) throws Exception { > Configuration conf = new HBaseConfiguration(); > String tableName = "temp"; > > Job job = new Job(conf); > job.setJarByClass(CopyHTable.class); > > Scan s= new Scan(); > s.addFamily(Bytes.toBytes("data")); > > TableMapReduceUtil.initTableMapperJob(tableName, s, > CopyMapper.class, Text.class, > IntWritable.class, job); > > job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "temp"); > /* no luck with: > job.getConfiguration().set("hbase.zookeeper.quorum", "nodeb"); > */ > job.setOutputFormatClass(TableOutputFormat.class); > job.setOutputKeyClass(ImmutableBytesWritable.class); > job.setOutputValueClass(Writable.class); > > job.setNumReduceTasks(0); > > if (job.waitForCompletion(true)) return 0; > else return 1; > } > > public static void main(String[] args) throws Exception { > int exitCode = ToolRunner.run(new CopyHTable(), args); > System.exit(exitCode); > } > } >
