Hi Team,

I need some help writing a Scala program to bulk load data into HBase.
*Env:*
HBase 0.94
Spark 1.0.2

I am trying the code below to bulk load some data into the HBase table “t1”.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val put: Put = new Put(Bytes.toBytes(x))
    put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
    (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable],
classOf[Put], classOf[HFileOutputFormat], conf)


However, I always get the error below:
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
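
Reading the trace, the record writer inside HFileOutputFormat appears to cast every value to KeyValue, so my guess is that the RDD has to carry KeyValue objects instead of Put objects. Below is an untested sketch of that variant. It reuses `sc` from the spark-shell and the same table, column family and output path as above. It also passes job.getConfiguration instead of conf to the save call, on the assumption that configureIncrementalLoad stores its settings in the job's own copy of the configuration.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // pair-RDD functions; already in scope in spark-shell

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "t1")

val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// HFiles expect rows in ascending key order; 1 to 10 is already sorted,
// and map() keeps that order within each partition.
val kvs = sc.parallelize(1 to 10).map { x =>
  val row = Bytes.toBytes(x)
  val kv = new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(row), kv)
}

// The value class is now KeyValue, matching what HFileOutputFormat writes.
kvs.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)
```

If this is the right direction, I assume real data would need to be sorted by row key before the map step, since the sample range only happens to be in order already.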

My questions are:
1. Is there any sample code for bulk loading into HBase directly?
Can saveAsNewAPIHadoopFile be used for this?

2. Is there any other way to do this?
For example, first write HFiles to HDFS and then use an HBase command to
bulk load them?
Is there any sample code in Scala?
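
For the second approach, what I have in mind for the final step is roughly the following untested sketch. It assumes that HFiles for “t1” have already been written under /tmp/xxxx8 (one subdirectory per column family) and that LoadIncrementalHFiles, the class behind HBase's completebulkload tool, can be called directly from Scala.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "t1")
try {
  // Moves the HFiles under /tmp/xxxx8 into the regions of table t1.
  new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/xxxx8"), table)
} finally {
  table.close()
}
```

I believe the same step can also be run from the command line as `hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/xxxx8 t1`, but I have not confirmed that on this version.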

Thanks.




-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
