[CARBONDATA-2134] Prevent implicit column filter list from getting serialized 
while submitting task to executor

Problem
In the current store, blocklet pruning takes place in the driver and no 
further pruning takes place on the executor side. However, the implicit column 
filter list is still sent to the executor. As the list grows, the cost of 
serializing and deserializing it increases, which can impact query performance.

Solution
Remove the list from the filter expression before submitting the task to 
the executor.

This closes #1935


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11a795ce
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11a795ce
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11a795ce

Branch: refs/heads/carbonstore-rebase
Commit: 11a795ceca80e74de8264cba3571ca78ce03fae4
Parents: 7c6c42f
Author: m00258959 <manish.gu...@huawei.com>
Authored: Mon Feb 5 17:10:18 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Feb 9 13:03:05 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/scan/filter/FilterUtil.java | 35 ++++++++++++-
 .../core/scan/filter/FilterUtilTest.java        | 48 ++++++++++++++++++
 .../org/apache/carbondata/spark/util/Util.java  | 19 +++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 52 +++++++++++++++++++-
 4 files changed, 152 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index a08edc0..689da9f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -67,9 +67,12 @@ import 
org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.executer.AndFilterExecuterImpl;
 import 
org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo;
 import 
org.apache.carbondata.core.scan.filter.executer.ExcludeColGroupFilterExecuterImpl;
@@ -1824,4 +1827,34 @@ public final class FilterUtil {
     }
     return columnFilterInfo;
   }
