Thanks All.

Finally the works code is below:


object PlayRecord {
  def getUserActions(accounts: RDD[String], idType: Int, timeStart: Long, 
timeStop: Long, cacheSize: Int,
                     filterSongDays: Int, filterPlaylistDays: Int): 
RDD[(String, (Int, Set[Long], Set[Long]))] = {
    accounts.mapPartitions(iterator => {
      if (iterator.nonEmpty) {
        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, "user_action")
        val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), 
Bytes.toBytes("song_id"), CompareOp.EQUAL, new RegexStringComparator("^\\d+$")))
        filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes("stat"), 
Bytes.toBytes("module"), CompareOp.EQUAL, new 
BinaryComparator(Bytes.toBytes("displayed"))))
        iterator.map(id => {
          val scan = new Scan()
          scan.setCaching(cacheSize)
          scan.addFamily(Bytes.toBytes("stat"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("module"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("song_id"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids"))
          scan.addColumn(Bytes.toBytes("stat"), Bytes.toBytes("time"))
          val rowKeyRange = getUserRowKeyRange(id, idType, timeStart, timeStop)
          scan.setStartRow(rowKeyRange._1)
          scan.setStopRow(rowKeyRange._2)
          scan.setFilter(filterList)
          val userData = table.getScanner(scan).iterator().asScala.map(r => {
            val module = Bytes.toString(r.getValue(Bytes.toBytes("stat"), 
Bytes.toBytes("module")))
            val time = Bytes.toLong(r.getValue(Bytes.toBytes("stat"), 
Bytes.toBytes("time")))
            module match {
              case "listen" =>
                val songId = Bytes.toString(r.getValue(Bytes.toBytes("stat"), 
Bytes.toBytes("song_id")))
                (module, (time / DAY_MILLIS, songId))
              case "displayed" =>
                val playlistIds = 
Bytes.toString(r.getValue(Bytes.toBytes("stat"), Bytes.toBytes("playlist_ids")))
                (module, (time / DAY_MILLIS, playlistIds))
              case _ =>
                (module, (0L, ""))
            }
          }).toList.groupBy(_._1)
          val playRecordData = userData.get("listen")
          val playRecords = if (playRecordData.nonEmpty) 
playRecordData.get.map(_._2).groupBy(_._1).toList.sortBy(-_._1).take(filterSongDays).flatMap(_._2).map(_._2.toLong).toSet
          else Set[Long]()
          val playlistDisPlayData = userData.get("displayed")
          val playlistRecords = if (playlistDisPlayData.nonEmpty) 
playlistDisPlayData.get.map(_._2).groupBy(_._1).toList.sortBy(_._1).take(filterPlaylistDays).flatMap(_._2).flatMap(_._2.split(',')).map(_.toLong).toSet
          else Set[Long]()
          val result = (id, (idType, playRecords, playlistRecords))
          if (!iterator.hasNext) {
            table.close()
          }
          result
        })
      } else {
        iterator.map(id => {
          (id, (idType, Set[Long](), Set[Long]()))
        })
      }
    })
  }
}


As Shixiong mentioned Sean Owen's post: 
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/,
 I close table when iterator.hasNext is false, otherwise the application will 
be hung. And there is also another interesting project  
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/, will 
try it later. 





At 2014-12-15 17:52:47, "Aniket Bhatnagar" <aniket.bhatna...@gmail.com> wrote:
"The reason not using sc.newAPIHadoopRDD is it only support one scan each time."



I am not sure is that's true. You can use multiple scans as following:


val scanStrings = scans.map(scan => convertScanToString(scan)) 
conf.setStrings(MultiTableInputFormat.SCANS, scanStrings : _*)


where convertScanToString is implemented as:


/**
 * Serializes a HBase scan into string.
 * @param scan Scan to serialize.
 * @return Base64 encoded serialized scan.
 */
private def convertScanToString(scan: Scan) = {
  val proto: ClientProtos.Scan = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}


Thanks,
Aniket


On Mon Dec 15 2014 at 13:31:03 Shixiong Zhu <zsxw...@gmail.com> wrote:

Just point out a bug in your codes. You should not use `mapPartitions` like 
that. For details, I recommend Section "setup() and cleanup()" in Sean Owen's 
post: 
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/



Best Regards,

Shixiong Zhu


2014-12-14 16:35 GMT+08:00 Yanbo <yanboha...@gmail.com>:In #1, class HTable can 
not be serializable.
You also need to check you self defined function getUserActions and make sure 
it is a member function of one class who implement serializable interface.

发自我的 iPad

> 在 2014年12月12日,下午4:35,yangliuyu <yangli...@163.com> 写道:

>
> The scenario is using HTable instance to scan multiple rowkey range in Spark
> tasks look likes below:
> Option 1:
> val users = input
>      .map { case (deviceId, uid) =>
> uid}.distinct().sortBy(x=>x).mapPartitions(iterator=>{
>      val conf = HBaseConfiguration.create()
>      val table = new HTable(conf, "actions")
>      val result = iterator.map{ userId=>
>        (userId, getUserActions(table, userId, timeStart, timeStop))
>      }
>      table.close()
>      result
>    })
>
> But got the exception:
> org.apache.spark.SparkException: Task not serializable
>        at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>        at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>        at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>        at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)...
> ...
> Caused by: java.io.NotSerializableException:
> org.apache.hadoop.conf.Configuration
>        at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>        at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>        at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
> The reason not using sc.newAPIHadoopRDD is it only support one scan each
> time.
> val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>      classOf[org.apache.hadoop.hbase.client.Result])
>
> And if using MultiTableInputFormat, driver is not possible put all rowkeys
> into HBaseConfiguration
> Option 2:
> sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
>      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>      classOf[org.apache.hadoop.hbase.client.Result])
>
> It may divide all rowkey ranges into several parts then use option 2, but I
> prefer option 1. So is there any solution for option 1?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to