I used the code below, and it still failed with the same error. Does anyone have experience with bulk loading using Scala? Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <openkbi...@gmail.com> wrote:

> Thanks Ted. Could you give me a simple example that loads one row of data
> into HBase? How should I generate the KeyValue?
> I tried multiple times and still cannot figure it out.
>
> On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Here is the method signature used by HFileOutputFormat:
>> public void write(ImmutableBytesWritable row, KeyValue kv)
>>
>> Meaning, KeyValue is expected, not Put.
>>
>> On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbi...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I need some help writing a Scala program to bulk load some data into HBase.
>>> *Env:*
>>> hbase 0.94
>>> spark-1.0.2
>>>
>>> I am trying the code below to bulk load some data into the HBase table “t1”.
>>>
>>> import org.apache.spark._
>>> import org.apache.spark.rdd.NewHadoopRDD
>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.hadoop.hbase.HColumnDescriptor
>>> import org.apache.hadoop.hbase.util.Bytes
>>> import org.apache.hadoop.hbase.client.Put
>>> import org.apache.hadoop.hbase.client.HTable
>>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>>> import org.apache.hadoop.mapred.JobConf
>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>> import org.apache.hadoop.mapreduce.Job
>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>>> import org.apache.hadoop.hbase.KeyValue
>>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>>
>>> val conf = HBaseConfiguration.create()
>>> val tableName = "t1"
>>> val table = new HTable(conf, tableName)
>>>
>>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>>> val job = Job.getInstance(conf)
>>> job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
>>> job.setMapOutputValueClass(classOf[KeyValue])
>>> HFileOutputFormat.configureIncrementalLoad(job, table)
>>>
>>> val num = sc.parallelize(1 to 10)
>>> val rdd = num.map(x => {
>>>   val put: Put = new Put(Bytes.toBytes(x))
>>>   put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>>   (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>>> })
>>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)
>>>
>>> However, I always get the error below:
>>> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
>>>     at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> My questions are:
>>> 1. Is there sample code for bulk loading into HBase directly?
>>>    Can we use saveAsNewAPIHadoopFile?
>>>
>>> 2. Is there any other way to do this?
>>>    For example, first write an HFile to HDFS, and then use an HBase command
>>>    to bulk load it?
>>>    Is there any sample code in Scala?
>>>
>>> Thanks.
>>>
>>> --
>>> Thanks,
>>> www.openkb.info
>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
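
Following Ted's point that HFileOutputFormat expects a KeyValue rather than a Put, here is a minimal sketch of how the row-generation step might look. It is an illustration under assumptions, not a confirmed fix from this thread: it assumes a spark-shell session (so `sc` already exists) with the HBase 0.94 jars on the classpath, and the helper name `toCell` and the output path `/tmp/hfiles_t1` are hypothetical. It also passes `job.getConfiguration` to the save call on the assumption that `Job.getInstance(conf)` works on its own copy of the configuration, so the settings made by `configureIncrementalLoad` would not be visible through the original `conf`.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // pair-RDD functions on Spark 1.0.x

// Hypothetical helper: builds one (row key, KeyValue) pair per input value.
// The family "cf", qualifier "c1" and value "value_xxx" come from the thread.
def toCell(x: Int): (ImmutableBytesWritable, KeyValue) = {
  val row = Bytes.toBytes(x)
  val kv = new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(row), kv)
}

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "t1")

val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// HFile writers expect row keys in ascending order; 1 to 10 encoded as
// big-endian ints already satisfies that within each partition.
val cells = sc.parallelize(1 to 10).map(toCell)

// Step 1: write HFiles to HDFS (hypothetical, not-yet-existing directory).
val hfileDir = "/tmp/hfiles_t1"
cells.saveAsNewAPIHadoopFile(hfileDir, classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)

// Step 2: hand the generated HFiles over to the table.
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileDir), table)
```

The only change to the RDD itself is that each element carries a KeyValue instead of a Put, which matches the `write(ImmutableBytesWritable row, KeyValue kv)` signature quoted above. For real data the keys would need to be sorted before the save step.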