Please take a look at
http://hbase.apache.org/book.html#_language_integrated_query

The above is based on the hbase-spark module, which is part of the upcoming
HBase 2.0 release.
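
As a rough sketch of what that buys you: the module exposes HBase tables
through the Spark SQL data source API, so a scan becomes a DataFrame read
with no per-column RDD plumbing. The table name and column mapping below are
illustrative, and the option names are as shown in the book section above,
so verify them against your release:

val df = sqlContext.read
  .format("org.apache.hadoop.hbase.spark")
  .options(Map(
    "hbase.table" -> "trade_table",
    "hbase.columns.mapping" ->
      ("key STRING :key, ticker STRING trade_info:ticker, " +
       "timecreated STRING trade_info:timecreated, price STRING trade_info:price")))
  .load()

df.registerTempTable("trades")
sqlContext.sql("SELECT ticker, timecreated, price FROM trades").show(10)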

Cheers

On Sun, Oct 16, 2016 at 11:37 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> I have trade data stored in an HBase table. The data arrives in csv format
> on HDFS and is then loaded into HBase with a periodic
> org.apache.hadoop.hbase.mapreduce.ImportTsv job.
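>
> For reference, the periodic load is a one-liner along these lines (the
> table name and HDFS path here are illustrative):
>
> hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
>   '-Dimporttsv.separator=,' \
>   -Dimporttsv.columns=HBASE_ROW_KEY,trade_info:ticker,trade_info:timecreated,trade_info:price \
>   trade_table /data/prices/trades.csv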
>
> The HBase table has one column family, "trade_info", and three columns:
> ticker, timecreated and price.
>
> The RowKey is a UUID, so each row in the csv file carries a UUID, a ticker,
> a timecreated timestamp and a price.
>
> Each row in HBase is a key/value map. In my case I have one column family
> and three columns. Without going into semantics, I see HBase as a
> column-oriented database where column data stay together.
>
> So I thought of this way of accessing the data.
>
> I define an RDD for each column in the column family, as below. In this
> case it is the column trade_info:ticker:
>
> //create rdd
> import org.apache.hadoop.hbase.HBaseConfiguration
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import scala.collection.JavaConverters._
>
> // point TableInputFormat at the table ("trade_table" is illustrative)
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE, "trade_table")
>
> // an RDD of (row key, Result) pairs over the whole table
> val hBaseRDD = sc.newAPIHadoopRDD(conf,
>   classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> // keep the row key and the newest version of trade_info:ticker
> val rdd1 = hBaseRDD.map(_._2).map(result =>
>   (result.getRow, result.getColumn("trade_info".getBytes(), "ticker".getBytes()))
> ).map(row =>
>   (
>     row._1.map(_.toChar).mkString,            // row key bytes -> String
>     row._2.asScala.reduceLeft {               // latest cell version wins
>       (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
>     }.getValue.map(_.toChar).mkString
>   )
> )
>
> case class columns(key: String, ticker: String)
> val dfticker = rdd1.map(p => columns(p._1, p._2)).toDF
>
> Note that the end result is a DataFrame with the RowKey as column "key" and
> the HBase column as "ticker".
>
> I use the same approach to create two other DataFrames, dftimecreated and
> dfprice, for the two other columns.
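>
> Since the three DataFrames only differ in the column qualifier, the pattern
> factors into a small helper; a minimal sketch (the helper name is my own,
> and caching hBaseRDD avoids rescanning the table for every column):
>
> import org.apache.spark.sql.DataFrame
>
> hBaseRDD.cache()
>
> // Hypothetical helper: a two-column DataFrame (key, <qualifier>) holding
> // the latest version of family:qualifier for every row
> def columnToDF(family: String, qualifier: String): DataFrame =
>   hBaseRDD.map(_._2).map(result =>
>     (result.getRow, result.getColumn(family.getBytes(), qualifier.getBytes()))
>   ).map(row =>
>     (row._1.map(_.toChar).mkString,
>      row._2.asScala.reduceLeft {
>        (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
>      }.getValue.map(_.toChar).mkString)
>   ).toDF("key", qualifier)
>
> val dftimecreated = columnToDF("trade_info", "timecreated")
> val dfprice       = columnToDF("trade_info", "price")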
>
> Note that if I don't need a column I simply don't create a DataFrame for
> it, so there is one DataFrame per column I actually use. I am not sure how
> this compares with reading the full row through other methods, if any exist.
>
> Anyway, all I need to do after creating a DataFrame for each column is to
> join them through the RowKey to slice and dice the data. Like below.
>
> Get me the latest prices, ordered by timecreated and ticker (ticker being
> the stock symbol):
>
> val rs = dfticker.join(dftimecreated, "key").join(dfprice, "key")
>   .orderBy('timecreated.desc, 'price.desc)
>   .select('timecreated, 'ticker, 'price.cast("Float").as("Latest price"))
> rs.show(10)
>
> +-------------------+------+------------+
> |        timecreated|ticker|Latest price|
> +-------------------+------+------------+
> |2016-10-16T18:44:57|   S16|   97.631966|
> |2016-10-16T18:44:57|   S13|    92.11406|
> |2016-10-16T18:44:57|   S19|    85.93021|
> |2016-10-16T18:44:57|   S09|   85.714645|
> |2016-10-16T18:44:57|   S15|    82.38932|
> |2016-10-16T18:44:57|   S17|    80.77747|
> |2016-10-16T18:44:57|   S06|    79.81854|
> |2016-10-16T18:44:57|   S18|    74.10128|
> |2016-10-16T18:44:57|   S07|    66.13622|
> |2016-10-16T18:44:57|   S20|    60.35727|
> +-------------------+------+------------+
> only showing top 10 rows
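>
> As a side note, the orderBy above ranks all rows; if the goal is strictly
> one latest price per ticker, a sketch (Spark 1.6+, and assuming the
> timecreated strings sort chronologically) is to aggregate on the max
> timecreated and join back:
>
> import org.apache.spark.sql.functions.max
>
> val joined = dfticker.join(dftimecreated, "key").join(dfprice, "key")
> val latest = joined.groupBy("ticker").agg(max("timecreated").as("timecreated"))
> val rs2 = joined.join(latest, Seq("ticker", "timecreated"))
>   .select('timecreated, 'ticker, 'price.cast("Float").as("Latest price"))
> rs2.show(10)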
>
> Is this a workable solution?
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
