I have many map reduce jobs that use an HBase table as input, and the others all run correctly. This one is a little different because it uses both HDFS and HBase as input sources.
By the way, even though the errors appear, the job can still run successfully.
My code:
1. HBase table mapper; the mapper output key is Text and the value is my custom writable:

public class UrlDedupHbaseMapper extends
        TableMapper<Text, ExtractResultWritable> {
    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
    }
}
2. HDFS mapper, which reads from sequence files; the input key is NullWritable and the value is my custom writable. The mapper output key is Text and the value is my custom writable:

public class UrlDedupHdfsMapper extends
        Mapper<NullWritable, ExtractResultWritable, Text, ExtractResultWritable> {
    @Override
    public void map(NullWritable key, ExtractResultWritable value, Context context)
            throws IOException, InterruptedException {
    }
}
3. Reducer:

public class UrlDedupReducer extends
        Reducer<Text, ExtractResultWritable, NullWritable, ExtractResultWritable> {
    @Override
    public void reduce(Text key, Iterable<ExtractResultWritable> values, Context context)
            throws IOException, InterruptedException {
    }
}
4. Driver class:

Configuration conf = getConf();
Configuration myConf = HBaseConfiguration.create(conf);
myConf.set("hbase.zookeeper.quorum", zkQuorum);
myConf.set("hbase.zookeeper.property.clientPort", zkPort);
myConf.set("mapred.child.java.opts", "-Xmx1g");
myConf.set("mapred.output.compress", "true");

Job job = new Job(myConf);
job.setJobName(UrlDedup.class.getSimpleName());
job.setJarByClass(UrlDedup.class);

ArrayList<Path> fileList = new ArrayList<Path>();
FileSystem fs = FileSystem.get(conf);
Path inFile = new Path(args[0]);
FileStatus[] status = fs.listStatus(inFile);
for (FileStatus file : status) {
    fileList.add(file.getPath());
}
for (Path path : fileList) {
    MultipleInputs.addInputPath(job, path,
        SequenceFileInputFormat.class, UrlDedupHdfsMapper.class);
}
Scan urldbScan = new Scan();
urldbScan.setCaching(hbaseBlockCache);
urldbScan.setCacheBlocks(false);
urldbScan.addFamily(HbaseTools.CF_BT);

job.setInputFormatClass(SequenceFileInputFormat.class);
TableMapReduceUtil.initTableMapperJob(HbaseTools.TB_URL_DB_BT, urldbScan,
    UrlDedupHbaseMapper.class, NullWritable.class,
    ExtractResultWritable.class, job);
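// The path below is only a placeholder: TableInputFormat does not read from it,
// MultipleInputs just needs some path to register the mapper/input-format pair.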
MultipleInputs.addInputPath(job, new Path("/null/path"),
    TableInputFormat.class, UrlDedupHbaseMapper.class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setReducerClass(UrlDedupReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(ExtractResultWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ExtractResultWritable.class);
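For reference, the error quoted below comes from the reverse DNS (PTR) lookup
that TableInputFormatBase performs when computing splits. That lookup goes
through JNDI straight to the DNS server, so an entry that exists only in
/etc/hosts does not satisfy it. A minimal sketch of the same lookup (the class
name ReverseDnsCheck is made up for the example) looks roughly like:

import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

public class ReverseDnsCheck {
    public static void main(String[] args) throws Exception {
        // PTR query for 172.16.10.141, the same reverse name that appears in
        // the exception ('141.10.16.172.in-addr.arpa'). It asks the configured
        // DNS server directly and ignores /etc/hosts.
        DirContext ctx = new InitialDirContext();
        Attributes attrs = ctx.getAttributes(
            "dns:///141.10.16.172.in-addr.arpa", new String[] { "PTR" });
        System.out.println(attrs.get("PTR").get());
        ctx.close();
    }
}

If this fails with the same PortUnreachableException as the job, the DNS
server configured on the node is most likely not answering on port 53.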
On Wed, Jun 25, 2014 at 4:49 PM, Ted Yu <[email protected]> wrote:
> Can you ping vc141 from this machine ?
>
> Cheers
>
> On Jun 25, 2014, at 1:29 AM, Li Li <[email protected]> wrote:
>
>> I have a map reduce job using hbase table as input. when the job
>> starts, it says:
>>
>> ERROR main org.apache.hadoop.hbase.mapreduce.TableInputFormatBase
>> Cannot resolve the host name for vc141/172.16.10.141 because of
>> javax.naming.CommunicationException: DNS error [Root exception is
>> java.net.PortUnreachableException: ICMP Port Unreachable]; remaining
>> name '141.10.16.172.in-addr.arpa'
>>
>> the /etc/hosts file:
>> 172.16.10.137 vc137
>> 172.16.10.138 vc138
>> 172.16.10.139 vc139
>> 172.16.10.140 vc140
>> 172.16.10.141 vc141
>> 172.16.10.142 vc142
>> 172.16.10.143 vc143
>> 172.16.10.144 vc144
>> 172.16.10.145 vc145
>> 172.16.10.146 vc146
>> 172.16.10.147 vc147
>> 172.16.10.148 vc148
>> 172.16.10.149 vc149