Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 72900c553 -> f47bbc2c2


fix bug in late decode optimizer and strategy


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/07761876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/07761876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/07761876

Branch: refs/heads/master
Commit: 07761876e45bb76d9932fd2009108c722b718280
Parents: 72900c5
Author: QiangCai <qiang...@qq.com>
Authored: Fri Dec 2 07:50:08 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri Dec 2 12:27:02 2016 +0800

----------------------------------------------------------------------
 conf/dataload.properties.template               |   4 +-
 examples/spark2/src/main/resources/data.csv     |  20 +-
 .../carbondata/examples/CarbonExample.scala     |  36 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   6 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 151 +++++++-
 .../execution/CarbonLateDecodeStrategy.scala    | 345 ++++++++++++++++++-
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 101 +-----
 7 files changed, 533 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
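
Note on the optimization being fixed: "late decode" keeps the dictionary-encoded
surrogate keys (small integers) flowing through filters and aggregation, and only
translates them back to the original values as late as possible in the plan, which
is what the strategy changed below aims at. The toy Scala sketch that follows
illustrates the idea with plain collections; the dictionary contents and every name
in it are made up for illustration and are not CarbonData or Spark APIs.

object LateDecodeSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical dictionary: surrogate key -> original string value
    val dictionary = Map(1 -> "spark", 2 -> "flink", 3 -> "hive", 4 -> "impala")

    // rows as (dictionary key of stringField, intField), mimicking the sample data.csv
    val rows = Seq((1, 10), (2, 11), (1, 10), (3, 14), (4, 16))

    // aggregate on the compact integer keys first (cheap comparisons, small hash keys)
    val summedByKey: Map[Int, Int] =
      rows.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

    // decode late: one dictionary lookup per distinct key instead of one per row
    val summedByValue: Map[String, Int] =
      summedByKey.map { case (key, sum) => dictionary(key) -> sum }

    summedByValue.foreach { case (value, sum) => println(s"$value -> $sum") }
  }
}

Grouping on the keys keeps the hash keys small, and the dictionary lookup happens
once per distinct value rather than once per row.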


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
index d5e9d6a..cfafb4c 100644
--- a/conf/dataload.properties.template
+++ b/conf/dataload.properties.template
@@ -18,14 +18,14 @@
 
 #carbon store path
 # you should change to the code path of your local machine
-carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
+carbon.storelocation=/home/david/Documents/incubator-carbondata/examples/spark2/target/store
 
 #true: use kettle to load data
 #false: use new flow to load data
 use_kettle=true
 
 # you should change to the code path of your local machine
-carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
+carbon.kettle.home=/home/david/Documents/incubator-carbondata/processing/carbonplugins
 
 #csv delimiter character
 delimiter=,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index 5d3169e..83ea3b3 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,11 +1,11 @@
 shortField,intField,bigintField,doubleField,stringField,timestampField
-1, 10, 100, 48.4, spark, 2015/4/23
-5, 17, 140, 43.4, spark, 2015/7/27
-1, 11, 100, 44.4, flink, 2015/5/23
-1, 10, 150, 43.4, spark, 2015/7/24
-1, 10, 100, 47.4, spark, 2015/7/23
-3, 14, 160, 43.4, hive, 2015/7/26
-2, 10, 100, 43.4, impala, 2015/7/23
-1, 10, 100, 43.4, spark, 2015/5/23
-4, 16, 130, 42.4, impala, 2015/7/23
-1, 10, 100, 43.4, spark, 2015/7/23
+1,10,100,48.4,spark,2015/4/23
+5,17,140,43.4,spark,2015/7/27
+1,11,100,44.4,flink,2015/5/23
+1,10,150,43.4,spark,2015/7/24
+1,10,100,47.4,spark,2015/7/23
+3,14,160,43.4,hive,2015/7/26
+2,10,100,43.4,impala,2015/7/23
+1,10,100,43.4,spark,2015/5/23
+4,16,130,42.4,impala,2015/7/23
+1,10,100,43.4,spark,2015/7/23

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 75fdd1c..d3a7e86 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -26,7 +26,7 @@ object CarbonExample {
 
   def main(args: Array[String]): Unit = {
     // to run the example, plz change this path to your local machine path
-    val rootPath = "/Users/jackylk/code/incubator-carbondata"
+    val rootPath = "/home/david/Documents/incubator-carbondata"
     val spark = SparkSession
         .builder()
         .master("local")
@@ -38,10 +38,10 @@ object CarbonExample {
     spark.sparkContext.setLogLevel("WARN")
 
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
-
-    // Create table
+//    spark.sql("DROP TABLE IF EXISTS carbon_table")
+//    spark.sql("DROP TABLE IF EXISTS csv_table")
+//
+//    // Create table
     spark.sql(
       s"""
          | CREATE TABLE carbon_table(
@@ -96,14 +96,26 @@ object CarbonExample {
              FROM carbon_table
               """).show
 
-//    spark.sql("""
-//           SELECT sum(intField), stringField
-//           FROM carbon_table
-//           GROUP BY stringField
-//           """).show
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where length(stringField) = 5
+              """).show
+
+    spark.sql("""
+           SELECT sum(intField), stringField
+           FROM carbon_table
+           GROUP BY stringField
+           """).show
+
+    spark.sql(
+      """
+        |select t1.*, t2.*
+        |from carbon_table t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
 
     // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
+//    spark.sql("DROP TABLE IF EXISTS carbon_table")
+//    spark.sql("DROP TABLE IF EXISTS csv_table")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 24182ec..3b951ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -41,7 +41,7 @@ case class CarbonDatasourceHadoopRelation(
     paths: Array[String],
     parameters: Map[String, String],
     tableSchema: Option[StructType])
-  extends BaseRelation with PrunedFilteredScan {
+  extends BaseRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
   lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -59,7 +59,7 @@ case class CarbonDatasourceHadoopRelation(
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val job = new Job(new JobConf())
     val conf = new Configuration(job.getConfiguration)
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
@@ -74,5 +74,5 @@ case class CarbonDatasourceHadoopRelation(
     new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
       absIdentifier, carbonTable)
   }
-
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
 }
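
Note: with this change the relation no longer implements PrunedFilteredScan, and
unhandledFilters now returns an empty array, i.e. the relation reports every filter
handed to it as fully handled, so pushdown decisions move to the new strategy below.
The following is a minimal sketch of what an empty unhandledFilters result means for
the planner, using stand-in types rather than Spark's, under the usual contract that
only the returned filters are re-evaluated by Spark:

object UnhandledFiltersSketch {
  // stand-in for org.apache.spark.sql.sources.Filter, just for illustration
  final case class Filter(description: String)

  // hypothetical relation hook: an empty result means "every filter I was given is handled"
  def unhandledFilters(pushed: Array[Filter]): Array[Filter] = Array.empty[Filter]

  def main(args: Array[String]): Unit = {
    val pushed = Array(Filter("stringField = 'spark'"), Filter("intField > 10"))
    val unhandled = unhandledFilters(pushed).toSet

    // mirrors the partition done in selectFilters: only unhandled filters keep a Spark-side filter
    val (reFiltered, trusted) = pushed.partition(unhandled.contains)
    println(s"re-evaluated by Spark: ${reFiltered.mkString(", ")}")
    println(s"left to the data source: ${trusted.mkString(", ")}")
  }
}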

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d05aefd..c7ca61d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -33,8 +33,7 @@ import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, ColumnIdentif
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.querystatistics._
-import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
 /**
@@ -220,3 +219,149 @@ case class CarbonDictionaryDecoder(
   }
 
 }
+
+
+
+
+class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
+                profile: CarbonProfile,
+                aliasMap: CarbonAliasDecoderRelation,
+                prev: RDD[Row],
+                       output: Seq[Attribute])
+    extends RDD[Row](prev) {
+
+  def canBeDecoded(attr: Attribute): Boolean = {
+    profile match {
+      case ip: IncludeProfile if ip.attributes.nonEmpty =>
+        ip.attributes
+            .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case ep: ExcludeProfile =>
+        !ep.attributes
+            .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case _ => true
+    }
+  }
+
+  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+                                   relation: CarbonRelation): types.DataType = {
+    carbonDimension.getDataType match {
+      case DataType.STRING => StringType
+      case DataType.SHORT => ShortType
+      case DataType.INT => IntegerType
+      case DataType.LONG => LongType
+      case DataType.DOUBLE => DoubleType
+      case DataType.BOOLEAN => BooleanType
+      case DataType.DECIMAL =>
+        val scale: Int = carbonDimension.getColumnSchema.getScale
+        val precision: Int = carbonDimension.getColumnSchema.getPrecision
+        if (scale == 0 && precision == 0) {
+          DecimalType(18, 2)
+        } else {
+          DecimalType(precision, scale)
+        }
+      case DataType.TIMESTAMP => TimestampType
+      case DataType.STRUCT =>
+        CarbonMetastoreTypes
+            .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+      case DataType.ARRAY =>
+        CarbonMetastoreTypes
+            .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+    }
+  }
+
+  val getDictionaryColumnIds = {
+    val dictIds: Array[(String, ColumnIdentifier, DataType)] = output.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if(relation.isDefined && canBeDecoded(attr)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension =
+          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+              carbonDimension.getDataType)
+        } else {
+          (null, null, null)
+        }
+      } else {
+        (null, null, null)
+      }
+
+    }.toArray
+    dictIds
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+          val storepath = CarbonEnv.get.carbonMetastore.storePath
+    val absoluteTableIdentifiers = relations.map { relation =>
+      val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
+      (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+    }.toMap
+
+      val cacheProvider: CacheProvider = CacheProvider.getInstance
+      val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+        cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+      val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+        forwardDictionaryCache)
+      val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+      // add a task completion listener to clear dictionary that is a decisive factor for
+      // LRU eviction policy
+      val dictionaryTaskCleaner = TaskContext.get
+      dictionaryTaskCleaner.addTaskCompletionListener(context =>
+        dicts.foreach { dictionary =>
+          if (null != dictionary) {
+            dictionary.clear
+          }
+        }
+      )
+      val iter = firstParent[Row].iterator(split, context)
+      new Iterator[Row] {
+        var flag = true
+        var total = 0L
+
+        override final def hasNext: Boolean = iter.hasNext
+
+        override final def next(): Row = {
+          val startTime = System.currentTimeMillis()
+          val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+          dictIndex.foreach { index =>
+            if ( data(index) != null) {
+              data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+                  .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                getDictionaryColumnIds(index)._3)
+            }
+          }
+          new GenericRow(data)
+        }
+      }
+  }
+
+  private def isRequiredToDecode = {
+    getDictionaryColumnIds.find(p => p._1 != null) match {
+      case Some(value) => true
+      case _ => false
+    }
+  }
+
+  private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
+                            cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+    val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
+      if (f._2 != null) {
+        try {
+          cache.get(new DictionaryColumnUniqueIdentifier(
+            atiMap(f._1).getCarbonTableIdentifier,
+            f._2, f._3))
+        } catch {
+          case _: Throwable => null
+        }
+      } else {
+        null
+      }
+    }
+    dicts
+  }
+
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
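
Note: the new CarbonDecoderRDD wraps the scan output and, per partition, looks up
the needed dictionaries once per task and then rewrites each row, replacing the
surrogate key in every dictionary-encoded column with its decoded value. Below is a
rough plain-Scala sketch of that per-partition loop; the types and dictionary
contents are illustrative stand-ins, not the CarbonData classes used above.

object DecoderRddSketch {
  type Row = Array[Any]

  // hypothetical per-column dictionaries; a null entry marks a column that is not dictionary encoded
  val dictionaries: Array[Map[Int, String]] =
    Array(null, Map(1 -> "spark", 2 -> "flink", 3 -> "hive"))

  // roughly what compute() does for one partition: decide once which columns need
  // decoding, then rewrite every row, replacing the surrogate key with the looked-up value
  def decodePartition(rows: Iterator[Row]): Iterator[Row] = {
    val dictIndex = dictionaries.zipWithIndex.collect { case (d, i) if d != null => i }
    rows.map { row =>
      dictIndex.foreach { i =>
        if (row(i) != null) row(i) = dictionaries(i)(row(i).asInstanceOf[Int])
      }
      row
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Iterator[Row](Array(10, 1), Array(11, 2), Array(14, 3))
    decodePartition(input).foreach(r => println(r.mkString(", ")))
  }
}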

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 4ae8d61..c73fde6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,16 +17,47 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{CarbonDictionaryCatalystDecoder, CarbonDictionaryDecoder}
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
+
+
 
 /**
  * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
  * can improve the aggregation performance and reduce memory usage
  */
 private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+  val PUSHED_FILTERS = "PushedFilters"
+
   def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
+      case PhysicalOperation(projects, filters, l: LogicalRelation)
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+        pruneFilterProject(
+          l,
+          projects,
+          filters,
+          (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
+            a.map(_.name).toArray, f), needDecoder)) ::
+            Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
         CarbonDictionaryDecoder(relations,
           profile,
@@ -37,4 +37,68 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
+
+  def getDecoderRDD(logicalRelation: LogicalRelation,
+                    projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+                    rdd: RDD[Row],
+                    output: Seq[Attribute]): RDD[Row] = {
+    val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
+      logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
+    val attrs = projectExprsNeedToDecode.map { attr =>
+      val newAttr = AttributeReference(attr.name,
+        attr.dataType,
+        attr.nullable,
+        attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+      relation.addAttribute(newAttr)
+      newAttr
+    }
+    new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
+      CarbonAliasDecoderRelation(), rdd, output)
+  }
+
+  private[this] def toCatalystRDD(
+                                   relation: LogicalRelation,
+                                   output: Seq[Attribute],
+                                   rdd: RDD[Row],
+                                   needoDecode: ArrayBuffer[AttributeReference]):
+  RDD[InternalRow] = {
+    val newRdd = if (needoDecode.size > 0) {
+      getDecoderRDD(relation, needoDecode, rdd, output)
+    } else {
+      rdd
+    }
+    if (relation.relation.needConversion) {
+      execution.RDDConversions.rowToRowRdd(newRdd, output.map(_.dataType))
+    } else {
+      newRdd.asInstanceOf[RDD[InternalRow]]
+    }
+  }
+
+  protected def pruneFilterProject(
+                                    relation: LogicalRelation,
+                                    projects: Seq[NamedExpression],
+                                    filterPredicates: Seq[Expression],
+                                    scanBuilder: (Seq[Attribute], Array[Filter],
+                                        ArrayBuffer[AttributeReference]) =>
+                                        RDD[InternalRow]) = {
+    pruneFilterProjectRaw(
+      relation,
+      projects,
+      filterPredicates,
+      (requestedColumns, _, pushedFilters, a) => {
+        scanBuilder(requestedColumns, pushedFilters.toArray, a)
+      })
+  }
+
+  protected def pruneFilterProjectRaw(
+                                       relation: LogicalRelation,
+                                       projects: Seq[NamedExpression],
+                                       filterPredicates: Seq[Expression],
+                                       scanBuilder: (Seq[Attribute], Seq[Expression],
+                                           Seq[Filter], ArrayBuffer[AttributeReference]) =>
+                                           RDD[InternalRow]) = {
+
+    val projectSet = AttributeSet(projects.flatMap(_.references))
+    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+
+    val candidatePredicates = filterPredicates.map {
+      _ transform {
+        case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
+      }
+    }
+
+    val (unhandledPredicates, pushedFilters) =
+      selectFilters(relation.relation, candidatePredicates)
+
+    // A set of column attributes that are only referenced by pushed down filters.  We can eliminate
+    // them from requested columns.
+    val handledSet = {
+      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+      AttributeSet(handledPredicates.flatMap(_.references)) --
+          (projectSet ++ unhandledSet).map(relation.attributeMap)
+    }
+
+    // Combines all Catalyst filter `Expression`s that are either not convertible to data source
+    // `Filter`s or cannot be handled by `relation`.
+    val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
+    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+    val map = table.carbonRelation.metaData.dictionaryMap
+
+    val metadata: Map[String, String] = {
+      val pairs = ArrayBuffer.empty[(String, String)]
+
+      if (pushedFilters.nonEmpty) {
+        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      }
+      pairs.toMap
+    }
+
+
+    val needDecoder = ArrayBuffer[AttributeReference]()
+    filterCondition match {
+      case Some(exp: Expression) =>
+        exp.references.collect {
+          case attr: AttributeReference =>
+            val dict = map.get(attr.name)
+            if (dict.isDefined && dict.get) {
+              needDecoder += attr
+            }
+        }
+      case None =>
+    }
+
+    projects.map {
+      case attr: AttributeReference =>
+      case Alias(attr: AttributeReference, _) =>
+      case others =>
+        others.references.map { f =>
+          val dictionary = map.get(f.name)
+          if (dictionary.isDefined && dictionary.get) {
+            needDecoder += f.asInstanceOf[AttributeReference]
+          }
+        }
+    }
+
+    if (projects.map(_.toAttribute) == projects &&
+        projectSet.size == projects.size &&
+        filterSet.subsetOf(projectSet)) {
+      // When it is possible to just use column pruning to get the right projection and
+      // when the columns of this projection are enough to evaluate all filter conditions,
+      // just do a scan followed by a filter, with no extra project.
+      val requestedColumns = projects
+          // Safe due to if above.
+          .asInstanceOf[Seq[Attribute]]
+          // Match original case of attributes.
+          .map(relation.attributeMap)
+          // Don't request columns that are only referenced by pushed filters.
+          .filterNot(handledSet.contains)
+      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+      val updateProject = projects.map { expr =>
+        var attr = expr.toAttribute.asInstanceOf[AttributeReference]
+        if (!needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+          val dict = map.get(attr.name)
+          if (dict.isDefined && dict.get) {
+            attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
+                .exprId, attr.qualifier)
+          }
+        }
+        attr
+      }
+      val scan = execution.DataSourceScanExec.create(
+        updateProject,
+        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        relation.relation, metadata, relation.metastoreTableIdentifier)
+      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+    } else {
+      // Don't request columns that are only referenced by pushed filters.
+      val requestedColumns =
+      (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
+      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+      val scan = execution.DataSourceScanExec.create(
+        updateRequestedColumns,
+        scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+        relation.relation, metadata, relation.metastoreTableIdentifier)
+      execution.ProjectExec(
+        projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+    }
+  }
+
+  def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
+      relation: CarbonDatasourceHadoopRelation,
+      needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
+    val map = relation.carbonRelation.metaData.dictionaryMap
+    requestedColumns.map { attr =>
+      if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+        attr
+      } else {
+        val dict = map.get(attr.name)
+        if (dict.isDefined && dict.get) {
+          AttributeReference(attr.name,
+            IntegerType,
+            attr.nullable,
+            attr.metadata)(attr.exprId, attr.qualifier)
+        } else {
+          attr
+        }
+      }
+    }
+  }
+
+
+  protected[sql] def selectFilters(
+                                    relation: BaseRelation,
+                                    predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+
+    // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
+    // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
+    // `filter`s.
+
+    val translated: Seq[(Expression, Filter)] =
+      for {
+        predicate <- predicates
+        filter <- translateFilter(predicate)
+      } yield predicate -> filter
+
+    // A map from original Catalyst expressions to corresponding translated data source filters.
+    val translatedMap: Map[Expression, Filter] = translated.toMap
+
+    // Catalyst predicate expressions that cannot be translated to data source filters.
+    val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
+
+    // Data source filters that cannot be handled by `relation`. An unhandled filter here
+    // means that the data source may not be able to apply this filter to every row
+    // of the underlying dataset.
+    val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
+
+    val (unhandled, handled) = translated.partition {
+      case (predicate, filter) =>
+        unhandledFilters.contains(filter)
+    }
+
+    // Catalyst predicate expressions that can be translated to data source filters, but cannot be
+    // handled by `relation`.
+    val (unhandledPredicates, _) = unhandled.unzip
+
+    // Translated data source filters that can be handled by `relation`
+    val (_, handledFilters) = handled.unzip
+
+    // translated contains all filters that have been converted to the public Filter interface.
+    // We should always push them to the data source no matter whether the data source can apply
+    // a filter to every row or not.
+    val (_, translatedFilters) = translated.unzip
+
+    (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+  }
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+   */
+  protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
+    predicate match {
+      case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, convertToScala(v, t)))
+      case expressions.EqualTo(Literal(v, t), a: Attribute) =>
+        Some(sources.EqualTo(a.name, convertToScala(v, t)))
+
+      case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
+      case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
+        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
+
+      case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
+      case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThan(a.name, convertToScala(v, t)))
+
+      case expressions.LessThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThan(a.name, convertToScala(v, t)))
+      case expressions.LessThan(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
+
+      case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
+      case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
+
+      case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
+      case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
+
+      case expressions.InSet(a: Attribute, set) =>
+        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
+        Some(sources.In(a.name, set.toArray.map(toScala)))
+
+      // Because we only convert In to InSet in Optimizer when there are more than certain
+      // items. So it is possible we still get an In expression here that needs to be pushed
+      // down.
+      case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
+        Some(sources.In(a.name, hSet.toArray.map(toScala)))
+
+      case expressions.IsNull(a: Attribute) =>
+        Some(sources.IsNull(a.name))
+      case expressions.IsNotNull(a: Attribute) =>
+        Some(sources.IsNotNull(a.name))
+
+      case expressions.And(left, right) =>
+        (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
+
+      case expressions.Or(left, right) =>
+        for {
+          leftFilter <- translateFilter(left)
+          rightFilter <- translateFilter(right)
+        } yield sources.Or(leftFilter, rightFilter)
+
+      case expressions.Not(child) =>
+        translateFilter(child).map(sources.Not)
+
+      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringStartsWith(a.name, v.toString))
+
+      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringEndsWith(a.name, v.toString))
+
+      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
+        Some(sources.StringContains(a.name, v.toString))
+      case _ => None
+    }
+  }
 }
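
Note: selectFilters and translateFilter above appear to be adapted from Spark's
DataSourceStrategy: Catalyst predicates are mapped to the public sources.Filter API
where possible, and anything that translates to None stays in the plan as a
Spark-side filter. The sketch below shows that shape in a simplified, self-contained
form; the Expr and Filter case classes are stand-ins rather than Spark's, and the
And case here requires both sides to translate, which is stricter than the code above.

object TranslateFilterSketch {
  // stand-ins for Catalyst expressions (not Spark classes)
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Any) extends Expr
  final case class Eq(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // stand-ins for the public data source Filter API
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class AndFilter(left: Filter, right: Filter) extends Filter

  // None means the predicate cannot be expressed as a source Filter and stays with Spark
  def translate(e: Expr): Option[Filter] = e match {
    case Eq(Attr(a), Lit(v)) => Some(EqualTo(a, v))
    case Eq(Lit(v), Attr(a)) => Some(EqualTo(a, v))
    case And(l, r) =>
      for (lf <- translate(l); rf <- translate(r)) yield AndFilter(lf, rf)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val predicate = And(Eq(Attr("stringField"), Lit("spark")), Eq(Lit(10), Attr("intField")))
    println(translate(predicate))
    // prints Some(AndFilter(EqualTo(stringField,spark),EqualTo(intField,10)))
  }
}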

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index c4b5d70..6b6960d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -55,110 +55,13 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  def updateCarbonRelationDataType(plan: LogicalPlan): LogicalPlan = {
-    val relations = plan collect {
-      case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    }
-    if(relations.nonEmpty && !isOptimized(plan)) {
-      val map = mutable.HashMap[ExprId, AttributeReference]()
-      val updateRelationPlan = plan transformDown {
-        case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-          val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-          val newRelation = updateRelation(relation)
-          val newl = LogicalRelation(newRelation, l.expectedOutputAttributes, l
-              .metastoreTableIdentifier)
-          for(i <- 0 until l.output.size) {
-            map.put(l.output(i).exprId, newl.output(i))
-          }
-          newl
-      }
-
-      updateRelationPlan transformDown {
-        case sort: Sort =>
-          val sortExprs = sort.order.map { s =>
-            s.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }.asInstanceOf[SortOrder]
-          }
-          Sort(sortExprs, sort.global, sort.child)
-        case agg: Aggregate =>
-          val aggExps = agg.aggregateExpressions.map { aggExp =>
-            aggExp transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-              case other => other
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-
-          val grpExps = agg.groupingExpressions.map { gexp =>
-            gexp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }
-          Aggregate(grpExps, aggExps, agg.child)
-        case expand: Expand =>
-          expand.transformExpressions {
-            case attr: AttributeReference =>
-              map.getOrElse(attr.exprId, attr)
-          }
-        case filter: Filter =>
-          val filterExps = filter.condition transform {
-            case attr: AttributeReference =>
-              map.getOrElse(attr.exprId, attr)
-          }
-          Filter(filterExps, filter.child)
-        case p: Project if relations.nonEmpty =>
-          val prExps = p.projectList.map { prExp =>
-            prExp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-          Project(prExps, p.child)
-        case wd: Window if relations.nonEmpty =>
-          val prExps = wd.output.map { prExp =>
-            prExp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[Attribute]]
-          val wdExps = wd.windowExpressions.map { gexp =>
-            gexp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[NamedExpression]]
-          val partitionSpec = wd.partitionSpec.map{ exp =>
-            exp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }
-          val orderSpec = wd.orderSpec.map { exp =>
-            exp.transform {
-              case attr: AttributeReference =>
-                map.getOrElse(attr.exprId, attr)
-            }
-          }.asInstanceOf[Seq[SortOrder]]
-          Window(wdExps, partitionSpec, orderSpec, wd.child)
-        case others => others
-      }
-    } else {
-      plan
-    }
-  }
-
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val updatePlan = updateCarbonRelationDataType(plan)
-    relations = collectCarbonRelation(updatePlan)
+    relations = collectCarbonRelation(plan)
     if (relations.nonEmpty && !isOptimized(plan)) {
       LOGGER.info("Starting to optimize plan")
       val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
-      val result = transformCarbonPlan(updatePlan, relations)
+      val result = transformCarbonPlan(plan, relations)
+      queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
         System.currentTimeMillis)
       recorder.recordStatistics(queryStatistic)
