Thanks, all.
Finally, the working code is below:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{BinaryComparator, FilterList, RegexStringComparator, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

object PlayRecord {
  // getUserRowKeyRange and DAY_MILLIS are defined elsewhere in this object (not shown).
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long, timeStop: Long,
                     cacheSize: Int, filterSongDays: Int, filterPlaylistDays: Int): RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        // Per-partition setup: the non-serializable HBase objects are created on the executor.
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("song_id"),
          CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), Bytes.toBytes("module"),
          CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          val userData = table.getScanner(scan).iterator().asScala.map(r => {
            val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("module")))
            val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("time")))
            module match {
              case "listen" =>
                val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("song_id")))
                (module, (time / DAY_MILLIS, songId))
              case "displayed" =>
                val playlistIds = Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids")))
                (module, (time / DAY_MILLIS, playlistIds))
              case _ =>
                (module, (0L, ""))
            }
          }).toList.groupBy(_._1)
          val playRecordData = userData.get("listen")
          val playRecords =
            if (playRecordData.nonEmpty)
              playRecordData.get.map(_._2).groupBy(_._1).toList.sortBy(-_._1)
                .take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
            else Set[Long]()
          val playlistDisPlayData = userData.get("displayed")
          val playlistRecords =
            if (playlistDisPlayData.nonEmpty)
              playlistDisPlayData.get.map(_._2).groupBy(_._1).toList.sortBy(_._1)
                .take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(',')).map(_.toLong).toSet
            else Set[Long]()
          val result = (id, (idType, playRecords, playlistRecords))
          // Per-partition cleanup: close the table after processing the partition's last id.
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}

As Shixiong mentioned, following Sean Owen's post
(http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/),
I close the table when iterator.hasNext is false; otherwise the application hangs.
There is also another interesting project,
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/, which I will try later.
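For reference, the per-partition setup()/cleanup() pattern from Sean Owen's post, distilled from the code above, looks roughly like the following. This is a minimal sketch: the hypothetical lookupTimes helper and the Get on the "stat:time" column are illustrative only, and null handling is omitted.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

def lookupTimes(userIds: RDD[String]): RDD[(String, Long)] =
  userIds.mapPartitions { iterator =>
    if (iterator.isEmpty) {
      Iterator.empty
    } else {
      // setup(): create the non-serializable HBase objects on the executor,
      // once per partition, instead of shipping them from the driver.
      val conf = HBaseConfiguration.create()
      val table = new HTable(conf, "user_action")
      iterator.map { id =>
        val row = table.get(new Get(Bytes.toBytes(id)))
        val time = Bytes.toLong(row.getValue(Bytes.toBytes("stat"), Bytes.toBytes("time")))
        // cleanup(): close the table right after emitting the partition's last element.
        if (!iterator.hasNext) table.close()
        (id, time)
      }
    }
  }

Note that table.close() only runs once the last element of a partition is actually pulled by whatever consumes the iterator downstream.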
At 2014-12-15 17:52:47, "Aniket Bhatnagar" <aniket.bhatna...@gmail.com> wrote:

"The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a time."

I am not sure that's true. You can use multiple scans as follows:

val scanStrings = scans.map(scan => convertScanToString(scan))
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings: _*)

where convertScanToString is implemented as:

/**
 * Serializes an HBase scan into a string.
 * @param scan Scan to serialize.
 * @return Base64-encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

Thanks,
Aniket

On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu <zsxw...@gmail.com> wrote:

Just pointing out a bug in your code: you should not use `mapPartitions` like that. For details, I recommend the section "setup() and cleanup()" in Sean Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo <yanboha...@gmail.com>:

In option 1, class HTable is not serializable. You also need to check your self-defined function getUserActions and make sure it is a member of a class that implements the Serializable interface.

Sent from my iPad

> On Dec 12, 2014, at 4:35 PM, yangliuyu <yangli...@163.com> wrote:
>
> The scenario is using an HTable instance to scan multiple rowkey ranges in
> Spark tasks, which looks like below:
>
> Option 1:
> val users = input
>   .map { case (deviceId, uid) => uid }
>   .distinct()
>   .sortBy(x => x)
>   .mapPartitions(iterator => {
>     val conf = HBaseConfiguration.create()
>     val table = new HTable(conf, "actions")
>     val result = iterator.map { userId =>
>       (userId, getUserActions(table, userId, timeStart, timeStop))
>     }
>     table.close()
>     result
>   })
>
> But got the exception:
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>     at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
>     ...
> Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
> The reason for not using sc.newAPIHadoopRDD is that it only supports one scan
> at a time:
> val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> And if using MultiTableInputFormat, it is not possible for the driver to put
> all rowkeys into HBaseConfiguration:
>
> Option 2:
> sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> It may be possible to divide all rowkey ranges into several parts and then use
> option 2, but I prefer option 1. So is there any solution for option 1?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
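For completeness, a minimal sketch of how Aniket's multi-scan suggestion plugs into option 2, assuming convertScanToString as defined above, an existing SparkContext sc, and illustrative rowkey ranges plus table/family names ("actions" / "stat"):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical per-user rowkey ranges (start, stop), built on the driver.
val rowKeyRanges: Seq[(Array[Byte], Array[Byte])] =
  Seq((Bytes.toBytes("user1|0"), Bytes.toBytes("user1|9")))

// One Scan per range; each scan carries the name of the table it targets,
// which is how MultiTableInputFormat knows where to read.
val scans = rowKeyRanges.map { case (start, stop) =>
  val scan = new Scan(start, stop)
  scan.addFamily(Bytes.toBytes("stat"))
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("actions"))
  scan
}

// Serialize all scans into the job configuration, then read them back
// as a single RDD of (rowkey, Result) pairs.
val conf = HBaseConfiguration.create()
conf.setStrings(MultiTableInputFormat.SCANS, scans.map(convertScanToString): _*)

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])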