[CARBONDATA-2200] Fix bug of LIKE operation on streaming table

Fix a bug in the LIKE operation on streaming tables.
A LIKE operation is converted to a StartsWith / EndsWith / Contains expression,
and Carbon uses RowLevelFilterExecuterImpl to evaluate that expression.
Streaming tables therefore also need support in RowLevelFilterExecuterImpl.

This closes #1996


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b0a2fabc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b0a2fabc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b0a2fabc

Branch: refs/heads/branch-1.3
Commit: b0a2fabcc8584dfba24ad0ea135948f5365a7335
Parents: 6f9016d
Author: QiangCai <qiang...@qq.com>
Authored: Sun Feb 25 18:53:41 2018 +0800
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Feb 27 12:51:38 2018 +0530

----------------------------------------------------------------------
 .../executer/RowLevelFilterExecuterImpl.java    | 82 +++++++++++++++++++-
 .../TestStreamingTableOperation.scala           | 20 ++++-
 2 files changed, 100 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b0a2fabc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 8956f30..de97e82 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -60,6 +60,7 @@ import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnRes
 import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -276,13 +277,92 @@ public class RowLevelFilterExecuterImpl implements 
FilterExecuter {
   public boolean applyFilter(RowIntf value, int dimOrdinalMax)
       throws FilterUnsupportedException, IOException {
     try {
-      return exp.evaluate(value).getBoolean();
+      Boolean result = exp.evaluate(convertRow(value, 
dimOrdinalMax)).getBoolean();
+      return result == null ? false : result;
     } catch (FilterIllegalMemberException e) {
       throw new FilterUnsupportedException(e);
     }
   }
 
   /**
+   * convert encoded row to actual value row for filter to evaluate expression
+   * @param value this row will be converted to actual value
+   * @param dimOrdinalMax for measure column, its index in row = dimOrdinalMax 
+ its ordinal
+   * @return actual value row
+   * @throws IOException
+   */
+  private RowIntf convertRow(RowIntf value, int dimOrdinalMax) throws 
IOException {
+    Object[] record = new Object[value.size()];
+    String memberString;
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = 
dimColEvaluatorInfoList.get(i);
+      int index = dimColumnEvaluatorInfo.getDimension().getOrdinal();
+      // if filter dimension is not present in the current add its default 
value
+      if (!isDimensionPresentInCurrentBlock[i]) {
+        // fill default value here
+        record[index] = getDimensionDefaultValue(dimColumnEvaluatorInfo);
+        // already set value, so continue to set next dimension
+        continue;
+      }
+      if 
(!dimColumnEvaluatorInfo.getDimension().getDataType().isComplexType()) {
+        if (!dimColumnEvaluatorInfo.isDimensionExistsInCurrentSilce()) {
+          record[index] = 
dimColumnEvaluatorInfo.getDimension().getDefaultValue();
+        }
+        byte[] memberBytes = (byte[]) value.getVal(index);
+        if 
(!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+          // no dictionary
+          if (null != memberBytes) {
+            if (Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 
memberBytes)) {
+              memberBytes = null;
+            } else if (memberBytes.length == 0) {
+              memberBytes = null;
+            }
+            record[index] = 
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(memberBytes,
+                dimColumnEvaluatorInfo.getDimension().getDataType());
+          }
+        } else {
+          // dictionary
+          int dictionaryValue = ByteUtil.toInt(memberBytes, 0);
+          if 
(dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
+              && 
!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) 
{
+            memberString =
+                
getFilterActualValueFromDictionaryValue(dimColumnEvaluatorInfo, 
dictionaryValue);
+            record[index] = DataTypeUtil.getDataBasedOnDataType(memberString,
+                dimColumnEvaluatorInfo.getDimension().getDataType());
+          } else if (
+              
dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            Object member = 
getFilterActualValueFromDirectDictionaryValue(dimColumnEvaluatorInfo,
+                dictionaryValue);
+            record[index] = member;
+          }
+        }
+      } else {
+        // complex
+        record[index] = value.getVal(index);
+      }
+    }
+
+    for (int i = 0; i < msrColEvalutorInfoList.size(); i++) {
+      MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo = 
msrColEvalutorInfoList.get(i);
+      int index = msrColumnEvalutorInfo.getMeasure().getOrdinal() + 
dimOrdinalMax;
+      // add default value for the measure in case filter measure is not 
present
+      // in the current block measure list
+      if (!isMeasurePresentInCurrentBlock[i]) {
+        byte[] defaultValue = 
msrColumnEvalutorInfo.getCarbonColumn().getDefaultValue();
+        record[index] = RestructureUtil.getMeasureDefaultValue(
+            msrColumnEvalutorInfo.getCarbonColumn().getColumnSchema(), 
defaultValue);
+        // already set value, so continue to set next measure
+        continue;
+      }
+      // measure
+      record[index] = value.getVal(index);
+    }
+    RowIntf row = new RowImpl();
+    row.setValues(record);
+    return row;
+  }
+
+  /**
    * Method will read the members of particular dimension block and create
    * a row instance for further processing of the filters
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b0a2fabc/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 94baf86..5644302 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -332,6 +332,12 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
       sql("select * from stream_table_filter where name like '%me_3%' and id < 
30"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"))))
 
+    checkAnswer(sql("select count(*) from stream_table_filter where name like 
'%ame%'"),
+      Seq(Row(49)))
+
+    checkAnswer(sql("select count(*) from stream_table_filter where name like 
'%batch%'"),
+      Seq(Row(5)))
+
     checkAnswer(
       sql("select * from stream_table_filter where name >= 'name_3' and id < 
4"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"))))
@@ -350,6 +356,9 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0")),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"))))
 
+    checkAnswer(sql("select count(*) from stream_table_filter where city like 
'%city%'"),
+      Seq(Row(54)))
+
     checkAnswer(
       sql("select * from stream_table_filter where city > 'city_09' and city < 
'city_10'"),
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0")),
@@ -649,6 +658,12 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
       sql("select * from stream_table_filter_complex where name like '%me_3%' 
and id < 30"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", 
"school_33")), 3))))
 
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where 
name like '%ame%'"),
+      Seq(Row(49)))
+
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where 
name like '%batch%'"),
+      Seq(Row(5)))
+
     checkAnswer(
       sql("select * from stream_table_filter_complex where name >= 'name_3' 
and id < 4"),
       Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", 
"school_33")), 3))))
@@ -663,6 +678,9 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", 
"school_11")), 1)),
         Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", 
"school_11")), 20))))
 
+    checkAnswer(sql("select count(*) from stream_table_filter_complex where 
city like '%city%'"),
+      Seq(Row(54)))
+
     checkAnswer(
       sql("select * from stream_table_filter_complex where city > 'city_09' 
and city < 'city_10'"),
       Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", 
"school_11")), 1)),
@@ -1056,7 +1074,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     //Verify MergeTO column entry for compacted Segments
     newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw =>
       assertResult("Compacted")(rw.getString(1))
-      
assertResult((Integer.parseInt(rw.getString(0))+2).toString)(rw.getString(4))
+      assert(Integer.parseInt(rw.getString(0)) < 
Integer.parseInt(rw.getString(4)))
     }
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),

Reply via email to