HBASE-14796 Enhance the Gets in the connector (Zhan Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6868c636
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6868c636
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6868c636

Branch: refs/heads/trunk
Commit: 6868c6366002d5b4e25980f37ede8839e7a7e92d
Parents: 2fba25b
Author: tedyu <yuzhih...@gmail.com>
Authored: Mon Dec 28 15:48:10 2015 -0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Mon Dec 28 15:48:10 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 33 ++-----
 .../hadoop/hbase/spark/datasources/Bound.scala  | 24 +++++
 .../spark/datasources/HBaseResources.scala      | 14 +++
 .../spark/datasources/HBaseSparkConf.scala      |  2 +
 .../spark/datasources/HBaseTableScanRDD.scala   | 92 +++++++++++++++++---
 5 files changed, 127 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 73cab3c..b6d7982 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -159,6 +159,10 @@ case class HBaseRelation (val tableName:String,
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
     HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))

+  val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
+    .getOrElse(sqlContext.sparkContext.getConf.getInt(
+    HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.defaultBulkGetSize))
+
   //create or get latest HBaseContext
   @transient val hbaseContext:HBaseContext = if (useHBaseContext) {
     LatestHBaseContextCache.latest
@@ -267,6 +271,7 @@ case class HBaseRelation (val tableName:String,
       None
     }
     val hRdd = new HBaseTableScanRDD(this, pushDownFilterJava, requiredQualifierDefinitionList.seq)
+    pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
     pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
     var resultRDD: RDD[Row] = {
       val tmp = hRdd.map{ r =>
@@ -280,34 +285,6 @@ case class HBaseRelation (val tableName:String,
       }
     }

-    //If there are gets then we can get them from the driver and union that rdd in
-    // with the rest of the values.
-    if (getList.size() > 0) {
-      val connection =
-        ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
-      try {
-        val table = connection.getTable(TableName.valueOf(tableName))
-        try {
-          val results = table.get(getList)
-          val rowList = mutable.MutableList[Row]()
-          for (i <- 0 until results.length) {
-            val rowArray = requiredColumns.map(c =>
-              DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i)))
-            rowList += Row.fromSeq(rowArray)
-          }
-          val getRDD = sqlContext.sparkContext.parallelize(rowList)
-          if (resultRDD == null) resultRDD = getRDD
-          else {
-            resultRDD = resultRDD.union(getRDD)
-          }
-        } finally {
-          table.close()
-        }
-      } finally {
-        connection.close()
-      }
-    }
-
     if (resultRDD == null) {
       val scan = new Scan()
       scan.setCacheBlocks(blockCacheEnable)

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
index 0f6098d..8e03e95 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/Bound.scala
@@ -87,3 +87,27 @@ object Ranges {
     }
   }
 }
+
+object Points {
+  def and(r: Range, ps: Seq[Array[Byte]]): Seq[Array[Byte]] = {
+    ps.flatMap { p =>
+      if (ord.compare(r.lower.get.b, p) <= 0) {
+        // if region lower bound is less or equal to the point
+        if (r.upper.isDefined) {
+          // if region upper bound is defined
+          if (ord.compare(r.upper.get.b, p) > 0) {
+            // if the upper bound is greater than the point (because upper bound is exclusive)
+            Some(p)
+          } else {
+            None
+          }
+        } else {
+          // if the region upper bound is not defined (infinity)
+          Some(p)
+        }
+      } else {
+        None
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
index 19a6ea7..14c5fd0 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseResources.scala
@@ -38,6 +38,12 @@ case class ScanResource(tbr: TableResource, rs: ResultScanner) extends Resource
   }
 }

+case class GetResource(tbr: TableResource, rs: Array[Result]) extends Resource {
+  def release() {
+    tbr.release()
+  }
+}
+
 trait ReferencedResource {
   var count: Int = 0
   def init(): Unit
@@ -100,6 +106,10 @@ case class TableResource(relation: HBaseRelation) extends ReferencedResource {
   def getScanner(scan: Scan): ScanResource = releaseOnException {
     ScanResource(this, table.getScanner(scan))
   }
+
+  def get(list: java.util.List[org.apache.hadoop.hbase.client.Get]) = releaseOnException {
+    GetResource(this, table.get(list))
+  }
 }

 case class RegionResource(relation: HBaseRelation) extends ReferencedResource {
@@ -138,6 +148,10 @@ object HBaseResources{
     sr.rs
   }

+  implicit def GetResToResult(gr: GetResource): Array[Result] = {
+    gr.rs
+  }
+
   implicit def TableResToTable(tr: TableResource): Table = {
     tr.table
   }
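
For context, Points.and above keeps only the point row keys that fall inside a region's [lower, upper) range: the lower bound is inclusive, the upper bound is exclusive, and an undefined upper bound means the last region (infinity). Below is a minimal self-contained sketch of the same semantics, using Bytes.compareTo in place of the package-level ord ordering and a simplified stand-in for the Range/Bound types defined in Bound.scala; it is illustrative only, not part of the patch.

import org.apache.hadoop.hbase.util.Bytes

object PointsSketch {
  // Simplified stand-in for the Range/Bound types in Bound.scala.
  case class SimpleRange(lower: Array[Byte], upper: Option[Array[Byte]])

  // Keep points p with lower <= p < upper; an undefined upper bound means +infinity.
  def and(r: SimpleRange, ps: Seq[Array[Byte]]): Seq[Array[Byte]] =
    ps.filter { p =>
      Bytes.compareTo(r.lower, p) <= 0 &&
        r.upper.forall(u => Bytes.compareTo(u, p) > 0)
    }

  def main(args: Array[String]): Unit = {
    val region = SimpleRange(Bytes.toBytes("b"), Some(Bytes.toBytes("d")))
    val pts = Seq("a", "b", "c", "d").map(s => Bytes.toBytes(s))
    // Prints b and c: "a" is below the lower bound, "d" hits the exclusive upper bound.
    and(region, pts).foreach(p => println(Bytes.toString(p)))
  }
}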
http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 67580b0..5e11356 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -29,4 +29,6 @@ object HBaseSparkConf{
   val defaultCachingSize = 1000
   val BATCH_NUM = "spark.hbase.batchNum"
   val defaultBatchNum = 1000
+  val BULKGET_SIZE = "spark.hbase.bulkGetSize"
+  val defaultBulkGetSize = 1000
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6868c636/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
index eb9d39a..f288c34 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala
@@ -17,6 +17,8 @@

 package org.apache.hadoop.hbase.spark.datasources

+import java.util.ArrayList
+
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.spark.{ScanRange, SchemaQualifierDefinition, HBaseRelation, SparkSQLPushDownFilter}
 import org.apache.hadoop.hbase.spark.hbase._
@@ -32,7 +34,12 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     val columns: Seq[SchemaQualifierDefinition] = Seq.empty
     )extends RDD[Result](relation.sqlContext.sparkContext, Nil) with Logging  {
   private def sparkConf = SparkEnv.get.conf
-  var ranges = Seq.empty[Range]
+  @transient var ranges = Seq.empty[Range]
+  @transient var points = Seq.empty[Array[Byte]]
+  def addPoint(p: Array[Byte]) {
+    points :+= p
+  }
+
   def addRange(r: ScanRange) = {
     val lower = if (r.lowerBound != null && r.lowerBound.length > 0) {
       Some(Bound(r.lowerBound, r.isLowerBoundEqualTo))
@@ -65,12 +72,13 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     logDebug(s"There are ${regions.size} regions")
     val ps = regions.flatMap { x =>
       val rs = Ranges.and(Range(x), ranges)
-      if (rs.size > 0) {
+      val ps = Points.and(Range(x), points)
+      if (rs.size > 0 || ps.size > 0) {
         if(log.isDebugEnabled) {
           rs.foreach(x => logDebug(x.toString))
         }
         idx += 1
-        Some(HBaseScanPartition(idx - 1, x, rs, SerializedFilter.toSerializedTypedFilter(filter)))
+        Some(HBaseScanPartition(idx - 1, x, rs, ps, SerializedFilter.toSerializedTypedFilter(filter)))
       } else {
         None
       }
@@ -86,6 +94,57 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     }.toSeq
   }

+  private def buildGets(
+      tbr: TableResource,
+      g: Seq[Array[Byte]],
+      filter: Option[SparkSQLPushDownFilter],
+      columns: Seq[SchemaQualifierDefinition]): Iterator[Result] = {
+    g.grouped(relation.bulkGetSize).flatMap{ x =>
+      val gets = new ArrayList[Get]()
+      x.foreach{ y =>
+        val g = new Get(y)
+        columns.foreach { d =>
+          if (d.columnFamilyBytes.length > 0) {
+            g.addColumn(d.columnFamilyBytes, d.qualifierBytes)
+          }
+        }
+        filter.foreach(g.setFilter(_))
+        gets.add(g)
+      }
+      val tmp = tbr.get(gets)
+      rddResources.addResource(tmp)
+      toResultIterator(tmp)
+    }
+  }
+
+  private def toResultIterator(result: GetResource): Iterator[Result] = {
+    val iterator = new Iterator[Result] {
+      var idx = 0
+      var cur: Option[Result] = None
+      override def hasNext: Boolean = {
+        while(idx < result.length && cur.isEmpty) {
+          val r = result(idx)
+          idx += 1
+          if (!r.isEmpty) {
+            cur = Some(r)
+          }
+        }
+        if (cur.isEmpty) {
+          rddResources.release(result)
+        }
+        cur.isDefined
+      }
+      override def next(): Result = {
+        hasNext
+        val ret = cur.get
+        cur = None
+        ret
+      }
+    }
+    iterator
+  }
+
   private def buildScan(range: Range,
       filter: Option[SparkSQLPushDownFilter],
       columns: Seq[SchemaQualifierDefinition]): Scan = {
@@ -130,6 +189,7 @@ class HBaseTableScanRDD(relation: HBaseRelation,
     }
     iterator
   }
+
   lazy val rddResources = RDDResources(new mutable.HashSet[Resource]())

   private def close() {
@@ -138,18 +198,29 @@ class HBaseTableScanRDD(relation: HBaseRelation,

   override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
     val partition = split.asInstanceOf[HBaseScanPartition]
-
+    val filter = SerializedFilter.fromSerializedFilter(partition.sf)
     val scans = partition.scanRanges
-      .map(buildScan(_, SerializedFilter.fromSerializedFilter(partition.sf), columns))
+      .map(buildScan(_, filter, columns))
     val tableResource = TableResource(relation)
     context.addTaskCompletionListener(context => close())
-    val sIts = scans.par
-      .map(tableResource.getScanner(_))
-      .map(toResultIterator(_))
+    val points = partition.points
+    val gIt: Iterator[Result] = {
+      if (points.isEmpty) {
+        Iterator.empty: Iterator[Result]
+      } else {
+        buildGets(tableResource, points, filter, columns)
+      }
+    }
+    val rIts = scans.par
+      .map { scan =>
+        val scanner = tableResource.getScanner(scan)
+        rddResources.addResource(scanner)
+        scanner
+      }.map(toResultIterator(_))
       .fold(Iterator.empty: Iterator[Result]){ case (x, y) =>
         x ++ y
-      }
-    sIts
+      } ++ gIt
+    rIts
   }
 }

@@ -176,6 +247,7 @@ private[hbase] case class HBaseScanPartition(
     override val index: Int,
     val regions: HBaseRegion,
     val scanRanges: Seq[Range],
+    val points: Seq[Array[Byte]],
     val sf: SerializedFilter) extends Partition

 case class RDDResources(set: mutable.HashSet[Resource]) {
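
Taken together, point lookups pushed down from Spark SQL are now executed as batched Gets on the executors, grouped by spark.hbase.bulkGetSize (default 1000), instead of being fetched through a single driver-side connection. A hypothetical usage sketch follows: the "hbase.table" option key and the KEY_FIELD column name are illustrative stand-ins for whatever mapping the relation is created with, while spark.hbase.bulkGetSize is the key added in this patch.

import org.apache.spark.sql.SQLContext

// Assumes an existing sqlContext and a table already mapped for the connector.
def pointLookups(sqlContext: SQLContext): Unit = {
  val df = sqlContext.read
    .format("org.apache.hadoop.hbase.spark")
    .option("hbase.table", "myTable")          // illustrative table-mapping option
    .option("spark.hbase.bulkGetSize", "500")  // cap each multi-get batch at 500 rows
    .load()

  // Equality/IN predicates on the row key column are pushed down as "points";
  // each partition whose region contains a point issues batched Gets for them.
  df.filter("KEY_FIELD in ('row1', 'row2', 'row3')").show()
}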