"The reason not using sc.newAPIHadoopRDD is it only support one scan each
time."

I am not sure that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)

where convertScanToString is implemented as:

/**
 * Serializes an HBase Scan into a string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
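
For completeness, here is a rough, untested sketch of how the serialized
scans can then be fed into sc.newAPIHadoopRDD with MultiTableInputFormat
(rowkeyRanges and the "actions" table name are placeholders for your own
ranges and table; as far as I know each Scan also has to carry its target
table via the Scan.SCAN_ATTRIBUTES_TABLE_NAME attribute):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()

// One Scan per rowkey range (rowkeyRanges is a placeholder for your own
// (startRow, stopRow) pairs); MultiTableInputFormat reads the target table
// from this attribute on each Scan.
val scans = rowkeyRanges.map { case (startRow, stopRow) =>
  val scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow))
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("actions"))
  scan
}

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])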

Thanks,
Aniket

On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu <zsxw...@gmail.com> wrote:

> Just to point out a bug in your code: you should not use `mapPartitions`
> like that. For details, I recommend Section "setup() and cleanup()" in Sean
> Owen's post:
> http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
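
For reference, applying that setup()/cleanup() pattern to the Option 1 code
quoted below would look roughly like this (untested sketch; getUserActions,
timeStart and timeStop are the original poster's own definitions and still
have to live in a serializable scope). The point is that the table is closed
only after the last element of the partition has been produced, instead of
before the lazy iterator is consumed:

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions { userIds =>
    if (userIds.isEmpty) {
      Iterator.empty
    } else {
      // Per-partition "setup": create the non-serializable HBase objects
      // on the executor rather than shipping them from the driver.
      val conf = HBaseConfiguration.create()
      val table = new HTable(conf, "actions")
      userIds.map { userId =>
        val actions = getUserActions(table, userId, timeStart, timeStop)
        // Per-partition "cleanup": close the table once the last element
        // has been produced.
        if (!userIds.hasNext) table.close()
        (userId, actions)
      }
    }
  }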
>
> Best Regards,
> Shixiong Zhu
>
> 2014-12-14 16:35 GMT+08:00 Yanbo <yanboha...@gmail.com>:
>>
>> In #1, the HTable class is not serializable.
>> You also need to check your self-defined function getUserActions and make
>> sure it is a member of a class that implements the Serializable
>> interface.
>>
>> Sent from my iPad
>>
>> > On Dec 12, 2014, at 4:35 PM, yangliuyu <yangli...@163.com> wrote:
>> >
>> > The scenario is using an HTable instance to scan multiple rowkey ranges
>> > in Spark tasks, as shown below:
>> > Option 1:
>> > val users = input
>> >   .map { case (deviceId, uid) => uid }
>> >   .distinct()
>> >   .sortBy(x => x)
>> >   .mapPartitions(iterator => {
>> >     val conf = HBaseConfiguration.create()
>> >     val table = new HTable(conf, "actions")
>> >     val result = iterator.map { userId =>
>> >       (userId, getUserActions(table, userId, timeStart, timeStop))
>> >     }
>> >     table.close()
>> >     result
>> >   })
>> >
>> > But got the exception:
>> > org.apache.spark.SparkException: Task not serializable
>> >        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>> >        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> >        at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>> >        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> >        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
>> > ...
>> > Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>> >        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> >        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >
>> > The reason for not using sc.newAPIHadoopRDD is that it only supports one
>> > scan at a time.
>> > val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>> >      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>> >      classOf[org.apache.hadoop.hbase.client.Result])
>> >
>> > And if using MultiTableInputFormat, it is not possible for the driver to
>> > put all rowkeys into the HBaseConfiguration.
>> > Option 2:
>> > sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>> >      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>> >      classOf[org.apache.hadoop.hbase.client.Result])
>> >
>> > It may be possible to divide all rowkey ranges into several parts and
>> > then use option 2, but I prefer option 1. So is there any solution for
>> > option 1?
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
