Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 72900c553 -> f47bbc2c2
fix bug in late decode optimizer and strategy

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/07761876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/07761876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/07761876

Branch: refs/heads/master
Commit: 07761876e45bb76d9932fd2009108c722b718280
Parents: 72900c5
Author: QiangCai <qiang...@qq.com>
Authored: Fri Dec 2 07:50:08 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri Dec 2 12:27:02 2016 +0800

----------------------------------------------------------------------
 conf/dataload.properties.template               |   4 +-
 examples/spark2/src/main/resources/data.csv     |  20 +-
 .../carbondata/examples/CarbonExample.scala     |  36 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   6 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 151 +++++++-
 .../execution/CarbonLateDecodeStrategy.scala    | 345 ++++++++++++++++++-
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 101 +-----
 7 files changed, 533 insertions(+), 130 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
index d5e9d6a..cfafb4c 100644
--- a/conf/dataload.properties.template
+++ b/conf/dataload.properties.template
@@ -18,14 +18,14 @@
 
 #carbon store path
 # you should change to the code path of your local machine
-carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
+carbon.storelocation=/home/david/Documents/incubator-carbondata/examples/spark2/target/store
 
 #true: use kettle to load data
 #false: use new flow to load data
 use_kettle=true
 
 # you should change to the code path of your local machine
-carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
+carbon.kettle.home=/home/david/Documents/incubator-carbondata/processing/carbonplugins
 
 #csv delimiter character
 delimiter=,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index 5d3169e..83ea3b3 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,11 +1,11 @@
 shortField,intField,bigintField,doubleField,stringField,timestampField
-1, 10, 100, 48.4, spark, 2015/4/23
-5, 17, 140, 43.4, spark, 2015/7/27
-1, 11, 100, 44.4, flink, 2015/5/23
-1, 10, 150, 43.4, spark, 2015/7/24
-1, 10, 100, 47.4, spark, 2015/7/23
-3, 14, 160, 43.4, hive, 2015/7/26
-2, 10, 100, 43.4, impala, 2015/7/23
-1, 10, 100, 43.4, spark, 2015/5/23
-4, 16, 130, 42.4, impala, 2015/7/23
-1, 10, 100, 43.4, spark, 2015/7/23
+1,10,100,48.4,spark,2015/4/23
+5,17,140,43.4,spark,2015/7/27
+1,11,100,44.4,flink,2015/5/23
+1,10,150,43.4,spark,2015/7/24
+1,10,100,47.4,spark,2015/7/23
+3,14,160,43.4,hive,2015/7/26
+2,10,100,43.4,impala,2015/7/23
+1,10,100,43.4,spark,2015/5/23
+4,16,130,42.4,impala,2015/7/23
+1,10,100,43.4,spark,2015/7/23

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 75fdd1c..d3a7e86 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -26,7 +26,7 @@ object CarbonExample {
 
   def main(args: Array[String]): Unit = {
     // to run the example, please change this path to your local machine path
-    val rootPath = "/Users/jackylk/code/incubator-carbondata"
+    val rootPath = "/home/david/Documents/incubator-carbondata"
     val spark = SparkSession
       .builder()
       .master("local")
@@ -38,10 +38,10 @@ object CarbonExample {
     spark.sparkContext.setLogLevel("WARN")
 
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
-
-    // Create table
+//    spark.sql("DROP TABLE IF EXISTS carbon_table")
+//    spark.sql("DROP TABLE IF EXISTS csv_table")
+//
+//    // Create table
     spark.sql(
       s"""
         | CREATE TABLE carbon_table(
@@ -96,14 +96,26 @@ object CarbonExample {
       FROM carbon_table
       """).show
 
-//    spark.sql("""
-//           SELECT sum(intField), stringField
-//           FROM carbon_table
-//           GROUP BY stringField
-//           """).show
+    spark.sql("""
+           SELECT *
+           FROM carbon_table where length(stringField) = 5
+           """).show
+
+    spark.sql("""
+           SELECT sum(intField), stringField
+           FROM carbon_table
+           GROUP BY stringField
+           """).show
+
+    spark.sql(
+      """
+        |select t1.*, t2.*
+        |from carbon_table t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
 
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
+//    spark.sql("DROP TABLE IF EXISTS carbon_table")
+//    spark.sql("DROP TABLE IF EXISTS csv_table")
   }
 }
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 24182ec..3b951ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -41,7 +41,7 @@ case class CarbonDatasourceHadoopRelation(
     paths: Array[String],
     parameters: Map[String, String],
     tableSchema: Option[StructType])
-  extends BaseRelation with PrunedFilteredScan {
+  extends BaseRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
   lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -59,7 +59,7 @@ case class CarbonDatasourceHadoopRelation(
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val job = new Job(new JobConf())
     val conf = new Configuration(job.getConfiguration)
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
@@ -74,5 +74,5 @@ case class CarbonDatasourceHadoopRelation(
     new CarbonScanRDD[Row](sqlContext.sparkContext, projection,
       filterExpression.orNull, absIdentifier, carbonTable)
   }
-
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
 }
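With PrunedFilteredScan removed, the strategy below drives buildScan directly, and unhandledFilters now returns an empty array: the relation promises that every pushed filter is fully evaluated by the Carbon scan, so Spark adds no extra Filter node for them. A minimal sketch of that contract (the sample filters are illustrative, not from this commit):

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// Spark re-applies only the filters a relation reports back from
// unhandledFilters. Returning an empty array, as
// CarbonDatasourceHadoopRelation now does, means "trust the scan".
def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)

val pushed: Array[Filter] = Array(EqualTo("stringField", "spark"), GreaterThan("intField", 10))
assert(unhandledFilters(pushed).isEmpty) // no Spark-side re-evaluation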
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d05aefd..c7ca61d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -17,7 +17,7 @@ package org.apache.spark.sql
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -33,8 +33,7 @@ import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, ColumnIdentif
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.querystatistics._
-import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 /**
@@ -220,3 +219,149 @@ case class CarbonDictionaryDecoder(
   }
 }
+
+
+class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
+    profile: CarbonProfile,
+    aliasMap: CarbonAliasDecoderRelation,
+    prev: RDD[Row],
+    output: Seq[Attribute])
+  extends RDD[Row](prev) {
+
+  def canBeDecoded(attr: Attribute): Boolean = {
+    profile match {
+      case ip: IncludeProfile if ip.attributes.nonEmpty =>
+        ip.attributes
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case ep: ExcludeProfile =>
+        !ep.attributes
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case _ => true
+    }
+  }
+
+  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+      relation: CarbonRelation): types.DataType = {
+    carbonDimension.getDataType match {
+      case DataType.STRING => StringType
+      case DataType.SHORT => ShortType
+      case DataType.INT => IntegerType
+      case DataType.LONG => LongType
+      case DataType.DOUBLE => DoubleType
+      case DataType.BOOLEAN => BooleanType
+      case DataType.DECIMAL =>
+        val scale: Int = carbonDimension.getColumnSchema.getScale
+        val precision: Int = carbonDimension.getColumnSchema.getPrecision
+        if (scale == 0 && precision == 0) {
+          DecimalType(18, 2)
+        } else {
+          DecimalType(precision, scale)
+        }
+      case DataType.TIMESTAMP => TimestampType
+      case DataType.STRUCT =>
+        CarbonMetastoreTypes
+          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+      case DataType.ARRAY =>
+        CarbonMetastoreTypes
+          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+    }
+  }
+
+  val getDictionaryColumnIds = {
+    val dictIds: Array[(String, ColumnIdentifier, DataType)] = output.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if (relation.isDefined && canBeDecoded(attr)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension =
+          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+            carbonDimension.getDataType)
+        } else {
+          (null, null, null)
+        }
+      } else {
+        (null, null, null)
+      }
+    }.toArray
+    dictIds
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+    val storepath = CarbonEnv.get.carbonMetastore.storePath
+    val absoluteTableIdentifiers = relations.map { relation =>
+      val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
+      (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+    }.toMap
+
+    val cacheProvider: CacheProvider = CacheProvider.getInstance
+    val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+    val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+      forwardDictionaryCache)
+    val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+    // add a task completion listener to clear the dictionaries; clearing is a decisive
+    // factor for the LRU eviction policy
+    val dictionaryTaskCleaner = TaskContext.get
+    dictionaryTaskCleaner.addTaskCompletionListener(context =>
+      dicts.foreach { dictionary =>
+        if (null != dictionary) {
+          dictionary.clear
+        }
+      }
+    )
+    val iter = firstParent[Row].iterator(split, context)
+    new Iterator[Row] {
+      var flag = true
+      var total = 0L
+
+      override final def hasNext: Boolean = iter.hasNext
+
+      override final def next(): Row = {
+        val startTime = System.currentTimeMillis()
+        val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+        dictIndex.foreach { index =>
+          if (data(index) != null) {
+            data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+              .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+              getDictionaryColumnIds(index)._3)
+          }
+        }
+        new GenericRow(data)
+      }
+    }
+  }
+
+  private def isRequiredToDecode = {
+    getDictionaryColumnIds.find(p => p._1 != null) match {
+      case Some(value) => true
+      case _ => false
+    }
+  }
+
+  private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
+      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+    val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
+      if (f._2 != null) {
+        try {
+          cache.get(new DictionaryColumnUniqueIdentifier(
+            atiMap(f._1).getCarbonTableIdentifier,
+            f._2, f._3))
+        } catch {
+          case _: Throwable => null
+        }
+      } else {
+        null
+      }
+    }
+    dicts
+  }
+
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
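CarbonDecoderRDD.compute implements late decode as a plain post-scan map: rows arrive carrying Int surrogate keys for the dictionary-encoded columns, and only this final pass swaps each key for its value. A self-contained sketch of that mechanic, where an in-memory Map stands in for Carbon's forward dictionary cache (an assumption made purely for illustration):

import org.apache.spark.sql.{Row, SparkSession}

object LateDecodeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("sketch").getOrCreate()
    // stand-in for a forward dictionary: surrogate key -> value
    val dict = Map(1 -> "spark", 2 -> "flink", 3 -> "hive")
    // rows as they come back from the scan: stringField is still an Int key
    val encoded = spark.sparkContext.parallelize(Seq(Row(10, 1), Row(11, 2), Row(14, 3)))
    // the decode step CarbonDecoderRDD performs per partition
    val decoded = encoded.map(row => Row(row.getInt(0), dict(row.getInt(1))))
    decoded.collect().foreach(println) // [10,spark] [11,flink] [14,hive]
    spark.stop()
  }
}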
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 4ae8d61..c73fde6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,16 +17,47 @@ package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{CarbonDictionaryCatalystDecoder, CarbonDictionaryDecoder}
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 /**
  * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
  * can improve the aggregation performance and reduce memory usage
  */
 private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+  val PUSHED_FILTERS = "PushedFilters"
+
   def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
+      case PhysicalOperation(projects, filters, l: LogicalRelation)
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+        pruneFilterProject(
+          l,
+          projects,
+          filters,
+          (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
+            a.map(_.name).toArray, f), needDecoder)) ::
+          Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         CarbonDictionaryDecoder(relations,
           profile,
@@ -37,4 +68,316 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
+
+  def getDecoderRDD(logicalRelation: LogicalRelation,
+      projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+      rdd: RDD[Row],
+      output: Seq[Attribute]): RDD[Row] = {
+    val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
+      logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
+    val attrs = projectExprsNeedToDecode.map { attr =>
+      val newAttr = AttributeReference(attr.name,
+        attr.dataType,
+        attr.nullable,
+        attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+      relation.addAttribute(newAttr)
+      newAttr
+    }
+    new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
+      CarbonAliasDecoderRelation(), rdd, output)
+  }
+
+  private[this] def toCatalystRDD(
+      relation: LogicalRelation,
+      output: Seq[Attribute],
+      rdd: RDD[Row],
+      needDecoder: ArrayBuffer[AttributeReference]): RDD[InternalRow] = {
+    val newRdd = if (needDecoder.nonEmpty) {
+      getDecoderRDD(relation, needDecoder, rdd, output)
+    } else {
+      rdd
+    }
+    if (relation.relation.needConversion) {
+      execution.RDDConversions.rowToRowRdd(newRdd, output.map(_.dataType))
+    } else {
+      newRdd.asInstanceOf[RDD[InternalRow]]
+    }
+  }
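For context, a planner strategy like this one only fires once it is registered with the session; in Spark 2.x that is typically done through the experimental hooks. A hedged sketch of the wiring (the session setup here is assumed for illustration, it is not part of this commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy

val spark = SparkSession.builder().master("local").appName("carbon").getOrCreate()
// extraStrategies are consulted before the built-in strategies, so the
// Carbon scan + late-decode plan wins for Carbon relations.
spark.experimental.extraStrategies = Seq(new CarbonLateDecodeStrategy)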
+
+  protected def pruneFilterProject(
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Array[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+    pruneFilterProjectRaw(
+      relation,
+      projects,
+      filterPredicates,
+      (requestedColumns, _, pushedFilters, a) => {
+        scanBuilder(requestedColumns, pushedFilters.toArray, a)
+      })
+  }
+
+  protected def pruneFilterProjectRaw(
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Seq[Expression],
+        Seq[Filter], ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+
+    val projectSet = AttributeSet(projects.flatMap(_.references))
+    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+
+    val candidatePredicates = filterPredicates.map {
+      _ transform {
+        case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
+      }
+    }
+
+    val (unhandledPredicates, pushedFilters) =
+      selectFilters(relation.relation, candidatePredicates)
+
+    // A set of column attributes that are only referenced by pushed down filters. We can
+    // eliminate them from requested columns.
+    val handledSet = {
+      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+      AttributeSet(handledPredicates.flatMap(_.references)) --
+        (projectSet ++ unhandledSet).map(relation.attributeMap)
+    }
+
+    // Combines all Catalyst filter `Expression`s that are either not convertible to data source
+    // `Filter`s or cannot be handled by `relation`.
+    val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
+    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val map = table.carbonRelation.metaData.dictionaryMap
+
+    val metadata: Map[String, String] = {
+      val pairs = ArrayBuffer.empty[(String, String)]
+
+      if (pushedFilters.nonEmpty) {
+        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      }
+      pairs.toMap
+    }
+
+    val needDecoder = ArrayBuffer[AttributeReference]()
+    filterCondition match {
+      case Some(exp: Expression) =>
+        exp.references.collect {
+          case attr: AttributeReference =>
+            val dict = map.get(attr.name)
+            if (dict.isDefined && dict.get) {
+              needDecoder += attr
+            }
+        }
+      case None =>
+    }
+
+    projects.map {
+      case attr: AttributeReference =>
+      case Alias(attr: AttributeReference, _) =>
+      case others =>
+        others.references.map { f =>
+          val dictionary = map.get(f.name)
+          if (dictionary.isDefined && dictionary.get) {
+            needDecoder += f.asInstanceOf[AttributeReference]
+          }
+        }
+    }
+
+    if (projects.map(_.toAttribute) == projects &&
+        projectSet.size == projects.size &&
+        filterSet.subsetOf(projectSet)) {
+      // When it is possible to just use column pruning to get the right projection and
+      // when the columns of this projection are enough to evaluate all filter conditions,
+      // just do a scan followed by a filter, with no extra project.
+      val requestedColumns = projects
+        // Safe due to if above.
+        .asInstanceOf[Seq[Attribute]]
+        // Match original case of attributes.
+        .map(relation.attributeMap)
+        // Don't request columns that are only referenced by pushed filters.
+        .filterNot(handledSet.contains)
+      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+      val updateProject = projects.map { expr =>
+        var attr = expr.toAttribute.asInstanceOf[AttributeReference]
+        if (!needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+          val dict = map.get(attr.name)
+          if (dict.isDefined && dict.get) {
+            attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(
+              attr.exprId, attr.qualifier)
+          }
+        }
+        attr
+      }
+      val scan = execution.DataSourceScanExec.create(
+        updateProject,
+        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        relation.relation, metadata, relation.metastoreTableIdentifier)
+      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+    } else {
+      // Don't request columns that are only referenced by pushed filters.
+      val requestedColumns =
+        (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
+      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+      val scan = execution.DataSourceScanExec.create(
+        updateRequestedColumns,
+        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        relation.relation, metadata, relation.metastoreTableIdentifier)
+      execution.ProjectExec(
+        projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+    }
+  }
+
+  def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
+      relation: CarbonDatasourceHadoopRelation,
+      needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
+    val map = relation.carbonRelation.metaData.dictionaryMap
+    requestedColumns.map { attr =>
+      if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+        attr
+      } else {
+        val dict = map.get(attr.name)
+        if (dict.isDefined && dict.get) {
+          AttributeReference(attr.name,
+            IntegerType,
+            attr.nullable,
+            attr.metadata)(attr.exprId, attr.qualifier)
+        } else {
+          attr
+        }
+      }
+    }
+  }
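updateRequestedColumnsFunc is the heart of late decode on the scan side: any dictionary-encoded column that does not need early decoding is requested from the scan as an IntegerType attribute (the surrogate key) instead of its logical type, while keeping the same exprId so the rest of the plan still resolves it. A small sketch of that substitution (the dictionaryMap literal is assumed metadata, for illustration only):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{IntegerType, StringType}

val dictionaryMap = Map("stringField" -> true, "intField" -> false) // assumed
val stringField = AttributeReference("stringField", StringType, nullable = true)()

val requested =
  if (dictionaryMap.getOrElse(stringField.name, false)) {
    // same exprId/qualifier, narrower physical type until the decoder runs
    AttributeReference(stringField.name, IntegerType, stringField.nullable,
      stringField.metadata)(stringField.exprId, stringField.qualifier)
  } else {
    stringField
  }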
+
+  protected[sql] def selectFilters(
+      relation: BaseRelation,
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+
+    // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below
+    // are called `predicate`s, while all data source filters of type `sources.Filter` are
+    // simply called `filter`s.
+
+    val translated: Seq[(Expression, Filter)] =
+      for {
+        predicate <- predicates
+        filter <- translateFilter(predicate)
+      } yield predicate -> filter
+
+    // A map from original Catalyst expressions to corresponding translated data source filters.
+    val translatedMap: Map[Expression, Filter] = translated.toMap
+
+    // Catalyst predicate expressions that cannot be translated to data source filters.
+    val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
+
+    // Data source filters that cannot be handled by `relation`. The semantics of an unhandled
+    // filter here is that a data source may not be able to apply this filter to every row
+    // of the underlying dataset.
+    val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
+
+    val (unhandled, handled) = translated.partition {
+      case (predicate, filter) =>
+        unhandledFilters.contains(filter)
+    }
+
+    // Catalyst predicate expressions that can be translated to data source filters, but
+    // cannot be handled by `relation`.
+    val (unhandledPredicates, _) = unhandled.unzip
+
+    // Translated data source filters that can be handled by `relation`.
+    val (_, handledFilters) = handled.unzip
+
+    // `translated` contains all filters that have been converted to the public Filter interface.
+    // We should always push them to the data source no matter whether the data source can apply
+    // a filter to every row or not.
+    val (_, translatedFilters) = translated.unzip
+
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+  }
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into a data source [[Filter]].
+   *
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+   */
+  protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
+    predicate match {
+      case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, convertToScala(v, t)))
+      case expressions.EqualTo(Literal(v, t), a: Attribute) =>
+        Some(sources.EqualTo(a.name, convertToScala(v, t)))
+
+      case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
+      case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
+        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
+
+      case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
+      case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThan(a.name, convertToScala(v, t)))
+
+      case expressions.LessThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThan(a.name, convertToScala(v, t)))
+      case expressions.LessThan(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
+
+      case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
+      case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
+
+      case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
+      case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
+
+      case expressions.InSet(a: Attribute, set) =>
+        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
+        Some(sources.In(a.name, set.toArray.map(toScala)))
+
+      // Because we only convert In to InSet in the Optimizer when there are more than a certain
+      // number of items, it is possible we still get an In expression here that needs to be
+      // pushed down.
+      case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
+        Some(sources.In(a.name, hSet.toArray.map(toScala)))
+
+      case expressions.IsNull(a: Attribute) =>
+        Some(sources.IsNull(a.name))
+      case expressions.IsNotNull(a: Attribute) =>
+        Some(sources.IsNotNull(a.name))
+
+      case expressions.And(left, right) =>
+        (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
+
+      case expressions.Or(left, right) =>
+        for {
+          leftFilter <- translateFilter(left)
+          rightFilter <- translateFilter(right)
+        } yield sources.Or(leftFilter, rightFilter)
+
+      case expressions.Not(child) =>
+        translateFilter(child).map(sources.Not)
+
+      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringStartsWith(a.name, v.toString))
+
+      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringEndsWith(a.name, v.toString))
+
+      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringContains(a.name, v.toString))
+
+      case _ => None
+    }
+  }
 }
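translateFilter mirrors Spark's built-in Catalyst-to-source translation; the noteworthy detail is that a predicate with the literal on the left flips the comparison, because source Filters always name the column on the left. A runnable illustration (placed in the org.apache.spark.sql package because translateFilter is protected[sql]; the column name is made up):

package org.apache.spark.sql // translateFilter is protected[sql]

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
import org.apache.spark.sql.types.IntegerType

object TranslateFilterSketch {
  def main(args: Array[String]): Unit = {
    val strategy = new CarbonLateDecodeStrategy
    val intField = AttributeReference("intField", IntegerType, nullable = true)()

    // intField > 10 keeps its direction ...
    println(strategy.translateFilter(GreaterThan(intField, Literal(10))))
    // Some(GreaterThan(intField,10))

    // ... while 10 > intField flips to LessThan on the column
    println(strategy.translateFilter(GreaterThan(Literal(10), intField)))
    // Some(LessThan(intField,10))
  }
}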
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index c4b5d70..6b6960d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -55,110 +55,13 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  def updateCarbonRelationDataType(plan: LogicalPlan): LogicalPlan = {
-    val relations = plan collect {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    }
-    if (relations.nonEmpty && !isOptimized(plan)) {
-      val map = mutable.HashMap[ExprId, AttributeReference]()
-      val updateRelationPlan = plan transformDown {
-        case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-          val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-          val newRelation = updateRelation(relation)
-          val newl = LogicalRelation(newRelation, l.expectedOutputAttributes,
-            l.metastoreTableIdentifier)
-          for (i <- 0 until l.output.size) {
-            map.put(l.output(i).exprId, newl.output(i))
-          }
-          newl
-      }
-
-      updateRelationPlan transformDown {
-        case sort: Sort =>
-          val sortExprs = sort.order.map { s =>
-            s.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }.asInstanceOf[SortOrder]
-          }
-          Sort(sortExprs, sort.global, sort.child)
-        case agg: Aggregate =>
-          val aggExps = agg.aggregateExpressions.map { aggExp =>
-            aggExp transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-              case other => other
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-
-          val grpExps = agg.groupingExpressions.map { gexp =>
-            gexp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }
-          Aggregate(grpExps, aggExps, agg.child)
-        case expand: Expand =>
-          expand.transformExpressions {
-            case attr: AttributeReference =>
-              map.getOrElse(attr.exprId, attr)
-          }
-        case filter: Filter =>
-          val filterExps = filter.condition transform {
-            case attr: AttributeReference =>
-              map.getOrElse(attr.exprId, attr)
-          }
-          Filter(filterExps, filter.child)
-        case p: Project if relations.nonEmpty =>
-          val prExps = p.projectList.map { prExp =>
-            prExp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-          Project(prExps, p.child)
-        case wd: Window if relations.nonEmpty =>
-          val prExps = wd.output.map { prExp =>
-            prExp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[Attribute]]
-          val wdExps = wd.windowExpressions.map { gexp =>
-            gexp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-          val partitionSpec = wd.partitionSpec.map { exp =>
-            exp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }
-          val orderSpec = wd.orderSpec.map { exp =>
-            exp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[SortOrder]]
-          Window(wdExps, partitionSpec, orderSpec, wd.child)
-        case others => others
-      }
-    } else {
-      plan
-    }
-  }
-
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val updatePlan = updateCarbonRelationDataType(plan)
-    relations = collectCarbonRelation(updatePlan)
+    relations = collectCarbonRelation(plan)
     if (relations.nonEmpty && !isOptimized(plan)) {
       LOGGER.info("Starting to optimize plan")
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
-      val result = transformCarbonPlan(updatePlan, relations)
+      val result = transformCarbonPlan(plan, relations)
       queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
         System.currentTimeMillis)
       recorder.recordStatistics(queryStatistic)