-}
\ No newline at end of file
+
+  /**
+   * This method will check for ColumnExpression with column name positionID 
and if found will
+   * replace the InExpression with true expression. This is done to stop 
serialization of List
+   * expression which is right children of InExpression as it can impact the 
query performance
+   * as the size of list grows bigger.
+   *
+   * @param expression
+   */
+  public static void removeInExpressionNodeWithPositionIdColumn(Expression 
expression) {
+    ExpressionType filterExpressionType = expression.getFilterExpressionType();
+    if (ExpressionType.AND == filterExpressionType) {
+      Expression rightExpression = ((AndExpression) expression).getRight();
+      if (rightExpression instanceof InExpression) {
+        List<Expression> children = rightExpression.getChildren();
+        if (null != children && !children.isEmpty()) {
+          Expression childExpression = children.get(0);
+          // check for the positionId as the column name in ColumnExpression
+          if (childExpression instanceof ColumnExpression && 
((ColumnExpression) childExpression)
+              
.getColumnName().equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) {
+            // Remove the right expression node and point the expression to 
left node expression
+            expression
+                .findAndSetChild(((AndExpression) expression).getRight(), new 
TrueExpression(null));
+            LOGGER.info("In expression removed from the filter expression list 
to prevent it from"
+                + " serializing on executor");
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java 
b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
index 89b3122..565da04 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
@@ -35,8 +35,11 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.intf.RowImpl;
 import org.apache.carbondata.core.util.BitSetGroup;
 
@@ -399,4 +402,49 @@ public class FilterUtilTest extends 
AbstractDictionaryCacheTest {
         FilterUtil.createBitSetGroupWithDefaultValue(15, 448200, true);
     assertTrue(bitSetGroupWithDefaultValue.getNumberOfPages() == 15);
   }
+
+  /**
+   * An InExpression on the positionId column that sits as the right child of the root AND
+   * expression must be replaced by a TrueExpression.
+   */
+  @Test public void testRemoveInExpressionNodeWithPositionIdColumn() {
+    List<Expression> children = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // create literal expression
+    LiteralExpression literalExpression =
+        new LiteralExpression("0/1/0-0_batchno0-0-1517808273200/0", DataTypes.STRING);
+    children.add(literalExpression);
+    // create list expression
+    ListExpression listExpression = new ListExpression(children);
+    // create column expression with column name as positionId
+    ColumnExpression columnExpression =
+        new ColumnExpression(CarbonCommonConstants.POSITION_ID, DataTypes.STRING);
+    // create InExpression as right node
+    InExpression inExpression = new InExpression(columnExpression, listExpression);
+    // create a dummy true expression as left node
+    TrueExpression trueExpression = new TrueExpression(null);
+    // create and expression as the root node
+    Expression expression = new AndExpression(trueExpression, inExpression);
+    // test remove expression method
+    FilterUtil.removeInExpressionNodeWithPositionIdColumn(expression);
+    // the right node must now be a TrueExpression; assertTrue is used instead of the Java
+    // assert keyword because the keyword is skipped when JVM assertions are disabled
+    assertTrue(((AndExpression) expression).getRight() instanceof TrueExpression);
+  }
+
+  /**
+   * An InExpression on a column other than positionId must be left in place as the right
+   * child of the root AND expression.
+   */
+  @Test public void testRemoveInExpressionNodeWithDifferentColumn() {
+    List<Expression> children = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // create literal expression
+    LiteralExpression literalExpression =
+        new LiteralExpression("testName", DataTypes.STRING);
+    children.add(literalExpression);
+    // create list expression
+    ListExpression listExpression = new ListExpression(children);
+    // create column expression on a regular column (not positionId)
+    ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING);
+    // create InExpression as right node
+    InExpression inExpression = new InExpression(columnExpression, listExpression);
+    // create a dummy true expression as left node
+    TrueExpression trueExpression = new TrueExpression(null);
+    // create and expression as the root node
+    Expression expression = new AndExpression(trueExpression, inExpression);
+    // test remove expression method
+    FilterUtil.removeInExpressionNodeWithPositionIdColumn(expression);
+    // the right node must still be the InExpression; assertTrue is used instead of the Java
+    // assert keyword because the keyword is skipped when JVM assertions are disabled
+    assertTrue(((AndExpression) expression).getRight() instanceof InExpression);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index 8c14cd3..cd2b81c 100644
--- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -17,6 +17,10 @@
 
 package org.apache.carbondata.spark.util;
 
+import java.util.List;
+
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
@@ -27,4 +31,19 @@ public class Util {
   public static String[] getConfiguredLocalDirs(SparkConf conf) {
     return Utils.getConfiguredLocalDirs(conf);
   }
+
+  /**
+   * Method to check whether there exists any block which does not contain the 
blocklet info
+   *
+   * @param splitList
+   * @return
+   */
+  public static boolean 
isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splitList) {
+    for (CarbonInputSplit inputSplit : splitList) {
+      if (null == inputSplit.getDetailInfo().getBlockletInfo()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11a795ce/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 102c6c8..e554a58 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
+import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
@@ -42,6 +43,7 @@ import 
org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, 
QueryStatisticsConstants, QueryStatisticsRecorder}
 import org.apache.carbondata.core.statusmanager.FileFormat
@@ -51,7 +53,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, 
CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks 
to scan
@@ -109,6 +111,8 @@ class CarbonScanRDD(
       }
     }
     val batchPartitions = distributeColumnarSplits(columnarSplits)
+    // check and remove InExpression from filterExpression
+    checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions)
     if (streamSplits.isEmpty) {
       batchPartitions.toArray
     } else {
@@ -471,6 +475,52 @@ class CarbonScanRDD(
   }
 
   /**
+   * Drops the positionId InExpression from filterExpression so that the values of its List
+   * Expression are not serialized to, and deserialized on, the executor.
+   *
+   * Nothing is changed when there is no filter expression, when no partitions were
+   * identified, or when any identified block lacks blocklet info.
+   *
+   * @param format               input format of the scan (not read by this method)
+   * @param identifiedPartitions partitions identified for this scan
+   */
+  private def checkAndRemoveInExpressinFromFilterExpression(
+      format: CarbonTableInputFormat[Object],
+      identifiedPartitions: mutable.Buffer[Partition]) = {
+    // evaluated left to right, so the block check only runs for a non-null filter
+    // and a non-empty partition list
+    val canDropInExpression = null != filterExpression &&
+                              identifiedPartitions.nonEmpty &&
+                              !checkForBlockWithoutBlockletInfo(identifiedPartitions)
+    if (canDropInExpression) {
+      FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression)
+    }
+  }
+
+  /**
+   * This method will check for presence of any block from old store (version 1.1). If any of
+   * the blocks identified does not contain the blocklet info that means that block is from
+   * old store.
+   *
+   * A CarbonMultiBlockSplit contributes all of its splits; a plain CarbonInputSplit is
+   * wrapped in a single-element list. Any other split type fails with a MatchError.
+   *
+   * @param identifiedPartitions partitions identified for this scan, each expected to be a
+   *                             CarbonSparkPartition
+   * @return true if at least one block without blocklet info is found, false otherwise
+   */
+  private def checkForBlockWithoutBlockletInfo(
+      identifiedPartitions: mutable.Buffer[Partition]): Boolean = {
+    // exists stops at the first partition that holds a block without blocklet info
+    identifiedPartitions.exists { partition =>
+      // widened to Any so both split kinds can be matched whatever the declared type is
+      val inputSplit: Any = partition.asInstanceOf[CarbonSparkPartition].split.value
+      val splitList: java.util.List[CarbonInputSplit] = inputSplit match {
+        case multiBlockSplit: CarbonMultiBlockSplit => multiBlockSplit.getAllSplits
+        case singleSplit: CarbonInputSplit =>
+          // build a real list: ArrayList.add returns a Boolean, which must not be cast to List
+          java.util.Collections.singletonList(singleSplit)
+      }
+      // check for block from old store (version 1.1 and below)
+      Util.isBlockWithoutBlockletInfoExists(splitList)
+    }
+  }
+
+  /**
    * Get the preferred locations where to launch this task.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {

Reply via email to