Hi Team, I need some help on writing a scala to bulk load some data into hbase. *Env:* hbase 0.94 spark-1.0.2
I am trying below code to just bulk load some data into hbase table “t1”. import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.hbase.KeyValue import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat val conf = HBaseConfiguration.create() val tableName = "t1" val table = new HTable(conf, tableName) conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) val job = Job.getInstance(conf) job.setMapOutputKeyClass (classOf[ImmutableBytesWritable]) job.setMapOutputValueClass (classOf[KeyValue]) HFileOutputFormat.configureIncrementalLoad (job, table) val num = sc.parallelize(1 to 10) val rdd = num.map(x=>{ val put: Put = new Put(Bytes.toBytes(x)) put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes()) (new ImmutableBytesWritable(Bytes.toBytes(x)), put) }) rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf) However I am allways getting below error: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) My questions are: 1. Do we have a sample code to do bulk load into hbase directly? Can we use saveAsNewAPIHadoopFile? 2. Is there any other way to do this? For example, firstly write a hfile on hdfs, and then use hbase command to bulk load? Any sample code using scala? Thanks. -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)