Fixed order by with limit for select * queries

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/82b61d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/82b61d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/82b61d47

Branch: refs/heads/12-dev
Commit: 82b61d4799a0e2eea6064d4e997fa1524c7f7b1d
Parents: 68cbe15
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Tue Apr 4 15:18:46 2017 +0530
Committer: kumarvishal <kumarvishal.1...@gmail.com>
Committed: Thu Apr 6 16:28:21 2017 +0530

----------------------------------------------------------------------
 .../sortexpr/AllDataTypesTestCaseSort.scala     | 18 +++++++++--
 .../spark/sql/CarbonCatalystOperators.scala     | 19 +++++++++--
 .../spark/sql/CarbonDictionaryDecoder.scala     | 33 +++++++++++++++++++-
 .../execution/CarbonLateDecodeStrategy.scala    | 14 ++++++---
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 27 ++++++++++++++--
 5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
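
For context: the fix targets full-projection queries that combine ORDER BY with LIMIT, where the sort+limit plan previously saw unconverted dictionary datatypes (see the comment on CarbonDictionaryCatalystDecoder.output below). A minimal reproduction of the affected shape, using the table from the new tests (sketch only; assumes a SparkSession with CarbonData's session state wired in):

    spark.sql("select * from alldatatypestablesort order by empname limit 10").collect()
    spark.sql("select * from alldatatypestablesort where empname='arvind' order by salary limit 2").collect()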


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
index bdb470a..34d3cee 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala
@@ -27,6 +27,8 @@ import org.scalatest.BeforeAndAfterAll
 class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
+    sql("drop table if exists alldatatypestablesort")
+    sql("drop table if exists alldatatypestablesort_hive")
     sql("CREATE TABLE alldatatypestablesort (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) 
STORED BY 'org.apache.carbondata.format'")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
alldatatypestablesort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""");
 
@@ -41,8 +43,20 @@ class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll {
       sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestablesort_hive where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno"))
   }
 
+  test("select * from alldatatypestablesort order by empname limit 10") {
+    sql("select * from alldatatypestablesort order by empname limit 
10").collect()
+  }
+
+  test("select * from alldatatypestablesort order by salary limit 2") {
+    sql("select * from alldatatypestablesort order by salary limit 
2").collect()
+  }
+
+  test("select * from alldatatypestablesort where empname='arvind' order by 
salary limit 2") {
+    sql("select * from alldatatypestablesort where empname='arvind' order by 
salary limit 2").collect()
+  }
+
   override def afterAll {
-    sql("drop table alldatatypestablesort")
-    sql("drop table alldatatypestablesort_hive")
+    sql("drop table if exists alldatatypestablesort")
+    sql("drop table if exists alldatatypestablesort_hive")
   }
 }
\ No newline at end of file
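
Note that the new tests only assert the queries execute (.collect()); they do not verify row contents. A stricter variant in the suite's existing checkAnswer style is sketched below as a hypothetical (ties in the sort key could make a limit comparison flaky, so it is not a drop-in addition):

    test("select * order by empname limit 10 matches hive") {
      checkAnswer(
        sql("select * from alldatatypestablesort order by empname limit 10"),
        sql("select * from alldatatypestablesort_hive order by empname limit 10"))
    }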

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 4070088..9b1533e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types.{StringType, TimestampType}
@@ -33,8 +34,22 @@ case class CarbonDictionaryCatalystDecoder(
     isOuter: Boolean,
     child: LogicalPlan) extends UnaryNode {
   // the output should be updated with the converted datatype; it is needed for the limit+sort plan.
-  override val output: Seq[Attribute] =
-    CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
+  override def output: Seq[Attribute] = {
+    child match {
+      case l: LogicalRelation =>
+        // If the child is a logical relation then first update all dictionary attrs with IntegerType
+        val logicalOut =
+          CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap)
+        CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap)
+      case Filter(cond, l: LogicalRelation) =>
+        // If the child is a logical relation then first update all dictionary attrs with IntegerType
+        val logicalOut =
+          CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap)
+        CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap)
+      case _ => CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
+    }
+  }
+
 }
 
 abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
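
The two LogicalRelation arms above are identical, so the intent can be read from a condensed sketch (an illustrative restructuring, not what the commit contains):

    // When the child is the raw relation, optionally under a Filter, its output still
    // carries the stored dictionary datatypes: rewrite dictionary attributes to
    // IntegerType first, then convert them to their decoded datatypes.
    override def output: Seq[Attribute] = child match {
      case (_: LogicalRelation) | Filter(_, _: LogicalRelation) =>
        val logicalOut = CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap)
        CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap)
      case _ =>
        CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
    }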

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 543da6f..d450b69 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -347,11 +347,42 @@ object CarbonDictionaryDecoder {
   }
 
   /**
+   * Updates all dictionary attributes with integer datatype.
+   */
+  def updateAttributes(output: Seq[Attribute],
+      relations: Seq[CarbonDecoderRelation],
+      aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = {
+    output.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if (relation.isDefined) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension = carbonTable
+          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
+          val newAttr = AttributeReference(a.name,
+            IntegerType,
+            a.nullable,
+            a.metadata)(a.exprId).asInstanceOf[Attribute]
+          newAttr
+        } else {
+          a
+        }
+      } else {
+        a
+      }
+    }
+  }
+
+  /**
   * Whether the attribute requires decoding or not, based on the profile.
    */
   def canBeDecoded(attr: Attribute, profile: CarbonProfile): Boolean = {
     profile match {
-      case ip: IncludeProfile if ip.attributes.nonEmpty =>
+      case ip: IncludeProfile =>
         ip.attributes
          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
       case ep: ExcludeProfile =>
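
Dropping the nonEmpty guard changes what an empty IncludeProfile means: exists over an empty list is always false, so such a profile now decodes nothing instead of falling through to the default case. A small illustration, assuming IncludeProfile wraps a plain attribute list:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.StringType

    val attr = AttributeReference("empname", StringType)()
    CarbonDictionaryDecoder.canBeDecoded(attr, IncludeProfile(Seq.empty))  // now false

This is what lets the strategy change below treat an empty-include decoder as a no-op.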

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 16e8a99..ed5d362 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -61,11 +61,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
             a.map(_.name).toArray, f), needDecoder)) ::
             Nil
       case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-        CarbonDictionaryDecoder(relations,
-          profile,
-          aliasMap,
-          planLater(child)
-        ) :: Nil
+        if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
+          planLater(child) :: Nil
+        } else {
+          CarbonDictionaryDecoder(relations,
+            profile,
+            aliasMap,
+            planLater(child)
+          ) :: Nil
+        }
       case _ => Nil
     }
   }
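
Note the division of labor: the dummy decoder that CarbonLateDecodeRule inserts for select * plans (next file) carries an empty IncludeProfile. It still fixes the logical attribute datatypes through the output override shown earlier, but it would decode nothing at runtime, so the strategy plans its child directly rather than emitting a pass-through operator. Roughly, for "select * from t order by c limit n" (plan rendering illustrative):

    logical (after the rule)                physical (after this strategy)
    Limit                                   Limit
      Sort                                    Sort
        CarbonDictionaryCatalystDecoder         (decoder elided)
          LogicalRelation                       scan of the Carbon relation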

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 36478b4..181328d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -168,7 +168,21 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
             child = CarbonDictionaryTempDecoder(attrsOnSort,
               new util.HashSet[AttributeReferenceWrapper](), sort.child)
+          } else {
+            // In the case of a select * query the child is a LogicalRelation and there is
+            // no way to convert the datatypes of its attributes, so just add this dummy
+            // decoder to convert them to dictionary datatypes.
+            child match {
+              case l: LogicalRelation =>
+                child = CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+                  new util.HashSet[AttributeReferenceWrapper](), sort.child)
+              case Filter(cond, l: LogicalRelation) =>
+                child = CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+                  new util.HashSet[AttributeReferenceWrapper](), sort.child)
+              case _ =>
+            }
           }
+
           if (!decoder) {
             decoder = true
            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
@@ -609,9 +623,16 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
     // Remove unnecessary decoders
     val finalPlan = transFormedPlan transform {
-      case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
-        if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
-        child
+      case cd@ CarbonDictionaryCatalystDecoder(_, profile, _, false, child) =>
+        if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) {
+          child match {
+            case l: LogicalRelation => cd
+            case Filter(condition, l: LogicalRelation) => cd
+            case _ => child
+          }
+        } else {
+          cd
+        }
     }
     finalPlan
   }
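
The cleanup pass used to drop every unreferenced empty-IncludeProfile decoder; it now keeps those sitting directly on a LogicalRelation (or a Filter over one), since their output override is what fixes the attribute datatypes for the sort+limit plan. A condensed sketch of the keep/drop decision (an illustrative restructuring of the transform arm above):

    case cd @ CarbonDictionaryCatalystDecoder(_, profile, _, false, child) =>
      val noOpInclude = profile.isInstanceOf[IncludeProfile] && profile.isEmpty
      child match {
        case _ if !noOpInclude => cd                                      // real decoding: keep
        case (_: LogicalRelation) | Filter(_, _: LogicalRelation) => cd   // datatype fix: keep
        case _ => child                                                   // no-op elsewhere: drop
      }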
