Thanks to everyone for responding. I finally figured out how to bulk load into HBase using Scala on Spark. The sample code is here, which others can refer to in the future:
http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html
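For future reference, here is a minimal end-to-end sketch of the approach: build an RDD of (ImmutableBytesWritable, KeyValue) instead of (ImmutableBytesWritable, Put), write HFiles with saveAsNewAPIHadoopFile, then hand them to HBase with LoadIncrementalHFiles (the same thing the completebulkload tool does). This is not the exact code from the post above; it assumes the HBase 0.94 / Spark 1.0 APIs used in this thread, a spark-shell sc, an existing table "t1" with column family "cf", and a scratch HDFS path of my own choosing.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
val tableName = "t1"                      // existing table with column family "cf"
val table = new HTable(conf, tableName)

// HFileOutputFormat writes (ImmutableBytesWritable, KeyValue) pairs,
// so the RDD must carry KeyValue values in sorted row-key order.
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// 1 to 10, encoded as 4-byte ints, is already in ascending row-key order.
val rdd = sc.parallelize(1 to 10).map { x =>
  val rowKey = Bytes.toBytes(x)
  val kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_" + x))
  (new ImmutableBytesWritable(rowKey), kv)
}

// Step 1: write the HFiles to a scratch directory on HDFS
// (the path is hypothetical; it must not exist yet).
val hfileDir = "/tmp/hfiles_t1"
rdd.saveAsNewAPIHadoopFile(hfileDir,
  classOf[ImmutableBytesWritable], classOf[KeyValue],
  classOf[HFileOutputFormat], job.getConfiguration)

// Step 2: move the generated HFiles into the table's regions.
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileDir), table)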
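The thread below also asks whether saveAsNewAPIHadoopDataset can write into HBase directly. It can, but that is a normal write through the region servers rather than a bulk load. Here is a rough, untested sketch of that pattern; note it uses the new-API org.apache.hadoop.hbase.mapreduce.TableOutputFormat, not the org.apache.hadoop.hbase.mapred one imported in the code below, and the table name and values are illustrative only.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "t1")

// Register TableOutputFormat as the output format in the job configuration.
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Each Put goes through the region servers' normal write path (WAL + memstore).
val puts = sc.parallelize(1 to 10).map { x =>
  val put = new Put(Bytes.toBytes(x))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_" + x))
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
}
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)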
Thanks!

On Tue, Jan 27, 2015 at 6:27 PM, Jim Green <openkbi...@gmail.com> wrote:

> Thanks Sun.
> My understanding is, saveAsNewAPIHadoopFile is to save as an HFile on HDFS.
>
> Is it doable to use saveAsNewAPIHadoopDataset to load directly into HBase?
> If so, is there any sample code for that?
>
> Thanks!
>
> On Tue, Jan 27, 2015 at 6:07 PM, fightf...@163.com <fightf...@163.com>
> wrote:
>
>> Hi, Jim
>> Your generated rdd should be of type RDD[ImmutableBytesWritable,
>> KeyValue], while your current type is RDD[ImmutableBytesWritable, Put].
>> You can go like this, and the result should be of type
>> RDD[ImmutableBytesWritable, KeyValue], which can be saved with saveAsNewAPIHadoopFile:
>> val result = num.flatMap( v => {
>>   keyValueBuilder(v).map(v => (v, 1))
>> }).map(v => (new ImmutableBytesWritable(v._1.getBuffer(),
>>   v._1.getRowOffset(), v._1.getRowLength()), v._1))
>>
>> where keyValueBuilder would be defined as T => List[KeyValue];
>> for example, you can go:
>> val keyValueBuilder = (data: (Int, Int)) => {
>>   val rowkeyBytes = Bytes.toBytes(data._1)
>>   val colfam = Bytes.toBytes("cf")
>>   val qual = Bytes.toBytes("c1")
>>   val value = Bytes.toBytes("val_xxx")
>>
>>   val kv = new KeyValue(rowkeyBytes, colfam, qual, value)
>>   List(kv)
>> }
>>
>> Thanks,
>> Sun
>> ------------------------------
>> fightf...@163.com
>>
>> *From:* Jim Green <openkbi...@gmail.com>
>> *Date:* 2015-01-28 04:44
>> *To:* Ted Yu <yuzhih...@gmail.com>
>> *CC:* user <user@spark.apache.org>
>> *Subject:* Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
>> I used the below code, and it still failed with the same error.
>> Does anyone have experience with bulk loading using Scala?
>> Thanks.
>>
>> import org.apache.spark._
>> import org.apache.spark.rdd.NewHadoopRDD
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>> import org.apache.hadoop.fs.Path
>> import org.apache.hadoop.hbase.HColumnDescriptor
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.hadoop.hbase.client.Put
>> import org.apache.hadoop.hbase.client.HTable
>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>> import org.apache.hadoop.mapred.JobConf
>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>> import org.apache.hadoop.mapreduce.Job
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>> import org.apache.hadoop.hbase.KeyValue
>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>
>> val conf = HBaseConfiguration.create()
>> val tableName = "t1"
>> val table = new HTable(conf, tableName)
>>
>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>> val job = Job.getInstance(conf)
>> job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
>> job.setMapOutputValueClass(classOf[KeyValue])
>> HFileOutputFormat.configureIncrementalLoad(job, table)
>>
>> val num = sc.parallelize(1 to 10)
>> val rdd = num.map(x => {
>>   val put: Put = new Put(Bytes.toBytes(x))
>>   put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>   (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>> })
>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13",
>>   classOf[ImmutableBytesWritable], classOf[KeyValue],
>>   classOf[HFileOutputFormat], conf)
>>
>> On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <openkbi...@gmail.com> wrote:
>>
>>> Thanks Ted.
>>> Could you give me a simple example to load one row of data into
>>> HBase? How should I generate the KeyValue?
>>> I tried multiple times, and still cannot figure it out.
>>>
>>> On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Here is the method signature used by HFileOutputFormat:
>>>> public void write(ImmutableBytesWritable row, KeyValue kv)
>>>>
>>>> Meaning, KeyValue is expected, not Put.
>>>>
>>>> On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I need some help writing Scala code to bulk load some data into HBase.
>>>>> *Env:*
>>>>> hbase 0.94
>>>>> spark-1.0.2
>>>>>
>>>>> I am trying the below code to just bulk load some data into the HBase
>>>>> table "t1".
>>>>>
>>>>> import org.apache.spark._
>>>>> import org.apache.spark.rdd.NewHadoopRDD
>>>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>>>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>>> import org.apache.hadoop.fs.Path
>>>>> import org.apache.hadoop.hbase.HColumnDescriptor
>>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>> import org.apache.hadoop.hbase.client.Put
>>>>> import org.apache.hadoop.hbase.client.HTable
>>>>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>>>>> import org.apache.hadoop.mapred.JobConf
>>>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>>> import org.apache.hadoop.mapreduce.Job
>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>>>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>>>>> import org.apache.hadoop.hbase.KeyValue
>>>>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>>>>
>>>>> val conf = HBaseConfiguration.create()
>>>>> val tableName = "t1"
>>>>> val table = new HTable(conf, tableName)
>>>>>
>>>>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>>>>> val job = Job.getInstance(conf)
>>>>> job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
>>>>> job.setMapOutputValueClass(classOf[KeyValue])
>>>>> HFileOutputFormat.configureIncrementalLoad(job, table)
>>>>>
>>>>> val num = sc.parallelize(1 to 10)
>>>>> val rdd = num.map(x => {
>>>>>   val put: Put = new Put(Bytes.toBytes(x))
>>>>>   put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>>>>   (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>>>>> })
>>>>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8",
>>>>>   classOf[ImmutableBytesWritable], classOf[Put],
>>>>>   classOf[HFileOutputFormat], conf)
>>>>>
>>>>> However, I am always getting the below error:
>>>>> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put
>>>>> cannot be cast to org.apache.hadoop.hbase.KeyValue
>>>>> at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>>>>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>>>>> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My questions are:
>>>>> 1. Do we have sample code to do a bulk load into HBase directly?
>>>>> Can we use saveAsNewAPIHadoopFile?
>>>>>
>>>>> 2. Is there any other way to do this?
>>>>> For example, first write an HFile on HDFS, and then use an HBase command
>>>>> to bulk load?
>>>>> Any sample code using Scala?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> www.openkb.info
>>>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>>>
>>>
>>> --
>>> Thanks,
>>> www.openkb.info
>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>
>> --
>> Thanks,
>> www.openkb.info
>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)

--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)