"The reason not using sc.newAPIHadoopRDD is it only support one scan each time."
I am not sure that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)

where convertScanToString is implemented as:

/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

Thanks,
Aniket

On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu <zsxw...@gmail.com> wrote:

> Just pointing out a bug in your code: you should not use `mapPartitions`
> like that. For details, I recommend the section "setup() and cleanup()" in
> Sean Owen's post:
> http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
>
> Best Regards,
> Shixiong Zhu
>
> 2014-12-14 16:35 GMT+08:00 Yanbo <yanboha...@gmail.com>:
>>
>> In #1, class HTable is not serializable.
>> You also need to check your self-defined function getUserActions and make
>> sure it is a member function of a class that implements the Serializable
>> interface.
>>
>> Sent from my iPad
>>
>> > On Dec 12, 2014, at 4:35 PM, yangliuyu <yangli...@163.com> wrote:
>> >
>> > The scenario is using an HTable instance to scan multiple rowkey ranges
>> > in Spark tasks, which looks like below:
>> > Option 1:
>> > val users = input
>> >   .map { case (deviceId, uid) =>
>> >     uid}.distinct().sortBy(x=>x).mapPartitions(iterator=>{
>> >   val conf = HBaseConfiguration.create()
>> >   val table = new HTable(conf, "actions")
>> >   val result = iterator.map{ userId=>
>> >     (userId, getUserActions(table, userId, timeStart, timeStop))
>> >   }
>> >   table.close()
>> >   result
>> > })
>> >
>> > But I got the exception:
>> > org.apache.spark.SparkException: Task not serializable
>> >   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>> >   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> >   at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>> >   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>> >   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
>> > ...
>> > Caused by: java.io.NotSerializableException:
>> > org.apache.hadoop.conf.Configuration
>> >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >
>> > The reason for not using sc.newAPIHadoopRDD is that it only supports one
>> > scan at a time:
>> > val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>> >   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>> >   classOf[org.apache.hadoop.hbase.client.Result])
>> >
>> > And if using MultiTableInputFormat, it is not possible for the driver to
>> > put all rowkeys into the HBaseConfiguration.
>> > Option 2:
>> > sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>> >   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>> >   classOf[org.apache.hadoop.hbase.client.Result])
>> >
>> > It may be possible to divide all rowkey ranges into several parts and then
>> > use option 2, but I prefer option 1. So is there any solution for option 1?
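As for the final question, option 1 can work if the HBase objects are created inside the partition and the table is closed only after the lazy iterator has been consumed, which is the setup()/cleanup() pattern Shixiong points to. Below is a minimal sketch under those assumptions, against the HBase 0.98-era client API used in the thread; getUserActions, the time window parameters, and the enclosing object are placeholders standing in for the original poster's own code, not something from the thread itself.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.spark.rdd.RDD

// Keep the helper in a serializable container, per Yanbo's advice above.
object UserActionsJob extends Serializable {

  // Hypothetical stand-in for the poster's getUserActions; the real one would
  // issue Gets/Scans against the table for the given user and time window.
  def getUserActions(table: HTable, userId: String,
                     timeStart: Long, timeStop: Long): Seq[String] = Seq.empty

  def userActions(input: RDD[(String, String)],
                  timeStart: Long, timeStop: Long): RDD[(String, Seq[String])] = {
    input
      .map { case (deviceId, uid) => uid }
      .distinct()
      .sortBy(x => x)
      .mapPartitions { iterator =>
        // "setup": create the non-serializable HBase objects inside the task,
        // so nothing HBase-related has to be shipped from the driver.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "actions")
        // Materialize the partition before closing the table; the original code
        // closed the table before the lazy iterator.map was ever evaluated.
        val result = iterator.map { userId =>
          (userId, getUserActions(table, userId, timeStart, timeStop))
        }.toList
        // "cleanup": safe to close now that all rows for this partition are read.
        table.close()
        result.iterator
      }
  }
}

Materializing each partition with toList trades memory for correctness; for very large partitions one could instead wrap the iterator so the table is closed once the last element has been consumed, along the lines discussed in the linked post.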
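And for the alternative Aniket describes, here is a rough sketch of how multiple rowkey ranges could be fed to sc.newAPIHadoopRDD through MultiTableInputFormat, again assuming an HBase 0.98-era client (where MultiTableInputFormat, ProtobufUtil, and the bundled Base64 helper live in the packages shown); the "actions" table name comes from the thread, while the ranges parameter and object name are placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext

object MultiScanRDD {

  // Same helper as in Aniket's reply: serialize a Scan to a Base64 string.
  private def convertScanToString(scan: Scan): String = {
    val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  // ranges: (startRow, stopRow) pairs, one Scan per rowkey range.
  def actionsByRanges(sc: SparkContext, ranges: Seq[(Array[Byte], Array[Byte])]) = {
    val conf = HBaseConfiguration.create()

    val scans = ranges.map { case (start, stop) =>
      val scan = new Scan(start, stop)
      // MultiTableInputFormat requires each Scan to name the table it targets.
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("actions"))
      scan
    }

    // All scans go into the job configuration, so a single job covers every range.
    conf.setStrings(MultiTableInputFormat.SCANS, scans.map(convertScanToString): _*)

    sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
  }
}

Note that only the range boundaries, not every rowkey, end up in the HBaseConfiguration, so the concern about option 2 above mainly applies when the ranges themselves cannot be enumerated on the driver; if the number of ranges is very large, splitting them into several jobs, as the original poster suggests, remains an option.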