Re: Spark 1.5.2 memory error

2016-02-02 Thread Jim Green
Look at part #3 in the blog below:
http://www.openkb.info/2015/06/resource-allocation-configurations-for.html

You may want to increase the executor memory, not just the
spark.yarn.executor.memoryOverhead.
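
For example, a rough sketch of the two knobs side by side (illustrative values
only, set before the SparkContext is created; the equivalent --executor-memory
and --conf options on spark-submit work as well):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "20g")               // executor heap size
  .set("spark.yarn.executor.memoryOverhead", "3072") // off-heap overhead, in MB (Spark 1.5.x)
// YARN kills the container once physical memory use exceeds
// executor memory + overhead, so both knobs matter.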

On Tue, Feb 2, 2016 at 2:14 PM, Stefan Panayotov  wrote:

> For the memoryOverhead I have the default of 10% of 16g, and the Spark version
> is 1.5.2.
>
>
>
> Stefan Panayotov, PhD
> Sent from Outlook Mail for Windows 10 phone
>
>
>
>
> *From: *Ted Yu 
> *Sent: *Tuesday, February 2, 2016 4:52 PM
> *To: *Jakob Odersky 
> *Cc: *Stefan Panayotov ; user@spark.apache.org
> *Subject: *Re: Spark 1.5.2 memory error
>
>
>
> What value do you use for spark.yarn.executor.memoryOverhead ?
>
>
>
> Please see https://spark.apache.org/docs/latest/running-on-yarn.html for
> description of the parameter.
>
>
>
> Which Spark release are you using ?
>
>
>
> Cheers
>
>
>
> On Tue, Feb 2, 2016 at 1:38 PM, Jakob Odersky  wrote:
>
> Can you share some code that produces the error? It is probably not
> due to Spark but rather the way data is handled in the user code.
> Does your code call any reduceByKey actions? These are often a source
> of OOM errors.
>
>
> On Tue, Feb 2, 2016 at 1:22 PM, Stefan Panayotov 
> wrote:
> > Hi Guys,
> >
> > I need help with Spark memory errors when executing ML pipelines.
> > The error that I see is:
> >
> >
> > 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 32.0 in stage 32.0 (TID 3298)
> > 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 12.0 in stage 32.0 (TID 3278)
> > 16/02/02 20:34:39 INFO MemoryStore: ensureFreeSpace(2004728720) called with curMem=296303415, maxMem=8890959790
> > 16/02/02 20:34:39 INFO MemoryStore: Block taskresult_3298 stored as bytes in memory (estimated size 1911.9 MB, free 6.1 GB)
> > 16/02/02 20:34:39 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
> > 16/02/02 20:34:39 ERROR Executor: Exception in task 12.0 in stage 32.0 (TID 3278)
> > java.lang.OutOfMemoryError: Java heap space
> >     at java.util.Arrays.copyOf(Arrays.java:2271)
> >     at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> >     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:745)
> > 16/02/02 20:34:39 INFO DiskBlockManager: Shutdown hook called
> > 16/02/02 20:34:39 INFO Executor: Finished task 32.0 in stage 32.0 (TID 3298). 2004728720 bytes result sent via BlockManager)
> > 16/02/02 20:34:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
> > java.lang.OutOfMemoryError: Java heap space
> >     at java.util.Arrays.copyOf(Arrays.java:2271)
> >     at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
> >     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:745)
> > 16/02/02 20:34:39 INFO ShutdownHookManager: Shutdown hook called
> > 16/02/02 20:34:39 INFO MetricsSystemImpl: Stopping azure-file-system metrics system...
> > 16/02/02 20:34:39 INFO MetricsSinkAdapter: azurefs2 thread interrupted.
> > 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system stopped.
> > 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system shutdown complete.
> >
> > And …..
> >
> > 16/02/02 20:09:03 INFO impl.ContainerManagementProtocolProxy: Opening proxy : 10.0.0.5:30050
> > 16/02/02 20:33:51 INFO yarn.YarnAllocator: Completed container container_1454421662639_0011_01_05 (state: COMPLETE, exit status: -104)
> > 16/02/02 20:33:51 WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory limits. 16.8 GB of 16.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> > 16/02/02 20:33:56 INFO yarn.YarnAllocator: Will request 1 executor

Array column stored as “.bag” in parquet file instead of “REPEATED INT64”

2015-08-27 Thread Jim Green
Hi Team,

Say I have a test.json file: {"c1":[1,2,3]}
I can create a parquet file like:
var df = sqlContext.load("/tmp/test.json", "json")
var df_c = df.repartition(1)
df_c.select("*").save("/tmp/testjson_spark", "parquet")

The output parquet file’s schema is like:
c1:  OPTIONAL F:1
.bag:REPEATED F:1
..array: OPTIONAL INT64 R:1 D:3

Is there any way to avoid using “.bag” and instead create the
parquet file using column type “REPEATED INT64”?
The expected data type is:
c1:  REPEATED INT64 R:1 D:1

Thanks!
-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-05 Thread Jim Green
We tested Spark 1.2 and 1.3, and this issue is gone. I know that starting from
1.2, Spark uses netty instead of nio.
So you mean that bypasses this issue?

Another question is: why did this error message not show up in Spark 0.9 or
older versions?
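
(For reference, the switch Aaron refers to below can be set explicitly; a
minimal sketch, assuming the spark.shuffle.blockTransferService setting
available in Spark 1.2 through 1.5:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // "netty" has been the default since 1.2; "nio" switches back to the old
  // ConnectionManager-based path.
  .set("spark.shuffle.blockTransferService", "netty")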

On Tue, Aug 4, 2015 at 11:01 PM, Aaron Davidson ilike...@gmail.com wrote:

 ConnectionManager has been deprecated and is no longer used by default
 (NettyBlockTransferService is the replacement). Hopefully you would no
 longer see these messages unless you have explicitly flipped it back on.

 On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote:

 And also https://issues.apache.org/jira/browse/SPARK-3106
 This one is still open.

 On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

 *Symptom:*
 Even a sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi
 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
 But the code changes are not in newer versions of Spark, even though this
 JIRA is marked as fixed.
 Is this issue really fixed in the latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)





-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Jim Green
And also https://issues.apache.org/jira/browse/SPARK-3106
This one is still open.

On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

 *Symptom:*
 Even a sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
 But the code changes are not in newer versions of Spark, even though this JIRA
 is marked as fixed.
 Is this issue really fixed in the latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Is SPARK-3322 fixed in latest version of Spark?

2015-08-04 Thread Jim Green
*Symptom:*
Even a sample job fails:
$ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
Pi is roughly 3.140636
ERROR ConnectionManager: Corresponding SendingConnection to
ConnectionManagerId(xxx,) not found
WARN ConnectionManager: All connections not cleaned up

Found https://issues.apache.org/jira/browse/SPARK-3322
But the code changes are not in newer versions of Spark, even though this JIRA
is marked as fixed.
Is this issue really fixed in the latest version? If so, what is the related
JIRA?

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Resource allocation configurations for Spark on Yarn

2015-06-12 Thread Jim Green
Hi Team,

Sharing one article which summarizes the resource allocation configurations
for Spark on Yarn:
Resource allocation configurations for Spark on Yarn
http://www.openkb.info/2015/06/resource-allocation-configurations-for.html
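
A minimal sketch of the kind of settings the article covers (illustrative
values, not taken from the article):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "10")             // number of executors requested from YARN
  .set("spark.executor.cores", "4")                  // cores per executor
  .set("spark.executor.memory", "8g")                // heap per executor
  .set("spark.yarn.executor.memoryOverhead", "1024") // extra container memory, in MB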

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Spark impersonation

2015-02-02 Thread Jim Green
Hi Team,

Does Spark support impersonation?
For example, when running Spark on YARN/Hive/HBase/etc., which user is used by
default?
The user who starts the Spark job?
Any suggestions related to impersonation?

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Scala on Spark functions examples cheatsheet.

2015-02-02 Thread Jim Green
Hi Team,

I just spent some time over the last two weeks on Scala and tried all the
Scala on Spark functions in the Spark Programming Guide
http://spark.apache.org/docs/1.2.0/programming-guide.html.
If you need example code for Scala on Spark functions, I created this cheat
sheet http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html with
examples.
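
A minimal sketch of the kind of function examples it covers (not copied from
the cheat sheet itself):

val words = sc.parallelize(Seq("a", "b", "a"))
words.map(w => (w, 1)).reduceByKey(_ + _).collect()  // e.g. Array((a,2), (b,1))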

Sharing.

-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Bulk loading into hbase using saveAsNewAPIHadoopFile

2015-01-27 Thread Jim Green
Hi Team,

I need some help on writing a Scala job to bulk load some data into HBase.
*Env:*
hbase 0.94
spark-1.0.2

I am trying the below code to bulk load some data into HBase table “t1”.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x={
val put: Put = new Put(Bytes.toBytes(x))
put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable],
classOf[Put], classOf[HFileOutputFormat], conf)


However, I am always getting the below error:
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be
cast to org.apache.hadoop.hbase.KeyValue
at
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have sample code to do a bulk load into HBase directly?
Can we use saveAsNewAPIHadoopFile?

2. Is there any other way to do this?
For example, first write an HFile on HDFS, and then use an HBase command to
bulk load it?
Any sample code using Scala?

Thanks.




-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Bulk loading into hbase using saveAsNewAPIHadoopFile

2015-01-27 Thread Jim Green
Thanks Ted. Could you give me a simple example of loading one row of data into
HBase? How should I generate the KeyValue?
I tried multiple times, and still cannot figure it out.

On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here is the method signature used by HFileOutputFormat :
   public void write(ImmutableBytesWritable row, KeyValue kv)

 Meaning, KeyValue is expected, not Put.

 On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

 Hi Team,

 I need some help on writing a Scala job to bulk load some data into HBase.
 *Env:*
 hbase 0.94
 spark-1.0.2

 I am trying the below code to bulk load some data into HBase table “t1”.

 import org.apache.spark._
 import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.client.HBaseAdmin
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

 val conf = HBaseConfiguration.create()
 val tableName = "t1"
 val table = new HTable(conf, tableName)

 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 val job = Job.getInstance(conf)
 job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
 job.setMapOutputValueClass (classOf[KeyValue])
 HFileOutputFormat.configureIncrementalLoad (job, table)

 val num = sc.parallelize(1 to 10)
 val rdd = num.map(x={
 val put: Put = new Put(Bytes.toBytes(x))
 put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
 (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
 })
 rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable],
 classOf[Put], classOf[HFileOutputFormat], conf)


 However, I am always getting the below error:
 java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot
 be cast to org.apache.hadoop.hbase.KeyValue
 at
 org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 My questions are:
 1. Do we have sample code to do a bulk load into HBase directly?
 Can we use saveAsNewAPIHadoopFile?

 2. Is there any other way to do this?
 For example, first write an HFile on HDFS, and then use an HBase command to
 bulk load it?
 Any sample code using Scala?

 Thanks.




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)





-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Re: Bulk loading into hbase using saveAsNewAPIHadoopFile

2015-01-27 Thread Jim Green
I used the below code, and it still failed with the same error.
Does anyone have experience with bulk loading using Scala?
Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x={
val put: Put = new Put(Bytes.toBytes(x))
put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable],
classOf[KeyValue], classOf[HFileOutputFormat], conf)
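
Following Ted's point in the reply quoted below, a minimal sketch of the
direction that should avoid the ClassCastException: emit KeyValue (not Put) as
the RDD value and keep ImmutableBytesWritable as the key. This reuses the
imports, sc, and job from the code above; it is untested against hbase 0.94 /
spark-1.0.2, and the output path, column family, and qualifier are only
illustrative.

val kvRdd = sc.parallelize(1 to 10).map { x =>
  val rowKey = Bytes.toBytes(x)
  // KeyValue(row, family, qualifier, value) is the type HFileOutputFormat expects
  val kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_" + x))
  (new ImmutableBytesWritable(rowKey), kv)
}
// HFileOutputFormat needs rows in sorted order within each output file;
// sort by key (e.g. kvRdd.sortByKey()) if the input is not already ordered.
kvRdd.saveAsNewAPIHadoopFile("/tmp/hfiles_out", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)

// The generated HFiles can then be handed to HBase, e.g.:
//   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hfiles_out t1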



On Tue, Jan 27, 2015 at 12:17 PM, Jim Green openkbi...@gmail.com wrote:

 Thanks Ted. Could you give me a simple example of loading one row of data into
 HBase? How should I generate the KeyValue?
 I tried multiple times, and still cannot figure it out.

 On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here is the method signature used by HFileOutputFormat :
   public void write(ImmutableBytesWritable row, KeyValue kv)

 Meaning, KeyValue is expected, not Put.

 On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

 Hi Team,

 I need some help on writing a Scala job to bulk load some data into HBase.
 *Env:*
 hbase 0.94
 spark-1.0.2

 I am trying the below code to bulk load some data into HBase table “t1”.

 import org.apache.spark._
 import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.client.HBaseAdmin
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

 val conf = HBaseConfiguration.create()
 val tableName = "t1"
 val table = new HTable(conf, tableName)

 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
 val job = Job.getInstance(conf)
 job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
 job.setMapOutputValueClass (classOf[KeyValue])
 HFileOutputFormat.configureIncrementalLoad (job, table)

 val num = sc.parallelize(1 to 10)
 val rdd = num.map(x={
 val put: Put = new Put(Bytes.toBytes(x))
 put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
 (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
 })
 rdd.saveAsNewAPIHadoopFile("/tmp/8",
 classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat],
 conf)


 However, I am always getting the below error:
 java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot
 be cast to org.apache.hadoop.hbase.KeyValue
 at
 org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 My questions are:
 1. Do we have sample code to do a bulk load into HBase directly?
 Can we use