Port DRILL-3492 and DRILL-3364 to maprdb plugin

+ Fix build issues due to API changes in DRILL-3535


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/df19a019
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/df19a019
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/df19a019

Branch: refs/heads/master
Commit: df19a019117db3448a01f34daf804df747ca94b8
Parents: ba8a785
Author: Smidth Panchamia <spancha...@mapr.com>
Authored: Thu Aug 6 15:50:44 2015 -0700
Committer: Aditya Kishore <a...@apache.org>
Committed: Fri Sep 9 10:08:28 2016 -0700

----------------------------------------------------------------------
 .../store/maprdb/CompareFunctionsProcessor.java | 421 ++++++++++++++++---
 .../exec/store/maprdb/MapRDBFilterBuilder.java  |  79 +++-
 .../exec/store/maprdb/MapRDBFormatMatcher.java  |  14 +-
 .../exec/store/maprdb/MapRDBFormatPlugin.java   |   2 +-
 .../store/maprdb/MapRDBPushFilterIntoScan.java  | 130 ++++--
 5 files changed, 529 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
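
Most of this change is in CompareFunctionsProcessor: when a WHERE clause compares CONVERT_FROM(BYTE_SUBSTR(row_key, 1, N), '<encoding>_BE') against a constant, the processor now derives big-endian start/stop row keys (plus a PrefixFilter for equality) so MapRDBFilterBuilder can prune the scan range instead of filtering every row. The standalone sketch below mirrors the bound computation for the DATE example quoted in the diff; the class and helper names are invented for illustration, and it assumes DATE_EPOCH_BE values are UTC-midnight epoch milliseconds (consistent with the patch adding MILLISECONDS_IN_A_DAY to step to the next day).

    // Illustrative sketch only (not plugin code): derive the stop row for
    //   CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
    import java.nio.ByteBuffer;
    import java.time.LocalDate;
    import java.time.ZoneOffset;

    public class RowKeyPrefixBoundSketch {
      public static void main(String[] args) {
        // DATE_EPOCH_BE: the date as milliseconds since epoch, 8 bytes, big-endian.
        long epochMillis = LocalDate.of(2015, 6, 17)
            .atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();

        // For "less_than" the scan can stop at the encoded value itself,
        // mirroring rowKeyPrefixStopRow in visitRowKeyPrefixConvertExpression().
        byte[] stopRow = ByteBuffer.allocate(8).putLong(epochMillis).array();

        System.out.println("stopRow = " + toHex(stopRow));
      }

      private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
          sb.append(String.format("%02x", b));
        }
        return sb.toString();
      }
    }

For "equal" the patch additionally sets a stop row one day (or one unit, for the integer encodings) past the value and a PrefixFilter over the same bytes.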


http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
index de8e080..c6c2504 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.maprdb;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import org.apache.drill.common.expression.CastExpression;
@@ -35,7 +36,16 @@ import org.apache.drill.common.expression.ValueExpressions.IntExpression;
 import org.apache.drill.common.expression.ValueExpressions.LongExpression;
 import org.apache.drill.common.expression.ValueExpressions.QuotedString;
 import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hbase.util.Order;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
+
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
@@ -47,6 +57,15 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
   private boolean isEqualityFn;
   private SchemaPath path;
   private String functionName;
+  private boolean sortOrderAscending;
+
+  // Fields for row-key prefix comparison
+  // If the query is on row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter
+  // Hence, we use these local variables(set depending upon the encoding type in user query)
+  private boolean isRowKeyPrefixComparison;
+  byte[] rowKeyPrefixStartRow;
+  byte[] rowKeyPrefixStopRow;
+  Filter rowKeyPrefixFilter;
 
   public static boolean isCompareFunction(String functionName) {
     return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
@@ -79,6 +98,8 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
     this.functionName = functionName;
     this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
         && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
+    this.isRowKeyPrefixComparison = false;
+    this.sortOrderAscending = true;
   }
 
   public byte[] getValue() {
@@ -97,6 +118,26 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
     return functionName;
   }
 
+  public boolean isRowKeyPrefixComparison() {
+    return isRowKeyPrefixComparison;
+  }
+
+  public byte[] getRowKeyPrefixStartRow() {
+    return rowKeyPrefixStartRow;
+  }
+
+  public byte[] getRowKeyPrefixStopRow() {
+    return rowKeyPrefixStopRow;
+  }
+
+  public Filter getRowKeyPrefixFilter() {
+    return rowKeyPrefixFilter;
+  }
+
+  public boolean isSortOrderAscending() {
+    return sortOrderAscending;
+  }
+
   @Override
   public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
     if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
@@ -107,75 +148,341 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
 
   @Override
   public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
-    if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM && e.getInput() instanceof SchemaPath) {
-      ByteBuf bb = null;
+    if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) {
+
       String encodingType = e.getEncodingType();
-      switch (encodingType) {
-      case "INT_BE":
-      case "INT":
-      case "UINT_BE":
-      case "UINT":
-      case "UINT4_BE":
-      case "UINT4":
-        if (valueArg instanceof IntExpression
-            && (isEqualityFn || encodingType.startsWith("U"))) {
-          bb = newByteBuf(4, encodingType.endsWith("_BE"));
-          bb.writeInt(((IntExpression)valueArg).getInt());
+      int prefixLength    = 0;
+
+      // Handle scan pruning in the following scenario:
+      // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is
+      // querying for the first few bytes of the row-key(start-offset 1)
+      // Example WHERE clause:
+      // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
+      if (e.getInput() instanceof FunctionCall) {
+
+        // We can prune scan range only for big-endian encoded data
+        if (encodingType.endsWith("_BE") == false) {
+          return false;
         }
-        break;
-      case "BIGINT_BE":
-      case "BIGINT":
-      case "UINT8_BE":
-      case "UINT8":
-        if (valueArg instanceof LongExpression
-            && (isEqualityFn || encodingType.startsWith("U"))) {
-          bb = newByteBuf(8, encodingType.endsWith("_BE"));
-          bb.writeLong(((LongExpression)valueArg).getLong());
+
+        FunctionCall call = (FunctionCall)e.getInput();
+        String functionName = call.getName();
+        if (!functionName.equalsIgnoreCase("byte_substr")) {
+          return false;
         }
-        break;
-      case "FLOAT":
-        if (valueArg instanceof FloatExpression && isEqualityFn) {
-          bb = newByteBuf(4, true);
-          bb.writeFloat(((FloatExpression)valueArg).getFloat());
+
+        LogicalExpression nameArg = call.args.get(0);
+        LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null;
+        LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null;
+
+        if (((nameArg instanceof SchemaPath) == false) ||
+             (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) ||
+             (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) {
+          return false;
         }
-        break;
-      case "DOUBLE":
-        if (valueArg instanceof DoubleExpression && isEqualityFn) {
-          bb = newByteBuf(8, true);
-          bb.writeDouble(((DoubleExpression)valueArg).getDouble());
+
+        boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY);
+        int offset = ((IntExpression)valueArg1).getInt();
+
+        if (!isRowKey || (offset != 1)) {
+          return false;
+        }
+
+        this.path    = (SchemaPath)nameArg;
+        prefixLength = ((IntExpression)valueArg2).getInt();
+        this.isRowKeyPrefixComparison = true;
+        return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg);
+      }
+
+      if (e.getInput() instanceof SchemaPath) {
+        ByteBuf bb = null;
+
+        switch (encodingType) {
+        case "INT_BE":
+        case "INT":
+        case "UINT_BE":
+        case "UINT":
+        case "UINT4_BE":
+        case "UINT4":
+          if (valueArg instanceof IntExpression
+              && (isEqualityFn || encodingType.startsWith("U"))) {
+            bb = newByteBuf(4, encodingType.endsWith("_BE"));
+            bb.writeInt(((IntExpression)valueArg).getInt());
+          }
+          break;
+        case "BIGINT_BE":
+        case "BIGINT":
+        case "UINT8_BE":
+        case "UINT8":
+          if (valueArg instanceof LongExpression
+              && (isEqualityFn || encodingType.startsWith("U"))) {
+            bb = newByteBuf(8, encodingType.endsWith("_BE"));
+            bb.writeLong(((LongExpression)valueArg).getLong());
+          }
+          break;
+        case "FLOAT":
+          if (valueArg instanceof FloatExpression && isEqualityFn) {
+            bb = newByteBuf(4, true);
+            bb.writeFloat(((FloatExpression)valueArg).getFloat());
+          }
+          break;
+        case "DOUBLE":
+          if (valueArg instanceof DoubleExpression && isEqualityFn) {
+            bb = newByteBuf(8, true);
+            bb.writeDouble(((DoubleExpression)valueArg).getDouble());
+          }
+          break;
+        case "TIME_EPOCH":
+        case "TIME_EPOCH_BE":
+          if (valueArg instanceof TimeExpression) {
+            bb = newByteBuf(8, encodingType.endsWith("_BE"));
+            bb.writeLong(((TimeExpression)valueArg).getTime());
+          }
+          break;
+        case "DATE_EPOCH":
+        case "DATE_EPOCH_BE":
+          if (valueArg instanceof DateExpression) {
+            bb = newByteBuf(8, encodingType.endsWith("_BE"));
+            bb.writeLong(((DateExpression)valueArg).getDate());
+          }
+          break;
+        case "BOOLEAN_BYTE":
+          if (valueArg instanceof BooleanExpression) {
+            bb = newByteBuf(1, false /* does not matter */);
+            bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
+          }
+          break;
+        case "DOUBLE_OB":
+        case "DOUBLE_OBD":
+          if (valueArg instanceof DoubleExpression) {
+            bb = newByteBuf(9, true);
+            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+            if (encodingType.endsWith("_OBD")) {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
+                  ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
+              this.sortOrderAscending = false;
+            } else {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
+                  ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING);
+            }
+          }
+          break;
+        case "FLOAT_OB":
+        case "FLOAT_OBD":
+          if (valueArg instanceof FloatExpression) {
+            bb = newByteBuf(5, true);
+            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+            if (encodingType.endsWith("_OBD")) {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
+                  ((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
+              this.sortOrderAscending = false;
+            } else {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
+                        ((FloatExpression)valueArg).getFloat(), Order.ASCENDING);
+            }
+          }
+          break;
+        case "BIGINT_OB":
+        case "BIGINT_OBD":
+          if (valueArg instanceof LongExpression) {
+            bb = newByteBuf(9, true);
+            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+            if (encodingType.endsWith("_OBD")) {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
+                        ((LongExpression)valueArg).getLong(), Order.DESCENDING);
+              this.sortOrderAscending = false;
+            } else {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
+                  ((LongExpression)valueArg).getLong(), Order.ASCENDING);
+            }
+          }
+          break;
+        case "INT_OB":
+        case "INT_OBD":
+          if (valueArg instanceof IntExpression) {
+            bb = newByteBuf(5, true);
+            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+            if (encodingType.endsWith("_OBD")) {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
+                  ((IntExpression)valueArg).getInt(), Order.DESCENDING);
+              this.sortOrderAscending = false;
+            } else {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
+                        ((IntExpression)valueArg).getInt(), Order.ASCENDING);
+            }
+          }
+          break;
+        case "UTF8_OB":
+        case "UTF8_OBD":
+          if (valueArg instanceof QuotedString) {
+            int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
+            bb = newByteBuf(stringLen + 2, true);
+            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
+            if (encodingType.endsWith("_OBD")) {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
+                  ((QuotedString)valueArg).value, Order.DESCENDING);
+              this.sortOrderAscending = false;
+            } else {
+              org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
+                        ((QuotedString)valueArg).value, Order.ASCENDING);
+            }
+          }
+          break;
+        case "UTF8":
+        // let visitSchemaPath() handle this.
+          return e.getInput().accept(this, valueArg);
+        }
+
+        if (bb != null) {
+          this.value = bb.array();
+          this.path = (SchemaPath)e.getInput();
+          return true;
         }
-        break;
-      case "TIME_EPOCH":
-      case "TIME_EPOCH_BE":
-        if (valueArg instanceof TimeExpression) {
-          bb = newByteBuf(8, encodingType.endsWith("_BE"));
-          bb.writeLong(((TimeExpression)valueArg).getTime());
+      }
+    }
+    return false;
+  }
+
+  private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e,
+    int prefixLength, LogicalExpression valueArg) {
+    String encodingType = e.getEncodingType();
+    rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW;
+    rowKeyPrefixStopRow  = HConstants.EMPTY_START_ROW;
+    rowKeyPrefixFilter   = null;
+
+    if ((encodingType.compareTo("UINT4_BE") == 0) ||
+        (encodingType.compareTo("UINT_BE") == 0)) {
+      if (prefixLength != 4) {
+        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+      }
+
+      int val;
+      if ((valueArg instanceof IntExpression) == false) {
+        return false;
+      }
+
+      val = ((IntExpression)valueArg).getInt();
+
+      // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+      switch (functionName) {
+      case "equal":
+        rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array());
+        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
+        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+        return true;
+      case "greater_than_or_equal_to":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
+        return true;
+      case "greater_than":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+        return true;
+      case "less_than_or_equal_to":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+        return true;
+      case "less_than":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array();
+        return true;
+      }
+
+      return false;
+    }
+
+    if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) ||
+        (encodingType.compareTo("TIME_EPOCH_BE") == 0) ||
+        (encodingType.compareTo("UINT8_BE") == 0)) {
+
+      if (prefixLength != 8) {
+        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+      }
+
+      long val;
+      if (encodingType.compareTo("TIME_EPOCH_BE") == 0) {
+        if ((valueArg instanceof TimeExpression) == false) {
+          return false;
         }
-        break;
-      case "DATE_EPOCH":
-      case "DATE_EPOCH_BE":
-        if (valueArg instanceof DateExpression) {
-          bb = newByteBuf(8, encodingType.endsWith("_BE"));
-          bb.writeLong(((DateExpression)valueArg).getDate());
+
+        val = ((TimeExpression)valueArg).getTime();
+      } else if (encodingType.compareTo("UINT8_BE") == 0){
+        if ((valueArg instanceof LongExpression) == false) {
+          return false;
         }
-        break;
-      case "BOOLEAN_BYTE":
-        if (valueArg instanceof BooleanExpression) {
-          bb = newByteBuf(1, false /* does not matter */);
-          bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
+
+        val = ((LongExpression)valueArg).getLong();
+      } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) {
+        if ((valueArg instanceof TimeStampExpression) == false) {
+          return false;
         }
-        break;
-      case "UTF8":
-        // let visitSchemaPath() handle this.
-        return e.getInput().accept(this, valueArg);
+
+        val = ((TimeStampExpression)valueArg).getTimeStamp();
+      } else {
+        // Should not reach here.
+        return false;
       }
 
-      if (bb != null) {
-        this.value = bb.array();
-        this.path = (SchemaPath)e.getInput();
+      // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+      switch (functionName) {
+      case "equal":
+        rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array());
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+        return true;
+      case "greater_than_or_equal_to":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
         return true;
+      case "greater_than":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+        return true;
+      case "less_than_or_equal_to":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+        return true;
+      case "less_than":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array();
+        return true;
+      }
+
+      return false;
+    }
+
+    if (encodingType.compareTo("DATE_EPOCH_BE") == 0) {
+      if ((valueArg instanceof DateExpression) == false) {
+        return false;
+      }
+
+      if (prefixLength != 8) {
+        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
       }
+
+      final long MILLISECONDS_IN_A_DAY  = (long)1000 * 60 * 60 * 24;
+      long dateToSet;
+      // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+      switch (functionName) {
+      case "equal":
+        long startDate = ((DateExpression)valueArg).getDate();
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array();
+        long stopDate  = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array();
+        return true;
+      case "greater_than_or_equal_to":
+        dateToSet = ((DateExpression)valueArg).getDate();
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      case "greater_than":
+        dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      case "less_than_or_equal_to":
+        dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      case "less_than":
+        dateToSet = ((DateExpression)valueArg).getDate();
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      }
+
+      return false;
     }
+
     return false;
   }
 
@@ -237,4 +544,4 @@ class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpr
         .build();
   }
 
-}
\ No newline at end of file
+}
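
The new *_OB/*_OBD cases above rely on HBase's OrderedBytes encodings; for the descending (_OBD) variants the processor records sortOrderAscending = false so the filter builder can invert the comparison. A small sketch of the two encodings follows, assuming hbase-common on the classpath (the same library the plugin imports); the class name is made up for the example.

    // Illustrative sketch only: encode the same double with ascending and
    // descending OrderedBytes encodings to show why the comparison must flip
    // when sortOrderAscending is false.
    import org.apache.hadoop.hbase.util.Order;
    import org.apache.hadoop.hbase.util.OrderedBytes;
    import org.apache.hadoop.hbase.util.PositionedByteRange;
    import org.apache.hadoop.hbase.util.SimplePositionedByteRange;

    public class OrderedBytesSortOrderSketch {
      public static void main(String[] args) {
        byte[] asc = new byte[9];
        byte[] desc = new byte[9];
        PositionedByteRange ascRange = new SimplePositionedByteRange(asc, 0, 9);
        PositionedByteRange descRange = new SimplePositionedByteRange(desc, 0, 9);

        // DOUBLE_OB: lexicographic byte order follows numeric order.
        OrderedBytes.encodeFloat64(ascRange, 42.0d, Order.ASCENDING);
        // DOUBLE_OBD: byte order is the reverse of numeric order, so a
        // "greater_than" on the value becomes a "less_than" bound on the row key.
        OrderedBytes.encodeFloat64(descRange, 42.0d, Order.DESCENDING);

        System.out.println("ascending  = " + toHex(asc));
        System.out.println("descending = " + toHex(desc));
      }

      private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
          sb.append(String.format("%02x", b));
        }
        return sb.toString();
      }
    }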

http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
index 45fe22f..a4a3938 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@@ -159,6 +160,7 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void
     String functionName = processor.getFunctionName();
     SchemaPath field = processor.getPath();
     byte[] fieldValue = processor.getValue();
+    boolean sortOrderAscending = processor.isSortOrderAscending();
     boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
     if (!(isRowKey
         || (!field.getRootSegment().isLastPath()
@@ -172,6 +174,10 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void
       return null;
     }
 
+    if (processor.isRowKeyPrefixComparison()) {
+      return createRowKeyPrefixScanSpec(call, processor);
+    }
+
     CompareOp compareOp = null;
     boolean isNullTest = false;
     ByteArrayComparable comparator = new BinaryComparator(fieldValue);
@@ -189,29 +195,59 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void
       compareOp = CompareOp.NOT_EQUAL;
       break;
     case "greater_than_or_equal_to":
-      compareOp = CompareOp.GREATER_OR_EQUAL;
-      if (isRowKey) {
-        startRow = fieldValue;
+      if (sortOrderAscending) {
+        compareOp = CompareOp.GREATER_OR_EQUAL;
+        if (isRowKey) {
+          startRow = fieldValue;
+        }
+      } else {
+        compareOp = CompareOp.LESS_OR_EQUAL;
+        if (isRowKey) {
+          // stopRow should be just greater than 'value'
+          stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
       }
       break;
     case "greater_than":
-      compareOp = CompareOp.GREATER;
-      if (isRowKey) {
-        // startRow should be just greater than 'value'
-        startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+      if (sortOrderAscending) {
+        compareOp = CompareOp.GREATER;
+        if (isRowKey) {
+          // startRow should be just greater than 'value'
+          startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
+      } else {
+        compareOp = CompareOp.LESS;
+        if (isRowKey) {
+          stopRow = fieldValue;
+        }
       }
       break;
     case "less_than_or_equal_to":
-      compareOp = CompareOp.LESS_OR_EQUAL;
-      if (isRowKey) {
-        // stopRow should be just greater than 'value'
-        stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+      if (sortOrderAscending) {
+        compareOp = CompareOp.LESS_OR_EQUAL;
+        if (isRowKey) {
+          // stopRow should be just greater than 'value'
+          stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
+      } else {
+        compareOp = CompareOp.GREATER_OR_EQUAL;
+        if (isRowKey) {
+          startRow = fieldValue;
+        }
       }
       break;
     case "less_than":
-      compareOp = CompareOp.LESS;
-      if (isRowKey) {
-        stopRow = fieldValue;
+      if (sortOrderAscending) {
+        compareOp = CompareOp.LESS;
+        if (isRowKey) {
+          stopRow = fieldValue;
+        }
+      } else {
+        compareOp = CompareOp.GREATER;
+        if (isRowKey) {
+          // startRow should be just greater than 'value'
+          startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
       }
       break;
     case "isnull":
@@ -299,4 +335,19 @@ public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void
     return null;
   }
 
+  private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call,
+      CompareFunctionsProcessor processor) {
+    byte[] startRow = processor.getRowKeyPrefixStartRow();
+    byte[] stopRow  = processor.getRowKeyPrefixStopRow();
+    Filter filter   = processor.getRowKeyPrefixFilter();
+
+    if (startRow != HConstants.EMPTY_START_ROW ||
+      stopRow != HConstants.EMPTY_END_ROW ||
+      filter != null) {
+      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+    }
+
+    // else
+    return null;
+  }
 }
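
In MapRDBFilterBuilder above, each range operator now branches on sortOrderAscending: for descending-encoded row keys the comparison operator and the start/stop bounds swap sides. The sketch below illustrates the bound selection for "less_than_or_equal_to" only; it is plain Java with made-up names, not plugin code.

    // Illustrative sketch only: row-key bound selection for "less_than_or_equal_to",
    // including the descending-encoding case added by this change.
    import java.util.Arrays;

    public class RowKeyBoundFlipSketch {

      // Returns {startRow, stopRow}; null means "unbounded" on that side.
      static byte[][] boundsForLessThanOrEqual(byte[] encodedValue, boolean sortOrderAscending) {
        if (sortOrderAscending) {
          // stopRow must be just greater than the value: append one 0x00 byte.
          byte[] stopRow = Arrays.copyOf(encodedValue, encodedValue.length + 1);
          return new byte[][] { null, stopRow };
        }
        // Descending encoding: smaller logical values have larger row keys,
        // so the same predicate becomes a lower bound (startRow) instead.
        return new byte[][] { encodedValue, null };
      }

      public static void main(String[] args) {
        byte[] value = { 0x12, 0x34 };
        byte[][] asc = boundsForLessThanOrEqual(value, true);
        byte[][] desc = boundsForLessThanOrEqual(value, false);
        System.out.println("ascending  stopRow length  = " + asc[1].length);  // 3
        System.out.println("descending startRow length = " + desc[0].length); // 2
      }
    }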

http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
index f2e6ceb..b048e37 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
@@ -43,12 +43,16 @@ public class MapRDBFormatMatcher extends FormatMatcher {
   @Override
   public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
     FileStatus status = selection.getFirstPath(fs);
-    if (status instanceof MapRFileStatus) {
-      if (((MapRFileStatus) status).isTable()) {
-        return new FormatSelection(getFormatPlugin().getConfig(), selection);
-      }
+    if (!isFileReadable(fs, status)) {
+      return null;
     }
-    return null;
+
+    return new FormatSelection(getFormatPlugin().getConfig(), selection);
+  }
+
+  @Override
+  public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
+    return (status instanceof MapRFileStatus) && ((MapRFileStatus) status).isTable();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
index eb72db0..6a10d22 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
@@ -99,7 +99,7 @@ public class MapRDBFormatPlugin implements FormatPlugin {
   @Override
   @JsonIgnore
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {
-    return ImmutableSet.of(MapRDBPushFilterIntoScan.INSTANCE);
+    return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/df19a019/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
index 7d79a8e..50f3d95 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
@@ -24,71 +24,121 @@ import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.FilterPrel;
 import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hbase.HBaseScanSpec;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rex.RexNode;
 
 import com.google.common.collect.ImmutableList;
 
-public class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
+public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
        
-  public static final StoragePluginOptimizerRule INSTANCE = new MapRDBPushFilterIntoScan();
+//  public static final StoragePluginOptimizerRule INSTANCE = new MapRDBPushFilterIntoScan();
 
-  private MapRDBPushFilterIntoScan() {
-    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan");
+  private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
   }
 
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final ScanPrel scan = (ScanPrel) call.rel(1);
-    final FilterPrel filter = (FilterPrel) call.rel(0);
-    final RexNode condition = filter.getCondition();
-
-    MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
-    if (groupScan.isFilterPushedDown()) {
-      /*
-       * The rule can get triggered again due to the transformed "scan => filter" sequence
-       * created by the earlier execution of this rule when we could not do a complete
-       * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
-       * this flag to not do a re-processing of the rule on the already transformed call.
-       */
-      return;
+  public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(1);
+      final FilterPrel filter = (FilterPrel) call.rel(0);
+      final RexNode condition = filter.getCondition();
+
+      MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+         */
+        return;
+      }
+
+      doPushFilterToScan(call, filter, null, scan, groupScan, condition);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(1);
+      if (scan.getGroupScan() instanceof MapRDBGroupScan) {
+        return super.matches(call);
+      }
+      return false;
     }
+  };
 
-    LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
-    MapRDBFilterBuilder hbaseFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
-    HBaseScanSpec newScanSpec = hbaseFilterBuilder.parseTree();
+  public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(2);
+      final ProjectPrel project = (ProjectPrel) call.rel(1);
+      final FilterPrel filter = (FilterPrel) call.rel(0);
+
+      MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+         */
+         return;
+      }
+
+      // convert the filter to one that references the child of the project
+      final RexNode condition =  RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
+
+      doPushFilterToScan(call, filter, project, scan, groupScan, condition);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(2);
+      if (scan.getGroupScan() instanceof MapRDBGroupScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+
+  protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final MapRDBGroupScan groupScan, final RexNode condition) {
+
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+    final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
+    final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
     if (newScanSpec == null) {
       return; //no filter pushdown ==> No transformation.
     }
 
-    final MapRDBGroupScan newGroupsScan = new MapRDBGroupScan(groupScan.getUserName(),
-        groupScan.getStoragePlugin(), groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
+    final MapRDBGroupScan newGroupsScan = new MapRDBGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+                                                              groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
     newGroupsScan.setFilterPushedDown(true);
 
     final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-    if (hbaseFilterBuilder.isAllExpressionsConverted()) {
-      /*
-       * Since we could convert the entire filter condition expression into an HBase filter,
-       * we can eliminate the filter operator altogether.
-       */
-      call.transformTo(newScanPrel);
-    } else {
-      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of((RelNode)newScanPrel)));
-    }
-  }
 
-  @Override
-  public boolean matches(RelOptRuleCall call) {
-    final ScanPrel scan = (ScanPrel) call.rel(1);
-    if (scan.getGroupScan() instanceof MapRDBGroupScan) {
-      return super.matches(call);
+    // Depending on whether there is a project in the middle, assign either scan or copy of project to childRel.
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));
+
+    if (maprdbFilterBuilder.isAllExpressionsConverted()) {
+        /*
+         * Since we could convert the entire filter condition expression into an HBase filter,
+         * we can eliminate the filter operator altogether.
+         */
+      call.transformTo(childRel);
+    } else {
+      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
     }
-    return false;
   }
 
 }
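
The FILTER_ON_PROJECT rule matches Filter(Project(Scan)) and first rewrites the filter with RelOptUtil.pushFilterPastProject() so its column references point at the scan rather than the project output. The toy sketch below illustrates that index-remapping idea in plain Java; it is only a conceptual aid with invented names, not how Calcite actually represents the expressions.

    // Illustrative sketch only: a filter written against the project's output
    // columns is remapped to the underlying scan's columns before pushdown.
    import java.util.Arrays;
    import java.util.List;

    public class PushFilterPastProjectSketch {
      public static void main(String[] args) {
        // Project: output column 0 = scan column 2 (e.g. row_key), output column 1 = scan column 0.
        List<Integer> projectToScan = Arrays.asList(2, 0);

        // The filter references the project's output column 0.
        int filterColumn = 0;

        // After "pushing past the project", the filter references scan column 2.
        int rewritten = projectToScan.get(filterColumn);
        System.out.println("filter now references scan column " + rewritten);
      }
    }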
