This is an automated email from the ASF dual-hosted git repository.

amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0abcbe3f36bf6c0a2b5fe07a778d201ead8dd2ce
Author: rebase <bui...@mapr.com>
AuthorDate: Mon Feb 12 14:10:56 2018 -0800

    DRILL-6381: (Part 1) Secondary Index framework
    
      1. Secondary Index planning interfaces and abstract classes such as 
DbGroupScan, DbSubScan, IndexDescriptor, etc.
      2. Statistics and Cost model interfaces/classes: PluginCost, Statistics, 
StatisticsPayload, AbstractIndexStatistics
      3. ScanBatch and RecordReader to support repeatable scan
      4. Secondary Index execution related interfaces: RangePartitionSender, 
RowKeyJoin, PartitionFunction
      5. MD-3979: Query using cast index plan fails with NPE
    
    Co-authored-by: Aman Sinha <asi...@maprtech.com>
    Co-authored-by: chunhui-shi <c...@maprtech.com>
    Co-authored-by: Gautam Parai <gpa...@maprtech.com>
    Co-authored-by: Padma Penumarthy <ppenuma...@yahoo.com>
    Co-authored-by: Hanumath Rao Maduri <hmad...@maprtech.com>
    
    Conflicts:
        
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
        
exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
        
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
        protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
        
protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
        protocol/src/main/protobuf/UserBitShared.proto
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   2 +
 .../exec/physical/base/AbstractDbGroupScan.java    |  95 +++++++
 .../exec/physical/base/AbstractDbSubScan.java      |  37 +++
 .../physical/base/AbstractPhysicalVisitor.java     |  12 +
 .../drill/exec/physical/base/DbGroupScan.java      | 129 +++++++++
 .../apache/drill/exec/physical/base/DbSubScan.java |  43 +++
 .../drill/exec/physical/base/IndexGroupScan.java   |  76 ++++++
 .../drill/exec/physical/base/PhysicalVisitor.java  |   4 +
 ...{RangeSender.java => RangePartitionSender.java} |  52 ++--
 .../apache/drill/exec/physical/impl/ScanBatch.java |  15 +-
 .../drill/exec/physical/impl/join/RowKeyJoin.java  |  79 ++++++
 .../drill/exec/planner/common/DrillRelOptUtil.java |  17 ++
 .../apache/drill/exec/planner/cost/PluginCost.java |  79 ++++++
 .../planner/index/AbstractIndexCollection.java     |  96 +++++++
 .../planner/index/AbstractIndexDescriptor.java     |  74 ++++++
 .../planner/index/AbstractIndexStatistics.java     |  51 ++++
 .../drill/exec/planner/index/CollationContext.java |  37 +++
 .../exec/planner/index/DrillIndexCollection.java   |  75 ++++++
 .../exec/planner/index/DrillIndexDefinition.java   | 278 ++++++++++++++++++++
 .../exec/planner/index/DrillIndexDescriptor.java   | 110 ++++++++
 .../exec/planner/index/FunctionalIndexInfo.java    |  85 ++++++
 .../drill/exec/planner/index/IndexCallContext.java |  76 ++++++
 .../drill/exec/planner/index/IndexCollection.java  |  99 +++++++
 .../drill/exec/planner/index/IndexDefinition.java  | 105 ++++++++
 .../drill/exec/planner/index/IndexDescriptor.java  |  68 +++++
 .../drill/exec/planner/index/IndexDiscover.java    |  23 ++
 .../exec/planner/index/IndexDiscoverBase.java      | 110 ++++++++
 .../exec/planner/index/IndexDiscoverable.java      |  37 +++
 .../drill/exec/planner/index/IndexGroup.java       |  63 +++++
 .../drill/exec/planner/index/IndexProperties.java  |  64 +++++
 .../drill/exec/planner/index/IndexStatistics.java  |  36 +++
 .../exec/planner/index/IndexableExprMarker.java    | 262 +++++++++++++++++++
 .../index/InvalidIndexDefinitionException.java     |  27 ++
 .../drill/exec/planner/index/Statistics.java       |  66 +++++
 .../exec/planner/index/StatisticsPayload.java      |  24 ++
 .../drill/exec/planner/logical/DrillTable.java     |   4 +
 .../exec/planner/physical/PartitionFunction.java   |  56 ++++
 .../drill/exec/record/AbstractRecordBatch.java     |   2 +-
 .../drill/exec/store/AbstractRecordReader.java     |   5 +
 .../org/apache/drill/exec/store/RecordReader.java  |   8 +
 .../drill/exec/util/EncodedSchemaPathSet.java      | 291 +++++++++++++++++++++
 .../org/apache/drill/exec/proto/UserBitShared.java |  75 +++---
 .../drill/exec/proto/beans/CoreOperatorType.java   |   8 +-
 protocol/src/main/protobuf/UserBitShared.proto     |   3 +-
 44 files changed, 2893 insertions(+), 65 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 21e16eb..cb0fc5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -807,6 +807,8 @@ public final class ExecConstants {
    */
   public static final String ENABLE_ITERATOR_VALIDATION = 
"drill.exec.debug.validate_iterators";
 
+  public static final String QUERY_ROWKEYJOIN_BATCHSIZE_KEY = 
"exec.query.rowkeyjoin_batchsize";
+  public static final PositiveLongValidator QUERY_ROWKEYJOIN_BATCHSIZE = new 
PositiveLongValidator(QUERY_ROWKEYJOIN_BATCHSIZE_KEY, Long.MAX_VALUE, null);
   /**
    * When iterator validation is enabled, additionally validates the vectors in
    * each batch passed to each iterator.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java
new file mode 100644
index 0000000..42e4bb9
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.index.IndexCollection;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+
+public abstract class AbstractDbGroupScan extends AbstractGroupScan implements 
DbGroupScan {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractDbGroupScan.class);
+
+  private static final String ROW_KEY = "_id";
+  private static final SchemaPath ROW_KEY_PATH = 
SchemaPath.getSimplePath(ROW_KEY);
+
+  public AbstractDbGroupScan(String userName) {
+    super(userName);
+  }
+
+  public AbstractDbGroupScan(AbstractDbGroupScan that) {
+    super(that);
+  }
+
+  public abstract AbstractStoragePlugin getStoragePlugin();
+
+  public abstract StoragePluginConfig getStorageConfig();
+
+  public abstract List<SchemaPath> getColumns();
+
+  @Override
+  public boolean supportsSecondaryIndex() {
+    return false;
+  }
+
+  @Override
+  public IndexCollection getSecondaryIndexCollection(RelNode scanrel) {
+    return null;
+  }
+
+  @Override
+  public boolean supportsRestrictedScan() {
+    return false;
+  }
+
+  @Override
+  public boolean isRestrictedScan() {
+    return false;
+  }
+
+  @Override
+  public DbGroupScan getRestrictedScan(List<SchemaPath> columns) {
+    return null;
+  }
+
+  @Override
+  public String getRowKeyName() {
+    return ROW_KEY;
+  }
+
+  @Override
+  public SchemaPath getRowKeyPath() {
+    return ROW_KEY_PATH;
+  }
+
+  @Override
+  public PartitionFunction getRangePartitionFunction(List<FieldReference> 
refList) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PluginCost getPluginCostModel() {
+    return null;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java
new file mode 100644
index 0000000..caa5831
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+
+public abstract class AbstractDbSubScan extends AbstractSubScan implements 
DbSubScan {
+
+  public AbstractDbSubScan(String userName) {
+    super(userName);
+  }
+
+  public boolean isRestrictedSubScan() {
+    return false;
+  }
+
+  @Override
+  public void addJoinForRestrictedSubScan(RowKeyJoin batch) {
+    throw new UnsupportedOperationException();
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 340c303..ca82ca6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.config.Limit;
@@ -29,6 +30,7 @@ import 
org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RangePartitionSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
@@ -157,6 +159,16 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
+  public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E 
{
+    return visitExchange(op, value);
+  }
+
+  @Override
+  public T visitRangePartitionSender(RangePartitionSender op, X value) throws 
E {
+    return visitSender(op, value);
+  }
+
+  @Override
   public T visitBroadcastSender(BroadcastSender op, X value) throws E {
     return visitSender(op, value);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java
new file mode 100644
index 0000000..e16fba1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.index.IndexCollection;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.planner.index.Statistics;
+
+import java.util.List;
+
+/**
+ * A DbGroupScan operator represents the scan associated with a database. The 
underlying
+ * database may support secondary indexes, so there are interface methods for 
indexes.
+ */
+public interface DbGroupScan extends GroupScan {
+
+
+  @JsonIgnore
+  public boolean supportsSecondaryIndex();
+
+  /**
+   * Get the index collection associated with this table if any
+   */
+  @JsonIgnore
+  public IndexCollection getSecondaryIndexCollection(RelNode scan);
+
+  /**
+   * Set the artificial row count after applying the {@link RexNode} condition
+   * @param condition
+   * @param count
+   * @param capRowCount
+   */
+  @JsonIgnore
+  public void setRowCount(RexNode condition, double count, double capRowCount);
+
+  /**
+   * Get the row count after applying the {@link RexNode} condition
+   * @param condition, filter to apply
+   * @param scanRel, the current scan rel
+   * @return row count post filtering
+   */
+  @JsonIgnore
+  public double getRowCount(RexNode condition, RelNode scanRel);
+
+  /**
+   * Get the statistics for this {@link DbGroupScan}
+   * @return the {@link Statistics} for this Scan
+   */
+  @JsonIgnore
+  public Statistics getStatistics();
+
+  public List<SchemaPath> getColumns();
+
+  public void setCostFactor(double sel);
+
+  @JsonIgnore
+  boolean isIndexScan();
+
+  /**
+   * Whether this DbGroupScan supports creating a restricted (skip) scan
+   * @return true if restricted scan is supported, false otherwise
+   */
+  @JsonIgnore
+  boolean supportsRestrictedScan();
+
+  /**
+   * Whether this DbGroupScan is itself a restricted scan
+   * @return true if this DbGroupScan is itself a restricted scan, false 
otherwise
+   */
+  @JsonIgnore
+  boolean isRestrictedScan();
+
+  /**
+   * If this DbGroupScan supports restricted scan, create a restricted scan 
from this DbGroupScan.
+   * @param columns
+   * @return a non-null DbGroupScan if restricted scan is supported, null 
otherwise
+   */
+  @JsonIgnore
+  DbGroupScan getRestrictedScan(List<SchemaPath> columns);
+
+  @JsonIgnore
+  String getRowKeyName();
+
+  @JsonIgnore
+  String getIndexHint();
+
+  @JsonIgnore
+  SchemaPath getRowKeyPath();
+
+  /**
+   * Get a partition function instance for range based partitioning
+   * @param refList a list of FieldReference exprs that are participating in 
the range partitioning
+   * @return instance of a partitioning function
+   */
+  @JsonIgnore
+  PartitionFunction getRangePartitionFunction(List<FieldReference> refList);
+
+  /**
+   * Get the format plugin cost model. The cost model will provide cost 
factors such as seq. scan cost,
+   * random scan cost, block size.
+   * @return a PluginCost cost model
+   */
+  @JsonIgnore
+  PluginCost getPluginCostModel();
+
+  @JsonIgnore
+  boolean isFilterPushedDown();
+}
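
A minimal sketch of how a planning rule might consume this contract, assuming groupScan, scanRel and condition come from the surrounding rule (none of these names are part of this patch):

    // Sketch only: how an index-planning rule might interrogate a DbGroupScan.
    if (groupScan.supportsSecondaryIndex()) {
      IndexCollection indexes = groupScan.getSecondaryIndexCollection(scanRel);
      if (indexes != null && indexes.supportsIndexSelection()) {
        double filteredRows = groupScan.getRowCount(condition, scanRel);
        PluginCost costModel = groupScan.getPluginCostModel();
        // ... build candidate index plans and compare their cost with the full table scan
      }
    }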
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java
new file mode 100644
index 0000000..874468d
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+
+public interface DbSubScan extends SubScan {
+
+  /**
+   * Whether this subscan is a restricted (skip) subscan
+   * @return true if this subscan is a restricted subscan, false otherwise
+   */
+  @JsonIgnore
+  boolean isRestrictedSubScan();
+
+  /**
+   * For a restricted sub-scan, this method allows associating a (hash)join 
instance.  A subscan within a minor
+   * fragment must have a corresponding (hash)join batch instance from which 
it will retrieve its set of
+   * rowkeys to perform the restricted scan.
+   * @param batch
+   */
+  @JsonIgnore
+  void addJoinForRestrictedSubScan(RowKeyJoin batch);
+
+}
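
A minimal sketch of the wiring described above, assuming dbSubScan and rowKeyJoinBatch (an operator implementing RowKeyJoin) live in the same minor fragment; the names are illustrative:

    // Sketch only: associate the join batch with its restricted sub-scan.
    if (dbSubScan.isRestrictedSubScan()) {
      dbSubScan.addJoinForRestrictedSubScan(rowKeyJoinBatch);
    }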
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java
new file mode 100644
index 0000000..1047e82
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.index.Statistics;
+
+
+import java.util.List;
+
+/**
+ * An IndexGroupScan operator represents the scan associated with an Index.
+ */
+public interface IndexGroupScan extends GroupScan {
+
+  /**
+   * Get the column ordinal of the rowkey column from the output schema of the 
IndexGroupScan
+   * @return the ordinal of the row key column
+   */
+  @JsonIgnore
+  public int getRowKeyOrdinal();
+
+  /**
+   * Set the artificial row count after applying the {@link RexNode} condition
+   * Mainly used for debugging
+   * @param condition
+   * @param count
+   * @param capRowCount
+   */
+  @JsonIgnore
+  public void setRowCount(RexNode condition, double count, double capRowCount);
+
+  /**
+   * Get the row count after applying the {@link RexNode} condition
+   * @param condition, filter to apply
+   * @return row count post filtering
+   */
+  @JsonIgnore
+  public double getRowCount(RexNode condition, RelNode scanRel);
+
+  /**
+   * Set the statistics for {@link IndexGroupScan}
+   * @param statistics
+   */
+  @JsonIgnore
+  public void setStatistics(Statistics statistics);
+
+  @JsonIgnore
+  public void setColumns(List<SchemaPath> columns);
+
+  @JsonIgnore
+  public List<SchemaPath> getColumns();
+
+  @JsonIgnore
+  public void setParallelizationWidth(int width);
+
+}
\ No newline at end of file
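
A minimal sketch of how a chosen index plan might configure an IndexGroupScan; indexScan, statistics, condition, scanRel and scanWidth are assumed to be supplied by the surrounding planning code, and the column names are illustrative:

    // Sketch only: configuring an IndexGroupScan for an index access plan.
    indexScan.setColumns(java.util.Arrays.asList(
        SchemaPath.getSimplePath("_id"),            // row key column
        SchemaPath.getSimplePath("indexed_col")));  // indexed column
    indexScan.setStatistics(statistics);
    indexScan.setParallelizationWidth(scanWidth);
    double estimatedRows = indexScan.getRowCount(condition, scanRel);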
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index f2e53eb..1bb1545 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.config.Limit;
@@ -29,6 +30,7 @@ import 
org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RangePartitionSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.physical.config.Sort;
@@ -73,6 +75,8 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA 
value) throws EXCEP;
   public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) 
throws EXCEP;
   public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) 
throws EXCEP;
+  public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) 
throws EXCEP;
+  public RETURN visitRangePartitionSender(RangePartitionSender op, EXTRA 
value) throws EXCEP;
   public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws 
EXCEP;
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java
similarity index 50%
rename from 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
rename to 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java
index 88c3be0..0c0852a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java
@@ -17,57 +17,57 @@
  */
 package org.apache.drill.exec.physical.config;
 
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName("range-sender")
-public class RangeSender extends AbstractSender{
+@JsonTypeName("range-partition-sender")
+public class RangePartitionSender extends AbstractSender {
 
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RangeSender.class);
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RangePartitionSender.class);
 
-  List<EndpointPartition> partitions;
+  // The number of records in the outgoing batch. This overrides the default value in Partitioner.
+  public static final int RANGE_PARTITION_OUTGOING_BATCH_SIZE = (1 << 12) - 1;
+
+  @JsonProperty("partitionFunction")
+  private PartitionFunction partitionFunction;
 
   @JsonCreator
-  public RangeSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, 
@JsonProperty("partitions") List<EndpointPartition> partitions) {
-    super(oppositeMajorFragmentId, child, 
Collections.<MinorFragmentEndpoint>emptyList());
-    this.partitions = partitions;
+  public RangePartitionSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
+                              @JsonProperty("child") PhysicalOperator child,
+                              @JsonProperty("destinations") 
List<MinorFragmentEndpoint> endpoints,
+                              @JsonProperty("partitionFunction") 
PartitionFunction partitionFunction) {
+    super(oppositeMajorFragmentId, child, endpoints);
+    this.partitionFunction = partitionFunction;
   }
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new RangeSender(oppositeMajorFragmentId, child, partitions);
+    return new RangePartitionSender(oppositeMajorFragmentId, child, 
destinations, partitionFunction);
   }
 
-  public static class EndpointPartition{
-    private final PartitionRange range;
-    private final DrillbitEndpoint endpoint;
+  @JsonProperty("partitionFunction")
+  public PartitionFunction getPartitionFunction() {
+    return partitionFunction;
+  }
 
-    @JsonCreator
-    public EndpointPartition(@JsonProperty("range") PartitionRange range, 
@JsonProperty("endpoint") DrillbitEndpoint endpoint) {
-      super();
-      this.range = range;
-      this.endpoint = endpoint;
-    }
-    public PartitionRange getRange() {
-      return range;
-    }
-    public DrillbitEndpoint getEndpoint() {
-      return endpoint;
-    }
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitRangePartitionSender(this, value);
   }
 
   @Override
   public int getOperatorType() {
-    return CoreOperatorType.RANGE_SENDER_VALUE;
+    return CoreOperatorType.RANGE_PARTITION_SENDER_VALUE;
   }
+
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index dc8dd0f..5ccf1c0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,10 +83,14 @@ public class ScanBatch implements CloseableRecordBatch {
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
   private final RecordBatchStatsContext batchStatsContext;
+
   // Represents last outcome of next(). If an Exception is thrown
   // during the method's execution a value IterOutcome.STOP will be assigned.
   private IterOutcome lastOutcome;
 
+  private List<RecordReader> readerList = null; // needed for repeatable 
scanners
+  private boolean isRepeatableScan = false;     // needed for repeatable 
scanners
+
   /**
    *
    * @param context
@@ -137,6 +141,15 @@ public class ScanBatch implements CloseableRecordBatch {
         readers, Collections.<Map<String, String>> emptyList());
   }
 
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+                   List<RecordReader> readerList, boolean isRepeatableScan)
+      throws ExecutionSetupException {
+    this(context, context.newOperatorContext(subScanConfig),
+        readerList, Collections.<Map<String, String>> emptyList());
+    this.readerList = readerList;
+    this.isRepeatableScan = isRepeatableScan;
+  }
+
   @Override
   public FragmentContext getContext() {
     return context;
@@ -255,7 +268,7 @@ public class ScanBatch implements CloseableRecordBatch {
       return false;
     }
     currentReader = readers.next();
-    if (readers.hasNext()) {
+    if (!isRepeatableScan && readers.hasNext()) {
       readers.remove();
     }
     implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
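
A minimal usage sketch of the new constructor, assuming subScan, context and readers come from the batch creator:

    // Sketch only: a restricted sub-scan builds a ScanBatch whose readers can be re-driven.
    ScanBatch scanBatch = new ScanBatch(subScan, context, readers, true /* isRepeatableScan */);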
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java
new file mode 100644
index 0000000..7b4dfca
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.record.AbstractRecordBatch.BatchState;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Interface for a row key join
+ */
+public interface RowKeyJoin {
+
+  /**
+   * Enum for RowKeyJoin internal state.
+   * Possible states are {INITIAL, PROCESSING, DONE}
+   *
+   * The RowKeyJoin starts in the INITIAL state. The RestrictedJsonRecordReader transitions it to
+   * PROCESSING as soon as it starts processing the rows for the current set of row keys. The join
+   * sets the state back to INITIAL when the left stream has no more data; since the left stream is
+   * consumed multiple times depending on the right stream, the state moves from PROCESSING back to
+   * INITIAL. If the right stream has no data, or an out-of-memory condition occurs, the state is
+   * transitioned to DONE.
+   */
+  public enum RowKeyJoinState {
+    INITIAL, PROCESSING, DONE;
+  }
+
+  /**
+   * Is the next batch of row keys ready to be returned
+   * @return True if ready, false if not
+   */
+  public boolean hasRowKeyBatch();
+
+  /**
+   * Get the next batch of row keys
+   * @return a Pair whose left element is the ValueVector containing the row 
keys, right
+   *    element is the number of row keys in this batch
+   */
+  public Pair<ValueVector, Integer> nextRowKeyBatch();
+
+
+  /**
+   * Get the current BatchState (this is useful when performing row key join)
+   */
+  public BatchState getBatchState();
+
+  /**
+   * Set the BatchState (this is useful when performing row key join)
+   * @param newState
+   */
+  public void setBatchState(BatchState newState);
+
+  /**
+   * Set the RowKeyJoinState (this is useful for maintaining state for row key 
join algorithm)
+   * @param newState
+   */
+  public void setRowKeyJoinState(RowKeyJoinState newState);
+
+  /**
+   * Get the current RowKeyJoinState.
+   */
+  public RowKeyJoinState getRowKeyJoinState();
+}
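
A minimal sketch of how a restricted reader might drain row keys through this interface; the method name is hypothetical:

    // Sketch only: pull batches of row keys from a RowKeyJoin.
    void drainRowKeys(RowKeyJoin join) {
      while (join.hasRowKeyBatch()) {
        Pair<ValueVector, Integer> rowKeys = join.nextRowKeyBatch();
        ValueVector keyVector = rowKeys.getLeft();
        int keyCount = rowKeys.getRight();
        // ... use keyVector to look up each of the keyCount row keys in the primary table
      }
    }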
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 96c3112..b39328e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -396,4 +396,21 @@ public abstract class DrillRelOptUtil {
       }
     }
   }
+
+  /**
+   * Returns true if the given project rel contains at least one call to the FLATTEN function.
+   */
+  public static boolean isProjectFlatten(RelNode project) {
+
+    assert project instanceof Project : "Rel is NOT an instance of Project!";
+
+    for (RexNode rex : project.getChildExps()) {
+      if (rex instanceof RexCall) {
+        String functionName = ((RexCall) rex).getOperator().getName();
+        if (functionName.equalsIgnoreCase("flatten")) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java
new file mode 100644
index 0000000..d765162
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.cost;
+
+import org.apache.drill.exec.physical.base.GroupScan;
+
+/**
+ * PluginCost describes the cost factors to be used when costing for the 
specific storage/format plugin
+ */
+public interface PluginCost {
+  org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PluginCost.class);
+
+  /**
+   * An interface to check if a parameter provided by user is valid or not.
+   * @param <T> Type of the parameter.
+   */
+  interface CheckValid<T> {
+    boolean isValid(T paramValue);
+  }
+
+  /**
+   * Class which checks whether the provided parameter value is greater than
+   * or equal to a minimum limit.
+   */
+  class greaterThanEquals implements CheckValid<Integer> {
+    private final Integer atleastEqualsTo;
+    public greaterThanEquals(Integer atleast) {
+      atleastEqualsTo = atleast;
+    }
+
+    @Override
+    public boolean isValid(Integer paramValue) {
+      if (paramValue >= atleastEqualsTo) {
+        return true;
+      } else {
+        logger.warn("Setting default value as the supplied parameter value {} is less than the minimum {}",
+            paramValue, atleastEqualsTo);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * @return the average column size in bytes
+   */
+  int getAverageColumnSize(GroupScan scan);
+
+  /**
+   * @return the block size in bytes
+   */
+  int getBlockSize(GroupScan scan);
+
+  /**
+   * @return the sequential block read cost
+   */
+  int getSequentialBlockReadCost(GroupScan scan);
+
+  /**
+   * @return the random block read cost
+   */
+  int getRandomBlockReadCost(GroupScan scan);
+}
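
A hypothetical implementation for an imaginary plugin, to illustrate the expected shape of a cost model; the class name and constants are made up for this sketch:

    // Sketch only: cost factors for a hypothetical "FooDb" storage plugin.
    public class FooDbPluginCost implements PluginCost {
      @Override public int getAverageColumnSize(GroupScan scan) { return 10; }       // bytes
      @Override public int getBlockSize(GroupScan scan) { return 8192; }             // bytes
      @Override public int getSequentialBlockReadCost(GroupScan scan) { return 32; }
      @Override public int getRandomBlockReadCost(GroupScan scan) { return 4 * 32; } // costlier than sequential
    }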
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java
new file mode 100644
index 0000000..9894b32
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+
+/**
+ * Abstract base class for Index collection (collection of Index descriptors)
+ *
+ */
+public abstract class AbstractIndexCollection implements IndexCollection, 
Iterable<IndexDescriptor> {
+
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractIndexCollection.class);
+  /**
+   * A set of indexes for a particular table
+   */
+  @JsonProperty
+  protected List<IndexDescriptor> indexes;
+
+  public AbstractIndexCollection() {
+    indexes = Lists.newArrayList();
+  }
+
+  @Override
+  public boolean addIndex(IndexDescriptor index) {
+    return indexes.add(index);
+  }
+
+  @Override
+  public boolean removeIndex(IndexDescriptor index) {
+    return indexes.remove(index);
+  }
+
+  @Override
+  public void clearAll() {
+    indexes.clear();
+  }
+
+  @Override
+  public boolean supportsIndexSelection() {
+    return false;
+  }
+
+  @Override
+  public double getRows(RexNode indexCondition) {
+    throw new UnsupportedOperationException("getRows() not supported for this 
index collection.");
+  }
+
+  @Override
+  public boolean supportsRowCountStats() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsFullTextSearch() {
+    return false;
+  }
+
+  @Override
+  public boolean isColumnIndexed(SchemaPath path) {
+    for (IndexDescriptor index : indexes) {
+      if (index.getIndexColumnOrdinal(path) >= 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Iterator<IndexDescriptor> iterator() {
+    return indexes.iterator();
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
new file mode 100644
index 0000000..f908ead
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+
+/**
+ * Abstract base class for an Index descriptor
+ *
+ */
+public abstract class AbstractIndexDescriptor extends DrillIndexDefinition 
implements IndexDescriptor {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractIndexDescriptor.class);
+
+  public AbstractIndexDescriptor(List<LogicalExpression> indexCols,
+                                 CollationContext indexCollationContext,
+                                 List<LogicalExpression> nonIndexCols,
+                                 List<LogicalExpression> rowKeyColumns,
+                                 String indexName,
+                                 String tableName,
+                                 IndexType type,
+                                 NullDirection nullsDirection) {
+    super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, 
indexName, tableName, type, nullsDirection);
+  }
+
+  @Override
+  public double getRows(RelNode scan, RexNode indexCondition) {
+    throw new UnsupportedOperationException("getRows() not supported for this 
index.");
+  }
+
+  @Override
+  public boolean supportsRowCountStats() {
+    return false;
+  }
+
+  @Override
+  public IndexGroupScan getIndexGroupScan() {
+    throw new UnsupportedOperationException("Group scan not supported for this 
index.");
+  }
+
+  @Override
+  public boolean supportsFullTextSearch() {
+    return false;
+  }
+
+  @Override
+  public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+      int numProjectedFields, GroupScan primaryGroupScan) {
+    throw new UnsupportedOperationException("getCost() not supported for this 
index.");
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java
new file mode 100644
index 0000000..dfc0897
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public abstract class AbstractIndexStatistics implements IndexStatistics {
+
+  protected static final Logger logger = LoggerFactory.getLogger(AbstractIndexStatistics.class);
+  protected final RelNode input;
+  protected final RexNode condition;
+  protected final DrillTable table;
+
+  public AbstractIndexStatistics(RelNode input, RexNode condition, DrillTable table) {
+    this.input = input;
+    this.condition = condition;
+    this.table = table;
+  }
+
+  public abstract double getRowCount();
+
+  public List<RelCollation> getCollations() {
+    throw new UnsupportedOperationException();
+  }
+
+  public RelDistribution getDistribution() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java
new file mode 100644
index 0000000..8260bee
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.common.expression.LogicalExpression;
+
+import java.util.List;
+import java.util.Map;
+
+public class CollationContext {
+
+  public final Map<LogicalExpression, RelFieldCollation> collationMap;
+  public final List<RelFieldCollation> relFieldCollations;
+
+  public CollationContext(Map<LogicalExpression, RelFieldCollation> 
collationMap,
+      List<RelFieldCollation> relFieldCollations) {
+    this.collationMap = collationMap;
+    this.relFieldCollations = relFieldCollations;
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java
new file mode 100644
index 0000000..0ea3d83
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.Set;
+
+public class DrillIndexCollection extends AbstractIndexCollection {
+  private final RelNode scan;  // physical scan rel corresponding to the 
primary table
+
+  public DrillIndexCollection(RelNode scanRel,
+                               Set<DrillIndexDescriptor> indexes) {
+    this.scan = scanRel;
+    for (IndexDescriptor index : indexes) {
+      super.addIndex(index);
+    }
+  }
+
+  private IndexDescriptor getIndexDescriptor() {
+
+    //XXX need a policy to pick the indexDesc to use instead of picking the 
first one.
+    return this.indexes.iterator().next();
+  }
+
+  @Override
+  public boolean supportsIndexSelection() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsRowCountStats() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsFullTextSearch() {
+    return true;
+  }
+
+  @Override
+  public double getRows(RexNode indexCondition) {
+
+    return getIndexDescriptor().getRows(scan, indexCondition);
+  }
+
+  @Override
+  public IndexGroupScan getGroupScan() {
+    return getIndexDescriptor().getIndexGroupScan();
+  }
+
+  @Override
+  public IndexCollectionType getIndexCollectionType() {
+    return 
IndexCollection.IndexCollectionType.EXTERNAL_SECONDARY_INDEX_COLLECTION;
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java
new file mode 100644
index 0000000..03c2a44
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DrillIndexDefinition implements IndexDefinition {
+  /**
+   * indexColumns: the list of columns on which this index is created. If there is more than one
+   * column, the order of the columns matters: an index on {a, b} is not the same as an index on {b, a}.
+   * NOTE: an indexed column may be of the form columnfamily.column.
+   */
+  @JsonProperty
+  protected final List<LogicalExpression> indexColumns;
+
+  /**
+   * nonIndexColumns: the list of columns that are included in the index as 'covering' columns
+   * but are not themselves indexed. These are useful for covering indexes, where the query can be
+   * satisfied directly from the index without accessing the table at all.
+   */
+  @JsonProperty
+  protected final List<LogicalExpression> nonIndexColumns;
+
+  @JsonIgnore
+  protected final Set<LogicalExpression> allIndexColumns;
+
+  @JsonProperty
+  protected final List<LogicalExpression> rowKeyColumns;
+
+  @JsonProperty
+  protected final CollationContext indexCollationContext;
+
+  /**
+   * indexName: name of the index that should be unique within the scope of a 
table
+   */
+  @JsonProperty
+  protected final String indexName;
+
+  protected final String tableName;
+
+  @JsonProperty
+  protected final IndexDescriptor.IndexType indexType;
+
+  @JsonProperty
+  protected final NullDirection nullsDirection;
+
+  public DrillIndexDefinition(List<LogicalExpression> indexCols,
+                              CollationContext indexCollationContext,
+                              List<LogicalExpression> nonIndexCols,
+                              List<LogicalExpression> rowKeyColumns,
+                              String indexName,
+                              String tableName,
+                              IndexType type,
+                              NullDirection nullsDirection) {
+    this.indexColumns = indexCols;
+    this.nonIndexColumns = nonIndexCols;
+    this.rowKeyColumns = rowKeyColumns;
+    this.indexName = indexName;
+    this.tableName = tableName;
+    this.indexType = type;
+    this.allIndexColumns = Sets.newHashSet(indexColumns);
+    this.allIndexColumns.addAll(nonIndexColumns);
+    this.indexCollationContext = indexCollationContext;
+    this.nullsDirection = nullsDirection;
+
+  }
+
+  @Override
+  public int getIndexColumnOrdinal(LogicalExpression path) {
+    int id = indexColumns.indexOf(path);
+    return id;
+  }
+
+  @Override
+  public boolean isCoveringIndex(List<LogicalExpression> columns) {
+    return allIndexColumns.containsAll(columns);
+  }
+
+  @Override
+  public boolean allColumnsIndexed(Collection<LogicalExpression> columns) {
+    return columnsInIndexFields(columns, indexColumns);
+  }
+
+  @Override
+  public boolean someColumnsIndexed(Collection<LogicalExpression> columns) {
+    return someColumnsInIndexFields(columns, indexColumns);
+  }
+
+  public boolean pathExactIn(SchemaPath path, Collection<LogicalExpression> 
exprs) {
+    for (LogicalExpression expr : exprs) {
+      if (expr instanceof SchemaPath) {
+        if (((SchemaPath) expr).toExpr().equals(path.toExpr())) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  boolean castIsCompatible(CastExpression castExpr, 
Collection<LogicalExpression> indexFields) {
+    for (LogicalExpression indexExpr : indexFields) {
+      if (indexExpr.getClass() != castExpr.getClass()) {
+        continue;
+      }
+      CastExpression indexCastExpr = (CastExpression) indexExpr;
+      // We compare the inputs using equals() because we know we are comparing SchemaPath instances.
+      // If this is extended to support other expressions, make sure equals() is implemented properly
+      // for those expressions; otherwise this falls back to identity comparison.
+      if (!castExpr.getInput().equals(indexCastExpr.getInput())) {
+        continue;
+      }
+
+      if (castExpr.getMajorType().getMinorType() != indexCastExpr.getMajorType().getMinorType()) {
+        continue;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  protected boolean columnsInIndexFields(Collection<LogicalExpression> 
columns, Collection<LogicalExpression> indexFields) {
+    // An extra check is needed so that a query condition expression that is not identical to the
+    // indexed field can still use the index, either through an implicit cast or when the difference
+    // is acceptable (e.g. the width of a varchar).
+    for (LogicalExpression col : columns) {
+      if (col instanceof CastExpression) {
+        if (!castIsCompatible((CastExpression) col, indexFields)) {
+          return false;
+        }
+      }
+      else {
+        if (!pathExactIn((SchemaPath)col, indexFields)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  protected boolean someColumnsInIndexFields(Collection<LogicalExpression> columns,
+      Collection<LogicalExpression> indexFields) {
+
+    // Extra check so that a query condition expression that is not identical to an indexed field can
+    // still use the index, either via an implicit cast or an allowed difference (e.g. varchar width).
+    for (LogicalExpression col : columns) {
+      if (col instanceof CastExpression) {
+        if (castIsCompatible((CastExpression) col, indexFields)) {
+          return true;
+        }
+      }
+      else {
+        if (pathExactIn((SchemaPath)col, indexFields)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    String columnsDesc = " Index columns: " + indexColumns.toString() + " Non-Index columns: " + nonIndexColumns.toString();
+    String desc = "Table: " + tableName + " Index: " + indexName + columnsDesc;
+    return desc;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DrillIndexDefinition index1 = (DrillIndexDefinition) o;
+    return tableName.equals(index1.tableName)
+        && indexName.equals(index1.indexName)
+        && indexType.equals(index1.indexType)
+        && indexColumns.equals(index1.indexColumns);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    final String fullName = tableName + indexName;
+    int result = 1;
+    result = prime * result + fullName.hashCode();
+    result = prime * result + indexType.hashCode();
+
+    return result;
+  }
+
+  @Override
+  @JsonProperty
+  public String getIndexName() {
+    return indexName;
+  }
+
+  @Override
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  @JsonProperty
+  public IndexDescriptor.IndexType getIndexType() {
+    return indexType;
+  }
+
+  @Override
+  @JsonProperty
+  public List<LogicalExpression> getRowKeyColumns() {
+    return this.rowKeyColumns;
+  }
+
+  @Override
+  @JsonProperty
+  public List<LogicalExpression> getIndexColumns() {
+    return this.indexColumns;
+  }
+
+  @Override
+  @JsonProperty
+  public List<LogicalExpression> getNonIndexColumns() {
+    return this.nonIndexColumns;
+  }
+
+  @Override
+  @JsonIgnore
+  public RelCollation getCollation() {
+    if (indexCollationContext != null) {
+      return RelCollations.of(indexCollationContext.relFieldCollations);
+    }
+    return null;
+  }
+
+  @Override
+  @JsonIgnore
+  public Map<LogicalExpression, RelFieldCollation> getCollationMap() {
+    return indexCollationContext.collationMap;
+  }
+
+  @Override
+  @JsonIgnore
+  public NullDirection getNullsOrderingDirection() {
+    return nullsDirection;
+  }
+}
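
    A minimal sketch of how a DrillIndexDefinition could be constructed and queried during planning;
    the table, index, and column names below are hypothetical and the collation context is omitted:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.calcite.rel.RelFieldCollation.NullDirection;
    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.planner.index.DrillIndexDefinition;
    import org.apache.drill.exec.planner.index.IndexDefinition.IndexType;

    public class IndexDefinitionSketch {
      public static void main(String[] args) {
        List<LogicalExpression> indexCols = Arrays.<LogicalExpression>asList(SchemaPath.getSimplePath("zipcode"));
        List<LogicalExpression> includedCols = Arrays.<LogicalExpression>asList(SchemaPath.getSimplePath("name"));
        List<LogicalExpression> rowKeyCols = Arrays.<LogicalExpression>asList(SchemaPath.getSimplePath("_id"));

        DrillIndexDefinition def = new DrillIndexDefinition(indexCols, null, includedCols, rowKeyCols,
            "zip_idx", "customers", IndexType.NATIVE_SECONDARY_INDEX, NullDirection.FIRST);

        // Covering: every referenced column is either an indexed column or an included (non-index) column.
        boolean covering = def.isCoveringIndex(Arrays.<LogicalExpression>asList(
            SchemaPath.getSimplePath("zipcode"), SchemaPath.getSimplePath("name")));   // true
        // Only indexed columns count here, so "name" alone is not fully indexed.
        boolean allIndexed = def.allColumnsIndexed(Arrays.<LogicalExpression>asList(
            SchemaPath.getSimplePath("name")));                                        // false
        System.out.println(covering + " " + allIndexed);
      }
    }
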
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java
new file mode 100644
index 0000000..4da62c2
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.logical.DrillTable;
+
+import java.io.IOException;
+import java.util.List;
+
+public class DrillIndexDescriptor extends AbstractIndexDescriptor {
+
+  /**
+   * The name of Drill's Storage Plugin on which the Index was stored
+   */
+  private String storage;
+
+  private DrillTable table;
+
+  public DrillIndexDescriptor(List<LogicalExpression> indexCols,
+                              CollationContext indexCollationContext,
+                              List<LogicalExpression> nonIndexCols,
+                              List<LogicalExpression> rowKeyColumns,
+                              String indexName,
+                              String tableName,
+                              IndexType type,
+                              NullDirection nullsDirection) {
+    super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, 
indexName, tableName, type, nullsDirection);
+  }
+
+  public DrillIndexDescriptor(DrillIndexDefinition def) {
+    this(def.indexColumns, def.indexCollationContext, def.nonIndexColumns, 
def.rowKeyColumns, def.indexName,
+        def.getTableName(), def.getIndexType(), def.nullsDirection);
+  }
+
+  @Override
+  public double getRows(RelNode scan, RexNode indexCondition) {
+    // TODO: the real implementation should use Drill's statistics; for now return a placeholder value of 1.0
+    return 1.0;
+  }
+
+  @Override
+  public IndexGroupScan getIndexGroupScan() {
+    try {
+      final DrillTable idxTable = getDrillTable();
+      GroupScan scan = idxTable.getGroupScan();
+
+      if (!(scan instanceof IndexGroupScan)) {
+        logger.error("The GroupScan from table {} is not an IndexGroupScan", idxTable.toString());
+        return null;
+      }
+      return (IndexGroupScan) scan;
+    } catch (IOException e) {
+      logger.error("Error in getIndexGroupScan", e);
+    }
+    return null;
+  }
+
+  public void attach(String storageName, DrillTable inTable) {
+    storage = storageName;
+    setDrillTable(inTable);
+  }
+
+  public void setStorageName(String storageName) {
+    storage = storageName;
+  }
+
+  public String getStorageName() {
+    return storage;
+  }
+
+  public void setDrillTable(DrillTable table) {
+    this.table = table;
+  }
+
+  public DrillTable getDrillTable() {
+    return this.table;
+  }
+
+  public FunctionalIndexInfo getFunctionalInfo() {
+    return null;
+  }
+
+  @Override
+  public PluginCost getPluginCostModel() {
+    return null;
+  }
+}
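
    A sketch of the intended flow on the storage plugin side: wrap a definition in a descriptor,
    attach the index's DrillTable, and obtain its IndexGroupScan for planning. The storage name
    "myplugin" is hypothetical, and 'def' / 'indexTable' are assumed to come from the plugin's
    index discovery:

    import org.apache.drill.exec.physical.base.IndexGroupScan;
    import org.apache.drill.exec.planner.index.DrillIndexDefinition;
    import org.apache.drill.exec.planner.index.DrillIndexDescriptor;
    import org.apache.drill.exec.planner.logical.DrillTable;

    public class IndexDescriptorSketch {
      static IndexGroupScan toIndexScan(DrillIndexDefinition def, DrillTable indexTable) {
        DrillIndexDescriptor idxDesc = new DrillIndexDescriptor(def);
        idxDesc.attach("myplugin", indexTable);   // records the storage name and the index DrillTable
        return idxDesc.getIndexGroupScan();       // null if the table's group scan is not an IndexGroupScan
      }
    }
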
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java
new file mode 100644
index 0000000..a12dcc6
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * FunctionalIndexInfo collects the functional fields of an IndexDescriptor and derives the information
+ * needed for index planning, e.g. converting and rewriting the filter, columns, and row type of an index
+ * scan that involves a functional index. Because different stores may rename expressions in the index
+ * table in different ways, the storage plugin supplies the concrete implementation.
+ */
+public interface FunctionalIndexInfo {
+
+  /**
+   * @return if this index has functional indexed field, return true
+   */
+  boolean hasFunctional();
+
+  /**
+   * @return the IndexDescriptor this IndexInfo built from
+   */
+  IndexDescriptor getIndexDesc();
+
+  /**
+   * For an original path, return the renamed '$N' path. Note that there could be multiple renamed paths
+   * if multiple functional indexes refer to the original path.
+   * @param path the original field path
+   * @return the renamed path in the index table
+   */
+  SchemaPath getNewPath(SchemaPath path);
+
+  /**
+   * Return a plain field path if the incoming index expression 'expr' has been replaced by a plain field.
+   * @param expr an expression that is assumed to be indexed
+   * @return the renamed SchemaPath in the index table for the indexed expression
+   */
+  SchemaPath getNewPathFromExpr(LogicalExpression expr);
+
+  /**
+   * @return the map from each indexed expression to the schema paths involved in that expression
+   */
+  Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr();
+
+  /**
+   * @return the map between indexed expression and to-be-converted target 
expression for scan in index
+   * e.g. cast(a.b as int) -> '$0'
+   */
+  Map<LogicalExpression, LogicalExpression> getExprMap();
+
+  /**
+   * @return the set of all new field names for indexed functions in index
+   */
+  Set<SchemaPath> allNewSchemaPaths();
+
+  /**
+   * @return the set of all schemaPath exist in functional index fields
+   */
+  Set<SchemaPath> allPathsInFunction();
+
+  /**
+   * Whether this implementation (it may differ per storage plugin) supports rewriting a varchar equality
+   * expression, e.g. cast(a.b as varchar(2)) = 'ca', into a LIKE expression: cast(a.b as varchar(2)) LIKE 'ca%'
+   */
+  boolean supportEqualCharConvertToLike();
+
+}
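
    For example, a consumer of this interface might rewrite column references for an index scan
    roughly as follows; this is only a sketch, since the real rewriting lives in the index planning
    rules that build on this framework:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.planner.index.FunctionalIndexInfo;

    public class FunctionalIndexSketch {
      // Replace an indexed expression (e.g. cast(a.b as INT)) with its renamed '$N' column in the
      // index table, leaving plain columns untouched.
      static List<LogicalExpression> rewriteForIndexScan(FunctionalIndexInfo info, List<LogicalExpression> exprs) {
        List<LogicalExpression> rewritten = new ArrayList<>();
        for (LogicalExpression e : exprs) {
          SchemaPath renamed = info.hasFunctional() ? info.getNewPathFromExpr(e) : null;
          rewritten.add(renamed != null ? renamed : e);
        }
        return rewritten;
      }
    }
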
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
new file mode 100644
index 0000000..65788cb
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import java.util.List;
+import java.util.Set;
+
+public interface IndexCallContext {
+  DrillScanRelBase getScan();
+
+  DbGroupScan getGroupScan();
+
+  List<RelCollation> getCollationList();
+
+  RelCollation getCollation();
+
+  boolean hasLowerProject();
+
+  boolean hasUpperProject();
+
+  RelOptRuleCall getCall();
+
+  Set<LogicalExpression> getLeftOutPathsInFunctions();
+
+  RelNode getFilter();
+
+  IndexableExprMarker getOrigMarker();
+
+  List<LogicalExpression> getSortExprs();
+
+  DrillProjectRelBase getLowerProject();
+
+  DrillProjectRelBase getUpperProject();
+
+  void setLeftOutPathsInFunctions(Set<LogicalExpression> exprs);
+
+  List<SchemaPath> getScanColumns();
+
+  RexNode getFilterCondition();
+
+  RexNode getOrigCondition();
+
+  Sort getSort();
+
+  void createSortExprs();
+
+  RelNode getExchange();
+
+  List<DistributionField> getDistributionFields();
+}
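
    As a rough illustration of the intended use, a planning rule implementation would read the pieces
    it needs from the call context rather than re-deriving them from the RelOptRuleCall; a minimal sketch:

    import org.apache.calcite.rex.RexNode;
    import org.apache.drill.exec.physical.base.DbGroupScan;
    import org.apache.drill.exec.planner.index.IndexCallContext;

    public class IndexRuleSketch {
      // Only scans backed by a DbGroupScan with a filter to push are interesting to the index rules.
      static boolean hasIndexableShape(IndexCallContext ctx) {
        DbGroupScan groupScan = ctx.getGroupScan();
        RexNode condition = ctx.getFilterCondition();
        return groupScan != null && condition != null;
      }
    }
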
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java
new file mode 100644
index 0000000..9b4d170
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+
+// Interface used to describe an index collection
+public interface IndexCollection extends Iterable<IndexDescriptor> {
+  /**
+   * Types of index collections: NATIVE_SECONDARY_INDEX_COLLECTION, EXTERNAL_SECONDARY_INDEX_COLLECTION
+   */
+  public static enum IndexCollectionType {
+    NATIVE_SECONDARY_INDEX_COLLECTION,
+    EXTERNAL_SECONDARY_INDEX_COLLECTION
+  };
+
+  /**
+   * Add a new index to the collection. Return True if index was successfully 
added; False otherwise
+   */
+  public boolean addIndex(IndexDescriptor index);
+
+  /**
+   * Remove an index (identified by table name and index name) from the 
collection.
+   * Return True if index was successfully removed; False otherwise
+   */
+  public boolean removeIndex(IndexDescriptor index);
+
+  /**
+   * Clears all entries from this index collection
+   */
+  public void clearAll();
+
+  /**
+   * Get the type of this index based on {@link IndexCollectionType}
+   * @return one of the values in {@link IndexCollectionType}
+   */
+  public IndexCollectionType getIndexCollectionType();
+
+  /**
+   * Whether or not this index collection supports index selection (selecting an
+   * appropriate index out of multiple candidates). Typically, external index collections
+   * such as Elasticsearch already have this capability, while a native secondary index collection
+   * may not; in such cases, Drill needs to do the index selection itself.
+   */
+  public boolean supportsIndexSelection();
+
+  /**
+   * Get the estimated row count for a single index condition
+   * @param indexCondition The index condition (e.g index_col1 < 10 AND 
index_col2 = 'abc')
+   * @return The estimated row count
+   */
+  public double getRows(RexNode indexCondition);
+
+  /**
+   * Whether or not the index supports getting row count statistics
+   * @return True if index supports getting row count, False otherwise
+   */
+  public boolean supportsRowCountStats();
+
+  /**
+   * Whether or not the index supports full-text search (to allow pushing down 
such filters)
+   * @return True if index supports full-text search, False otherwise
+   */
+  public boolean supportsFullTextSearch();
+
+  /**
+   * If this IndexCollection exposes a single GroupScan, return the GroupScan 
instance. For external indexes
+   * such as Elasticsearch, we may have a single GroupScan representing all 
the indexes contained
+   * within that collection.  On the other hand, for native indexes, each 
separate index would
+   * have its own GroupScan.
+   * @return GroupScan for this IndexCollection if available, otherwise null
+   */
+  public IndexGroupScan getGroupScan();
+
+  /**
+   * Check if the field name is the leading key of any of the indexes in this 
collection
+   * @param path
+   * @return True if an appropriate index is found, False otherwise
+   */
+  public boolean isColumnIndexed(SchemaPath path);
+
+}
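
    Since the collection is Iterable<IndexDescriptor>, a simple selection loop can be written against
    it; a sketch (the leading-key convention of ordinal 0 follows getIndexColumnOrdinal):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.planner.index.IndexCollection;
    import org.apache.drill.exec.planner.index.IndexDescriptor;

    public class IndexCandidateSketch {
      // Collect the indexes whose leading key matches the filter column.
      static List<IndexDescriptor> leadingKeyCandidates(IndexCollection indexes, SchemaPath filterCol) {
        List<IndexDescriptor> candidates = new ArrayList<>();
        if (!indexes.isColumnIndexed(filterCol)) {
          return candidates;                       // no index has this column as a leading key
        }
        for (IndexDescriptor idx : indexes) {
          if (idx.getIndexColumnOrdinal(filterCol) == 0) {
            candidates.add(idx);
          }
        }
        return candidates;
      }
    }
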
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java
new file mode 100644
index 0000000..995d23c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.expression.LogicalExpression;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+// Interface used to define an index.
+public interface IndexDefinition {
+  /**
+   * Types of an index: PRIMARY_KEY_INDEX, NATIVE_SECONDARY_INDEX, 
EXTERNAL_SECONDARY_INDEX
+   */
+  static enum IndexType {
+    PRIMARY_KEY_INDEX,
+    NATIVE_SECONDARY_INDEX,
+    EXTERNAL_SECONDARY_INDEX
+  };
+
+  /**
+   * Check to see if the field name is an index column and if so return the 
ordinal position in the index
+   * @param path The field path you want to compare to index column names.
+   * @return Return ordinal of the indexed column if valid, otherwise return -1
+   */
+  int getIndexColumnOrdinal(LogicalExpression path);
+
+  /**
+   * Get the name of the index
+   */
+  String getIndexName();
+
+  /**
+   * Check if this index 'covers' all the columns specified in the supplied 
list of columns
+   * @param columns
+   * @return True for covering index, False for non-covering
+   */
+  boolean isCoveringIndex(List<LogicalExpression> columns);
+
+  /**
+   * Check if this index has all the columns specified in the supplied list of columns indexed
+   * @param columns
+   * @return True if all fields are indexed, False if some or all fields are not indexed
+   */
+  boolean allColumnsIndexed(Collection<LogicalExpression> columns);
+
+  /**
+   * Check if this index has some columns specified in the supplied list of 
columns indexed
+   * @param columns
+   * @return True if some fields are indexed, False if none of the fields are 
indexed
+   */
+  boolean someColumnsIndexed(Collection<LogicalExpression> columns);
+
+  /**
+   * Get the list of columns (typically 1 column) that constitute the row key 
(primary key)
+   * @return
+   */
+  List<LogicalExpression> getRowKeyColumns();
+
+  /**
+   * Get the name of the table this index is associated with
+   */
+  String getTableName();
+
+  /**
+   * Get the type of this index based on {@link IndexType}
+   * @return one of the values in {@link IndexType}
+   */
+  IndexType getIndexType();
+
+
+  List<LogicalExpression> getIndexColumns();
+
+  List<LogicalExpression> getNonIndexColumns();
+
+  RelCollation getCollation();
+
+  Map<LogicalExpression, RelFieldCollation> getCollationMap();
+
+  /**
+   * Get the null ordering of this index
+   * @return the NullDirection of the index keys (e.g. FIRST or LAST)
+   */
+  NullDirection getNullsOrderingDirection();
+
+}
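
    One way this definition is meant to be consumed is for sort elimination: an index can only provide
    an ordering if it exposes a collation and its null direction matches the query's requirement. A
    simplified sketch of that check:

    import org.apache.calcite.rel.RelCollation;
    import org.apache.calcite.rel.RelFieldCollation.NullDirection;
    import org.apache.drill.exec.planner.index.IndexDefinition;

    public class IndexOrderSketch {
      static boolean mayProvideOrder(IndexDefinition idx, NullDirection requiredNulls) {
        RelCollation collation = idx.getCollation();
        if (collation == null || collation.getFieldCollations().isEmpty()) {
          return false;                            // index is not sorted, cannot satisfy an ORDER BY
        }
        NullDirection idxNulls = idx.getNullsOrderingDirection();
        return idxNulls == requiredNulls || idxNulls == NullDirection.UNSPECIFIED;
      }
    }
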
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
new file mode 100644
index 0000000..f355285
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.drill.exec.planner.cost.PluginCost;
+
+
+/**
+ * An IndexDefinition plus functions to access the materialized index (index table/scan, etc.)
+ */
+
+public interface IndexDescriptor extends IndexDefinition {
+
+  /**
+   * Get the estimated row count for a single index condition
+   * @param input The rel node corresponding to the primary table
+   * @param indexCondition The index condition (e.g index_col1 < 10 AND 
index_col2 = 'abc')
+   * @return The estimated row count
+   */
+  double getRows(RelNode input, RexNode indexCondition);
+
+  /**
+   * Whether or not the index supports getting row count statistics
+   * @return True if index supports getting row count, False otherwise
+   */
+  boolean supportsRowCountStats();
+
+  /**
+   * Get an instance of the group scan associated with this index descriptor
+   * @return An instance of group scan for this index
+   */
+  IndexGroupScan getIndexGroupScan();
+
+  /**
+   * Whether or not the index supports full-text search (to allow pushing down 
such filters)
+   * @return True if index supports full-text search, False otherwise
+   */
+  boolean supportsFullTextSearch();
+
+  FunctionalIndexInfo getFunctionalInfo();
+
+  public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+      int numProjectedFields, GroupScan primaryGroupScan);
+
+  public PluginCost getPluginCostModel();
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java
new file mode 100644
index 0000000..309083b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+public interface IndexDiscover {
+    IndexCollection getTableIndex(String tableName);
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java
new file mode 100644
index 0000000..fde2a32
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.calcite.rel.RelNode;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * IndexDiscoverBase is the layer that reads the index configurations of tables on storage plugins.
+ * Based on the properties it collects, it gets the StoragePlugin from the StoragePluginRegistry and,
+ * together with the index information, builds an IndexCollection.
+ */
+public abstract class IndexDiscoverBase implements IndexDiscover {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IndexDiscoverBase.class);
+
+  private AbstractDbGroupScan scan; // group scan corresponding to the primary 
table
+  private RelNode scanRel;   // physical scan rel corresponding to the primary 
table
+
+  public IndexDiscoverBase(AbstractDbGroupScan inScan, DrillScanRelBase 
inScanPrel) {
+    scan = inScan;
+    scanRel = inScanPrel;
+  }
+
+  public IndexDiscoverBase(AbstractDbGroupScan inScan, ScanPrel inScanPrel) {
+    scan = inScan;
+    scanRel = inScanPrel;
+  }
+
+  public AbstractDbGroupScan getOriginalScan() {
+    return scan;
+  }
+
+  public RelNode getOriginalScanRel() {
+    return scanRel;
+  }
+
+  public IndexCollection getTableIndex(String tableName, String storageName, Collection<DrillIndexDefinition> indexDefs) {
+    Set<DrillIndexDescriptor> idxSet = new HashSet<>();
+    for (DrillIndexDefinition def : indexDefs) {
+      DrillIndexDescriptor indexDescriptor = new DrillIndexDescriptor(def);
+      materializeIndex(storageName, indexDescriptor);
+      idxSet.add(indexDescriptor);
+    }
+    return new DrillIndexCollection(getOriginalScanRel(), idxSet);
+  }
+
+  public void materializeIndex(String storageName, DrillIndexDescriptor index) 
{
+    index.setStorageName(storageName);
+    index.setDrillTable(buildDrillTable(index));
+  }
+
+  /**
+   * When the IndexDescriptor carries a storage name, get a DrillTable instance based on that
+   * storage name and the other information in idxDesc that helps identify the table.
+   * @param idxDesc
+   * @return
+   */
+  public DrillTable getExternalDrillTable(IndexDescriptor idxDesc) {
+    //XX: get table object for this index, index storage plugin should provide 
interface to get the DrillTable object
+    return null;
+  }
+
+  /**
+   * Build the DrillTable for the index. For a native index, the storage plugin's IndexDiscover
+   * (e.g. for HBase or MapR-DB) provides the implementation details, since the concrete
+   * AbstractStoragePlugin, IndexDescriptor and DrillTable classes live in that plugin.
+   * @param idxDesc
+   * @return
+   */
+  public DrillTable buildDrillTable(IndexDescriptor idxDesc) {
+    if (idxDesc.getIndexType() == IndexDescriptor.IndexType.EXTERNAL_SECONDARY_INDEX) {
+      return getExternalDrillTable(idxDesc);
+    } else {
+      return getNativeDrillTable(idxDesc);
+    }
+  }
+
+  /**
+   * For a native index (an index provided by the native storage plugin),
+   * the concrete IndexDiscover provides the implementation that returns the index's DrillTable object.
+   * Otherwise, we call the IndexDiscoverable interface exposed by the external storage plugin's SchemaFactory
+   * to get the desired DrillTable.
+   * @param idxDesc
+   * @return
+   */
+  public abstract DrillTable getNativeDrillTable(IndexDescriptor idxDesc);
+
+}
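
    A sketch of what a plugin-side discoverer built on this base class might look like; the class name
    and storage name below are hypothetical, and a real implementation would read the plugin's own
    index metadata instead of returning an empty list:

    import java.util.Collections;
    import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
    import org.apache.drill.exec.planner.common.DrillScanRelBase;
    import org.apache.drill.exec.planner.index.DrillIndexDefinition;
    import org.apache.drill.exec.planner.index.IndexCollection;
    import org.apache.drill.exec.planner.index.IndexDescriptor;
    import org.apache.drill.exec.planner.index.IndexDiscoverBase;
    import org.apache.drill.exec.planner.logical.DrillTable;

    public class MyPluginIndexDiscover extends IndexDiscoverBase {

      public MyPluginIndexDiscover(AbstractDbGroupScan scan, DrillScanRelBase scanRel) {
        super(scan, scanRel);
      }

      @Override
      public IndexCollection getTableIndex(String tableName) {
        // A real plugin would read its index metadata here; this sketch reports no indexes.
        return getTableIndex(tableName, "myplugin", Collections.<DrillIndexDefinition>emptyList());
      }

      @Override
      public DrillTable getNativeDrillTable(IndexDescriptor idxDesc) {
        // A real plugin would build a DrillTable pointing at the physical index table.
        return null;
      }
    }
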
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java
new file mode 100644
index 0000000..dbf5edc
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+
+
+/**
+ * The SchemaFactory of a storage plugin that can be used to store index tables should expose this interface
+ * so that an IndexDiscover can discover the index table without adding a dependency on the storage plugin.
+ */
+public interface IndexDiscoverable {
+
+  /**
+   * Return the DrillTable found for the given path (e.g. names={"elasticsearch", "staffidx", "stjson"})
+   * @param discover
+   * @param desc
+   * @return
+   */
+  DrillTable findTable(IndexDiscover discover, DrillIndexDescriptor desc);
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
new file mode 100644
index 0000000..ea34ea5
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Encapsulates one or more IndexProperties representing (non)covering or 
intersecting indexes. The encapsulated
+ * IndexProperties are used to rank the index in comparison with other 
IndexGroups.
+ */
+public class IndexGroup {
+  private List<IndexProperties> indexProps;
+
+  public IndexGroup() {
+    indexProps = Lists.newArrayList();
+  }
+
+  public boolean isIntersectIndex() {
+    return indexProps.size() > 1;
+  }
+
+  public int numIndexes() {
+    return indexProps.size();
+  }
+
+  public void addIndexProp(IndexProperties prop) {
+    indexProps.add(prop);
+  }
+
+  public void addIndexProp(List<IndexProperties> prop) {
+    indexProps.addAll(prop);
+  }
+
+  public boolean removeIndexProp(IndexProperties prop) {
+    return indexProps.remove(prop);
+  }
+
+  public List<IndexProperties> getIndexProps() {
+    return indexProps;
+  }
+}
+
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java
new file mode 100644
index 0000000..cfdd6d0
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+
+import java.util.Map;
+
+/**
+ * IndexProperties encapsulates the various metrics of a single index that are 
related to
+ * the current query. These metrics are subsequently used to rank the index in 
comparison
+ * with other indexes.
+ */
+public interface IndexProperties  {
+
+  void setProperties(Map<LogicalExpression, RexNode> prefixMap,
+                            boolean satisfiesCollation,
+                            RexNode indexColumnsRemainderFilter,
+                            Statistics stats);
+
+  double getLeadingSelectivity();
+
+  double getRemainderSelectivity();
+
+  boolean isCovering();
+
+  double getTotalRows();
+
+  IndexDescriptor getIndexDesc();
+
+  DrillScanRelBase getPrimaryTableScan();
+
+  RexNode getTotalRemainderFilter();
+
+  boolean satisfiesCollation();
+
+  void setSatisfiesCollation(boolean satisfiesCollation);
+
+  RelOptCost getSelfCost(RelOptPlanner planner);
+
+  int numLeadingFilters();
+
+  double getAvgRowSize();
+}
+
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java
new file mode 100644
index 0000000..e716369
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+
+
+import java.util.List;
+
+public interface IndexStatistics {
+    /** Returns the approximate number of rows in the table. */
+    double getRowCount();
+
+    /** Returns the collections of columns on which this table is sorted. */
+    List<RelCollation> getCollations();
+
+    /** Returns the distribution of the data in query result table. */
+    RelDistribution getDistribution();
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java
new file mode 100644
index 0000000..a1a6fc8
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Marks the filter expressions that could be indexed.
+ * Other than SchemaPaths, which represent columns of a table and can be indexed,
+ * we consider only function expressions, specifically the CAST function.
+ * To judge whether an expression is indexable, we check the following:
+ * 1. the expression should be one operand of a comparison operator, one of SqlKind.COMPARISON:
+ *      IN, EQUALS, NOT_EQUALS, LESS_THAN, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL
+ * 2. the expression tree should contain at least one inputRef (which means the expression is a
+ *     computation on top of at least one column); if more than one indexable expression is found
+ *     among the operands of a comparison operator, we do not take any of them as indexable.
+ * 3. (limited to one level of functions) the expression is a function call with no nested function call underneath, except ITEM
+ * 4. (limited to CAST) the function call is a CAST
+ */
+public class IndexableExprMarker extends RexVisitorImpl<Boolean> {
+
+  //map of rexNode->converted LogicalExpression
+  final Map<RexNode, LogicalExpression> desiredExpressions = Maps.newHashMap();
+
+  //the expressions in equality comparison
+  final Map<RexNode, LogicalExpression> equalityExpressions = 
Maps.newHashMap();
+
+  //the expression found in non-equality comparison
+  final Map<RexNode, LogicalExpression> notInEquality = Maps.newHashMap();
+
+  // For =(cast(a.b as VARCHAR(len)), 'abcd'): if 'len' is less than the max length of the casted field
+  // in the index table, we want to rewrite the predicate to LIKE(cast(a.b as VARCHAR(len)), 'abcd%').
+  // In equalOnCastChar, the key is the equality operator and the value is the cast operand (cast(a.b as VARCHAR(10))).
+  final Map<RexNode, LogicalExpression> equalOnCastChar = Maps.newHashMap();
+
+  private final RelNode inputRel;
+
+  // Flags the current recursion state: whether we are on a direct operand of a comparison operator.
+  boolean directCompareOp = false;
+
+  RexCall contextCall = null;
+
+  DrillParseContext parserContext;
+
+  public IndexableExprMarker(RelNode inputRel) {
+    super(true);
+    this.inputRel = inputRel;
+    parserContext = new 
DrillParseContext(PrelUtil.getPlannerSettings(inputRel.getCluster()));
+  }
+
+  public Map<RexNode, LogicalExpression> getIndexableExpression() {
+    return ImmutableMap.copyOf(desiredExpressions);
+  }
+
+  public Map<RexNode, LogicalExpression> getEqualOnCastChar() {
+    return ImmutableMap.copyOf(equalOnCastChar);
+  }
+
+  /**
+   * Return the expressions that appear only in an equality condition _and_ only once (e.g. a.b = 'value').
+   * @return
+   */
+  public Set<LogicalExpression> getExpressionsOnlyInEquality() {
+
+    Set<LogicalExpression> onlyInEquality = Sets.newHashSet();
+
+    Set<LogicalExpression> notInEqSet = Sets.newHashSet();
+
+    Set<LogicalExpression> inEqMoreThanOnce = Sets.newHashSet();
+
+    notInEqSet.addAll(notInEquality.values());
+
+    for (LogicalExpression expr : equalityExpressions.values()) {
+      // Only process expressions that do not appear in any non-equality condition.
+      if (!notInEqSet.contains(expr)) {
+
+        // Expressions that appear in two or more equality conditions are ignored as well.
+        if (inEqMoreThanOnce.contains(expr)) {
+          continue;
+        }
+
+        // We have already recorded this expression in an equality condition, so move it to inEqMoreThanOnce.
+        if (onlyInEquality.contains(expr)) {
+          inEqMoreThanOnce.add(expr);
+          onlyInEquality.remove(expr);
+          continue;
+        }
+
+        // Finally, we can take this expression.
+        onlyInEquality.add(expr);
+      }
+    }
+    return onlyInEquality;
+  }
+
+  @Override
+  public Boolean visitInputRef(RexInputRef rexInputRef) {
+    return directCompareOp;
+  }
+
+  public boolean containInputRef(RexNode rex) {
+    if (rex instanceof RexInputRef) {
+      return true;
+    }
+    if ((rex instanceof RexCall) && 
"ITEM".equals(((RexCall)rex).getOperator().getName())) {
+      return true;
+    }
+    // TODO: use a visitor to search recursively for an inputRef; if one is found, return true.
+    return false;
+  }
+
+  public boolean operandsAreIndexable(RexCall call) {
+    SqlKind kind = call.getKind();
+    boolean kindIsRight = (SqlKind.COMPARISON.contains(kind) || kind == SqlKind.LIKE || kind == SqlKind.SIMILAR);
+
+    if (!kindIsRight) {
+      return false;
+    }
+
+    int inputReference = 0;
+    for (RexNode operand : call.operands) {
+      // If two or more operands of this operator contain an inputRef, the predicate is something like
+      // a.b = a.c rather than a.b = 'hello', so an index cannot be applied.
+      if (containInputRef(operand)) {
+        inputReference++;
+        if (inputReference >= 2) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean visitCall(RexCall call) {
+    if (call.getKind() == SqlKind.NOT) {
+      // Conditions under NOT are not indexable
+      return false;
+    }
+    if (operandsAreIndexable(call)) {
+      for (RexNode operand : call.operands) {
+        directCompareOp = true;
+        contextCall = call;
+        boolean markIt = operand.accept(this);
+        directCompareOp = false;
+        contextCall = null;
+        if (markIt) {
+          LogicalExpression expr = DrillOptiq.toDrill(parserContext, inputRel, 
operand);
+          desiredExpressions.put(operand, expr);
+          if (call.getKind() == SqlKind.EQUALS) {
+            equalityExpressions.put(operand, expr);
+          }
+          else {
+            notInEquality.put(operand, expr);
+          }
+        }
+      }
+      return false;
+    }
+
+    // Now we are handling a call that is directly under a comparison, e.g. <([call], literal).
+    if (directCompareOp) {
+      // if it is an item, or CAST function
+      if ("ITEM".equals(call.getOperator().getName())) {
+        return directCompareOp;
+      }
+      else if (call.getKind() == SqlKind.CAST) {
+        // For now, we only care about a direct CAST whose operand is a field (SchemaPath),
+        // i.e. either an ITEM call (nested name) or an inputRef.
+
+        // CAST to char/varchar inside an equality function.
+        if (contextCall != null && contextCall.getKind() == SqlKind.EQUALS
+            && (call.getType().getSqlTypeName() == SqlTypeName.CHAR
+                || call.getType().getSqlTypeName() == SqlTypeName.VARCHAR)) {
+          equalOnCastChar.put(contextCall, DrillOptiq.toDrill(parserContext, inputRel, call));
+        }
+
+        RexNode castOp = call.operands.get(0);
+        if (castOp instanceof RexInputRef) {
+          return true;
+        }
+        if ((castOp instanceof RexCall) && 
("ITEM".equals(((RexCall)castOp).getOperator().getName()))) {
+          return true;
+        }
+      }
+    }
+
+    // Visit the remaining operands for their side effects (they may populate the expression maps).
+    for (RexNode operand : call.operands) {
+      operand.accept(this);
+    }
+    return false;
+  }
+
+  public Boolean visitLocalRef(RexLocalRef localRef) {
+    return false;
+  }
+
+  public Boolean visitLiteral(RexLiteral literal) {
+    return false;
+  }
+
+  public Boolean visitOver(RexOver over) {
+    return false;
+  }
+
+  public Boolean visitCorrelVariable(RexCorrelVariable correlVariable) {
+    return false;
+  }
+
+  public Boolean visitDynamicParam(RexDynamicParam dynamicParam) {
+    return false;
+  }
+
+  public Boolean visitRangeRef(RexRangeRef rangeRef) {
+    return false;
+  }
+
+  public Boolean visitFieldAccess(RexFieldAccess fieldAccess) {
+    final RexNode expr = fieldAccess.getReferenceExpr();
+    return expr.accept(this);
+  }
+}
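
    Typical use of the marker, as a sketch: run it over a pushed-down filter and collect the operands
    that can be answered by an index:

    import java.util.Map;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rex.RexNode;
    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.exec.planner.index.IndexableExprMarker;

    public class ExprMarkerSketch {
      // Returns the RexNode operands (plain columns, ITEM references, single-level CASTs used directly
      // in comparisons) mapped to their converted LogicalExpressions.
      static Map<RexNode, LogicalExpression> markIndexable(RelNode scanRel, RexNode condition) {
        IndexableExprMarker marker = new IndexableExprMarker(scanRel);
        condition.accept(marker);
        return marker.getIndexableExpression();
      }
    }
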
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
new file mode 100644
index 0000000..c17d09f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class InvalidIndexDefinitionException extends DrillRuntimeException {
+  public InvalidIndexDefinitionException(String message) {
+    super(message);
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java
new file mode 100644
index 0000000..2859102
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+
+public interface Statistics {
+
+  double ROWCOUNT_UNKNOWN = -1;
+  // HUGE is the same as DrillCostBase.HUGE
+  double ROWCOUNT_HUGE = Double.MAX_VALUE;
+  double AVG_ROWSIZE_UNKNOWN = -1;
+  long AVG_COLUMN_SIZE = 10;
+
+  /** Returns whether statistics are available. Should be called prior to 
using the statistics
+   */
+  boolean isStatsAvailable();
+
+  /** Returns a unique index identifier
+   *  @param idx - Index specified as a {@link IndexDescriptor}
+   *  @return The unique index identifier
+   */
+  String buildUniqueIndexIdentifier(IndexDescriptor idx);
+
+  /** Returns the rowcount for the specified filter condition
+   *  @param condition - Filter specified as a {@link RexNode}
+   *  @param tabIdxName - The index name generated using {@code 
buildUniqueIndexIdentifier}
+   *  @param scanRel - The current scan rel
+   *  @return the rowcount for the filter
+   */
+  double getRowCount(RexNode condition, String tabIdxName, RelNode scanRel);
+
+  /** Returns the leading rowcount for the specified filter condition
+   *  Leading rowcount means rowcount for filter condition only on leading 
index columns.
+   *  @param condition - Filter specified as a {@link RexNode}
+   *  @param tabIdxName - The index name generated using {@code 
buildUniqueIndexIdentifier}
+   *  @param scanRel - The current scan rel
+   *  @return the leading rowcount
+   */
+  double getLeadingRowCount(RexNode condition, String tabIdxName, RelNode 
scanRel);
+
+  /** Returns the average row size for the specified filter condition
+   * @param tabIdxName - The index name generated using {@code 
buildUniqueIndexIdentifier}
+   * @param isIndexScan - Whether the current rel is an index scan (false for 
primary table)
+   */
+  double getAvgRowSize(String tabIdxName, boolean isIndexScan);
+
+  boolean initialize(RexNode condition, DrillScanRelBase scanRel, 
IndexCallContext context);
+}
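
    Costing code that consumes this interface is expected to guard every lookup with isStatsAvailable()
    and to treat the UNKNOWN sentinels as "no estimate". A minimal sketch:

    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rex.RexNode;
    import org.apache.drill.exec.planner.index.IndexDescriptor;
    import org.apache.drill.exec.planner.index.Statistics;

    public class StatisticsSketch {
      static double estimateRows(Statistics stats, IndexDescriptor idx, RexNode condition, RelNode scanRel) {
        if (!stats.isStatsAvailable()) {
          return Statistics.ROWCOUNT_UNKNOWN;     // caller falls back to its default estimate
        }
        String key = stats.buildUniqueIndexIdentifier(idx);
        return stats.getRowCount(condition, key, scanRel);
      }
    }
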
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java
new file mode 100644
index 0000000..6894e4f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+public interface StatisticsPayload {
+  double getRowCount();
+  double getLeadingRowCount();
+  double getAvgRowSize();
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 53036f1..ed9b32f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -91,6 +91,10 @@ public abstract class DrillTable implements Table {
     this.options = options;
   }
 
+  public void setGroupScan(GroupScan scan) {
+    this.scan = scan;
+  }
+
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
       if (selection instanceof FileSelection && ((FileSelection) 
selection).isEmptyDirectory()) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java
new file mode 100644
index 0000000..754c5d7
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.record.VectorWrapper;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, 
property="@class")
+public interface PartitionFunction  {
+
+  /**
+   * Return the list of FieldReferences that participate in the partitioning function
+   * @return list of FieldReferences
+   */
+  List<FieldReference> getPartitionRefList();
+
+  /**
+   * Setup method for the partitioning function
+   * @param partitionKeys a list of partition columns on which range partitioning is needed
+   */
+  void setup(List<VectorWrapper<?>> partitionKeys);
+
+  /**
+   * Evaluate a partitioning function for a particular row index and return the partition id
+   * @param index the integer index into the partition keys vector for a specific 'row' of values
+   * @param numPartitions the max number of partitions that are allowed
+   * @return partition id, an integer value
+   */
+  int eval(int index, int numPartitions);
+
+  /**
+   * Returns a FieldReference (LogicalExpression) for the partition function
+   * @return FieldReference for the partition function
+   */
+  FieldReference getPartitionFieldRef();
+
+}
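
As an aid to reading the interface above, the sketch below shows one hypothetical implementation: a hash-based partition function that combines the key values of a row and maps them onto [0, numPartitions). It is not part of the patch; the class name, the use of ValueVector accessors, and the omission of the Jackson serialization annotations that real implementations would need are simplifying assumptions.

    // Hypothetical sketch only -- not part of this commit.
    import java.util.List;
    import java.util.Objects;

    import org.apache.drill.common.expression.FieldReference;
    import org.apache.drill.exec.record.VectorWrapper;

    public class HashPartitionFunction implements PartitionFunction {
      private final List<FieldReference> partitionRefs;
      private final FieldReference partitionFieldRef;
      private List<VectorWrapper<?>> partitionKeys;

      public HashPartitionFunction(List<FieldReference> partitionRefs, FieldReference partitionFieldRef) {
        this.partitionRefs = partitionRefs;
        this.partitionFieldRef = partitionFieldRef;
      }

      @Override
      public List<FieldReference> getPartitionRefList() {
        return partitionRefs;
      }

      @Override
      public void setup(List<VectorWrapper<?>> partitionKeys) {
        // Hold on to the incoming key vectors; eval() reads row values from them.
        this.partitionKeys = partitionKeys;
      }

      @Override
      public int eval(int index, int numPartitions) {
        // Combine the key values of the given row and map the hash into [0, numPartitions).
        int hash = 0;
        for (VectorWrapper<?> wrapper : partitionKeys) {
          Object value = wrapper.getValueVector().getAccessor().getObject(index);
          hash = 31 * hash + Objects.hashCode(value);
        }
        return (hash & Integer.MAX_VALUE) % numPartitions;
      }

      @Override
      public FieldReference getPartitionFieldRef() {
        return partitionFieldRef;
      }
    }
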
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index eb6112d..cb79091 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -80,7 +80,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     }
   }
 
-  protected static enum BatchState {
+  public static enum BatchState {
     /** Need to build schema and return. */
     BUILD_SCHEMA,
     /** This is still the first data batch. */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 9314da6..1bbbe76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -101,6 +101,11 @@ public abstract class AbstractRecordReader implements RecordReader {
     }
   }
 
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
   protected List<SchemaPath> getDefaultColumnsToRead() {
     return GroupScan.ALL_COLUMNS;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index edd91d1..33b361c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -43,6 +43,14 @@ public interface RecordReader extends AutoCloseable {
   void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException;
 
   /**
+   * Check if the reader may have more data to be read in subsequent iterations. Certain types of readers,
+   * such as repeatable readers, can be invoked multiple times, so this method allows ScanBatch to check with
+   * the reader before closing it.
+   * @return true if there could potentially be more reads, false otherwise
+   */
+  boolean hasNext();
+
+  /**
    * Increments this record reader forward, writing via the provided output
    * mutator into the output batch.
    *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java
new file mode 100644
index 0000000..5f9eef8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+
+import org.apache.drill.shaded.guava.com.google.common.io.BaseEncoding;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This class provides utility methods to encode and decode a set of user-specified
+ * SchemaPaths to a set of encoded SchemaPaths with the following properties.
+ * <ol>
+ * <li>Valid Drill identifier as per its grammar with only one, root name segment.
+ * <li>A single identifier cannot exceed 1024 characters in length.
+ * </ol>
+ * <p>
+ * Format of the encoded SchemaPath:
+ * <blockquote><pre>$$ENC\d\d&lt;base32 encoded input paths&gt;</pre></blockquote>
+ * <p>
+ * We use Base-32 over Base-64 because the latter's charset includes '/' and '+'.
+ */
+public class EncodedSchemaPathSet {
+
+  private static final int ESTIMATED_ENCODED_SIZE = 1024;
+
+  private static final String ENC_PREFIX = "$$ENC";
+
+  private static final String ENC_FORMAT_STRING = ENC_PREFIX + "%02d%s";
+  private static final int ENC_PREFIX_SIZE = ENC_PREFIX.length() + "00".length();
+  private static final int MAX_ENC_IDENTIFIER_SIZE = (PlannerSettings.DEFAULT_IDENTIFIER_MAX_LENGTH - ENC_PREFIX_SIZE);
+  private static final int MAX_ENC_IDENTIFIER_COUNT = 100; // "$$ENC00*...$$ENC99*"
+
+  private static final BaseEncoding CODEC = BaseEncoding.base32().omitPadding(); // no-padding version
+
+  public static final String ENCODED_STAR_COLUMN = encode("*")[0];
+
+  /*
+   * Performance of various methods of encoding a Java String to UTF-8 keeps changing
+   * between releases, hence we'll encapsulate the actual methods within these functions
+   * and use them everywhere in Drill
+   */
+  private static final String UTF_8 = "utf-8";
+
+
+  private static byte[] encodeUTF(String input) {
+    try {
+      return input.getBytes(UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException(e); // should never come to this
+    }
+  }
+
+  private static String decodeUTF(byte[] input) {
+    try {
+      return new String(input, UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException(e); // should never come to this
+    }
+  }
+
+  private static String decodeUTF(byte[] input, int offset, int length) {
+    try {
+      return new String(input, offset, length, UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException(e); // should never come to this
+    }
+  }
+
+  /**
+   * Returns the encoded array of SchemaPath identifiers from the input array of SchemaPath.
+   * <p>
+   * The returned identifiers have the following properties:
+   * <ul>
+   *  <li>Each SchemaPath identifier in the array has only a single root NameSegment.</li>
+   *  <li>Maximum length of each such identifier is equal to the maximum length of a Drill identifier (currently 1024).</li>
+   * </ul>
+   * <p>
+   * We take advantage of the fact that Java's modified UTF-8 encoding can never contain
+   * an embedded null byte.
+   * @see <a>http://docs.oracle.com/javase/8/docs/api/java/io/DataInput.html#modified-utf-8</a>
+   */
+  public static String[] encode(final String... schemaPaths) {
+    Preconditions.checkArgument(schemaPaths != null && schemaPaths.length > 0,
+        "At least one schema path should be provided");
+
+    NoCopyByteArrayOutputStream out = new NoCopyByteArrayOutputStream(ESTIMATED_ENCODED_SIZE);
+    int bufOffset = 1; // 1st byte is NULL
+    for (String schemaPath : schemaPaths) {
+      out.write(0);
+      out.write(encodeUTF(schemaPath));
+    }
+    out.close();
+
+    final int bufLen = out.size() - 1; // not counting the first NULL byte
+    String encodedStr = CODEC.encode(out.getBuffer(), bufOffset, bufLen);
+    assert !encodedStr.endsWith("=") : String.format("Encoded string '%s' ends with '='", encodedStr);
+    return splitIdentifiers(encodedStr);
+  }
+
+  public static boolean isEncodedSchemaPath(SchemaPath schemaPath) {
+    return schemaPath != null && isEncodedSchemaPath(schemaPath.getRootSegment().getNameSegment().getPath());
+  }
+
+  public static boolean isEncodedSchemaPath(String schemaPath) {
+    return schemaPath != null && schemaPath.startsWith(ENC_PREFIX);
+  }
+
+  /**
+   * Returns the decoded Collection of SchemaPath from the input which
+   * may contain a mix of encoded and non-encoded SchemaPaths.
+   * <p>
+   * The size of the returned Collection is always equal to or greater than that of the
+   * input collection.
+   * <p>
+   * The non-encoded SchemaPaths are collated at the beginning of the returned
+   * collection, in the same order as in the input.
+   */
+  public static Collection<SchemaPath> decode(final Collection<SchemaPath> encodedPaths) {
+    String[] schemaPathStrings = new String[encodedPaths.size()];
+    Iterator<SchemaPath> encodedPathsItr = encodedPaths.iterator();
+    for (int i = 0; i < schemaPathStrings.length; i++) {
+      SchemaPath schemaPath = encodedPathsItr.next();
+      if (schemaPath.getRootSegmentPath().startsWith(ENC_PREFIX)) {
+        // encoded schema path contains only root segment
+        schemaPathStrings[i] = schemaPath.getRootSegmentPath();
+      } else {
+        schemaPathStrings[i] = schemaPath.toExpr();
+      }
+    }
+    String[] decodedStrings = decode(schemaPathStrings);
+    if (decodedStrings == schemaPathStrings) {
+      return encodedPaths; // return the original collection as no encoded SchemaPath was found
+    } else {
+      ImmutableList.Builder<SchemaPath> builder = new ImmutableList.Builder<>();
+      for (String decodedString : decodedStrings) {
+        if ("*".equals(decodedString) || "`*`".equals(decodedString)) {
+          builder.add(SchemaPath.STAR_COLUMN);
+        } else {
+          builder.add(SchemaPath.parseFromString(decodedString));
+        }
+      }
+      return builder.build();
+    }
+  }
+
+  /**
+   * Returns the decoded array of SchemaPath strings from the input which
+   * may contain a mix of encoded and non-encoded SchemaPaths.
+   * <p>
+   * The size of the returned array is always equal to or greater than that of the
+   * input array.
+   * <p>
+   * The non-encoded SchemaPaths are collated at the beginning of the returned
+   * array, in the same order as in the input array.
+   */
+  public static String[] decode(final String... encodedPaths) {
+    Preconditions.checkArgument(encodedPaths != null && encodedPaths.length > 0,
+        "At least one encoded path should be provided");
+
+    StringBuilder sb = new StringBuilder(ESTIMATED_ENCODED_SIZE);
+
+    // As the encoded schema paths move across components, they could get reordered.
+    // Sorting ensures that the original order is restored before concatenating the
+    // components back to the full encoded String.
+    Arrays.sort(encodedPaths);
+
+    List<String> decodedPathList = Lists.newArrayList();
+    for (String encodedPath : encodedPaths) {
+      if (encodedPath.startsWith(ENC_PREFIX)) {
+        sb.append(encodedPath, ENC_PREFIX_SIZE, encodedPath.length());
+      } else {
+        decodedPathList.add(encodedPath);
+      }
+    }
+
+    if (sb.length() > 0) {
+      byte[] decodedBytes;
+      try {
+        decodedBytes = CODEC.decode(sb);
+      } catch (IllegalArgumentException e) {
+        throw new DrillRuntimeException(String.format(
+            "Unable to decode the input strings as encoded schema paths:\n%s", 
Arrays.asList(encodedPaths)), e);
+      }
+
+      int start = 0, index = 0;
+      for (; index < decodedBytes.length; index++) {
+        if (decodedBytes[index] == 0 && index - start > 0) {
+          decodedPathList.add(decodeUTF(decodedBytes, start, index-start));
+          start = index + 1;
+        }
+      }
+      if (index - start > 0) {
+        String lastSchemaPath = decodeUTF(decodedBytes, start, index-start).trim();
+        if (!lastSchemaPath.isEmpty()) {
+          decodedPathList.add(lastSchemaPath);
+        }
+      }
+      return decodedPathList.toArray(new String[decodedPathList.size()]);
+    } else {
+      // original list did not have any encoded path, return as is
+      return encodedPaths;
+    }
+  }
+
+  /**
+   * Splits the input string so that the length of each encoded string,
+   * including the signature prefix is less than or equal to MAX_DRILL_IDENTIFIER_SIZE.
+   */
+  private static String[] splitIdentifiers(String input) {
+    if (input.length() < MAX_ENC_IDENTIFIER_SIZE) {
+      return new String[] { String.format(ENC_FORMAT_STRING, 0, input) };
+    }
+    int splitsCount = (int) Math.ceil(input.length() / (double)MAX_ENC_IDENTIFIER_SIZE);
+    if (splitsCount > MAX_ENC_IDENTIFIER_COUNT) {
+      throw new DrillRuntimeException(String.format(
+          "Encoded size of the SchemaPath identifier '%s' exceeded maximum value.", input));
+    }
+    String[] result = new String[splitsCount];
+    for (int i = 0, startIdx = 0; i < result.length; i++, startIdx += MAX_ENC_IDENTIFIER_SIZE) {
+      // TODO: see if we can avoid memcpy due to input.substring() call
+      result[i] = String.format(ENC_FORMAT_STRING, i, input.substring(startIdx, Math.min(input.length(), startIdx + MAX_ENC_IDENTIFIER_SIZE)));
+    }
+    return result;
+  }
+
+  /**
+   * Optimized version of Java's ByteArrayOutputStream which returns the underlying
+   * byte array instead of making a copy
+   */
+  private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream {
+    public NoCopyByteArrayOutputStream(int size) {
+      super(size);
+    }
+
+    public byte[] getBuffer() {
+      return buf;
+    }
+
+    public int size() {
+      return count;
+    }
+
+    @Override
+    public void write(int b) {
+      super.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) {
+      super.write(b, 0, b.length);
+    }
+
+    @Override
+    public void close() {
+      try {
+        super.close();
+      } catch (IOException e) {
+        throw new DrillRuntimeException(e); // should never come to this
+      }
+    }
+  }
+
+}
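
A short usage sketch of the utility above (illustration only; the input paths are invented for the example): encode() turns arbitrary schema paths into plain, single-root identifiers, and decode() accepts a mix of encoded and plain paths and restores the originals.

    // Hypothetical usage -- the input paths are invented for the example.
    String[] encoded = EncodedSchemaPathSet.encode("a.b.c", "d.e.f");
    for (String id : encoded) {
      // Each element has the form $$ENC<nn><base32 payload> and fits in a single root segment.
      assert EncodedSchemaPathSet.isEncodedSchemaPath(id);
    }
    String[] decoded = EncodedSchemaPathSet.decode(encoded);  // restores "a.b.c" and "d.e.f"
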
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index d346182..2f5c3de 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -414,9 +414,9 @@ public final class UserBitShared {
      */
     UNORDERED_RECEIVER(11, 11),
     /**
-     * <code>RANGE_SENDER = 12;</code>
+     * <code>RANGE_PARTITION_SENDER = 12;</code>
      */
-    RANGE_SENDER(12, 12),
+    RANGE_PARTITION_SENDER(12, 12),
     /**
      * <code>SCREEN = 13;</code>
      */
@@ -593,6 +593,10 @@ public final class UserBitShared {
      * <code>RUNTIME_FILTER = 56;</code>
      */
     RUNTIME_FILTER(56, 56),
+    /**
+     * <code>ROWKEY_JOIN = 57;</code>
+     */
+    ROWKEY_JOIN(57, 57),
     ;
 
     /**
@@ -644,9 +648,9 @@ public final class UserBitShared {
      */
     public static final int UNORDERED_RECEIVER_VALUE = 11;
     /**
-     * <code>RANGE_SENDER = 12;</code>
+     * <code>RANGE_PARTITION_SENDER = 12;</code>
      */
-    public static final int RANGE_SENDER_VALUE = 12;
+    public static final int RANGE_PARTITION_SENDER_VALUE = 12;
     /**
      * <code>SCREEN = 13;</code>
      */
@@ -823,6 +827,10 @@ public final class UserBitShared {
      * <code>RUNTIME_FILTER = 56;</code>
      */
     public static final int RUNTIME_FILTER_VALUE = 56;
+    /**
+     * <code>ROWKEY_JOIN = 57;</code>
+     */
+    public static final int ROWKEY_JOIN_VALUE = 57;
 
 
     public final int getNumber() { return value; }
@@ -841,7 +849,7 @@ public final class UserBitShared {
         case 9: return ORDERED_PARTITION_SENDER;
         case 10: return PROJECT;
         case 11: return UNORDERED_RECEIVER;
-        case 12: return RANGE_SENDER;
+        case 12: return RANGE_PARTITION_SENDER;
         case 13: return SCREEN;
         case 14: return SELECTION_VECTOR_REMOVER;
         case 15: return STREAMING_AGGREGATE;
@@ -886,6 +894,7 @@ public final class UserBitShared {
         case 54: return PARTITION_LIMIT;
         case 55: return PCAPNG_SUB_SCAN;
         case 56: return RUNTIME_FILTER;
+        case 57: return ROWKEY_JOIN;
         default: return null;
       }
     }
@@ -24422,40 +24431,40 @@ public final class UserBitShared {
       "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" +
       
"\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
 +
       
"\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
 +
-      "\022\032\n\026CANCELLATION_REQUESTED\020\006*\367\010\n\020CoreOpe" +
+      "\022\032\n\026CANCELLATION_REQUESTED\020\006*\222\t\n\020CoreOpe" +
       "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" +
       "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" 
+
       
"\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
 +
       "H_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGI" 
+
       "NG_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDE" +
       
"R\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013",
-      
"\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECT"
 +
-      "ION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREG" +
-      "ATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021" +
-      "\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026" +
-      "PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCA" +
-      "N\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_S" 
+
-      
"CAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_" +
-      "SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN" 
+
-      "\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB" 
+
-      "_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER",
-      "_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDO" +
-      "W\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SC" +
-      "AN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SCA" +
-      "N\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014L" +
-      "ATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL_" +
-      "NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC_" +
-      "SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SUB" +
-      "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT" +
-      "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" +
-      "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S",
-      "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" +
-      "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RU" +
-      "NTIME_FILTER\0208*g\n\nSaslStatus\022\020\n\014SASL_UNK" +
-      "NOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRE" 
+
-      
"SS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B"
 +
-      ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
-      "haredH\001"
+      "\022\032\n\026RANGE_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r" +
+      "\022\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAM" +
+      "ING_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTER" +
+      "NAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_" +
+      "SORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHI" +
+      "VE_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\r" +
+      "MOCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017D" +
+      "IRECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEX" +
+      "T_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_" +
+      "SCHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025",
+      "\n\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020" +
+      "!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rA" +
+      "VRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAF" +
+      "KA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLAT" +
+      "TEN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(H" +
+      "IVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN\020" +
+      "+\022\r\n\tJDBC_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017" +
+      "MAPRDB_SUB_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n" +
+      "\013KUDU_WRITER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017" +
+      "\n\013JSON_WRITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022",
+      "\022\n\016IMAGE_SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN" +
+      "\0205\022\023\n\017PARTITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SC" +
+      "AN\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\020" +
+      "9*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSAS" +
+      "L_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_" +
+      "SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache" +
+      ".drill.exec.protoB\rUserBitSharedH\001"
     };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 6b36fb4..6138ad6 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -34,7 +34,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     ORDERED_PARTITION_SENDER(9),
     PROJECT(10),
     UNORDERED_RECEIVER(11),
-    RANGE_SENDER(12),
+    RANGE_PARTITION_SENDER(12),
     SCREEN(13),
     SELECTION_VECTOR_REMOVER(14),
     STREAMING_AGGREGATE(15),
@@ -78,7 +78,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     SEQUENCE_SUB_SCAN(53),
     PARTITION_LIMIT(54),
     PCAPNG_SUB_SCAN(55),
-    RUNTIME_FILTER(56);
+    RUNTIME_FILTER(56),
+    ROWKEY_JOIN(57);
     
     public final int number;
     
@@ -108,7 +109,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 9: return ORDERED_PARTITION_SENDER;
             case 10: return PROJECT;
             case 11: return UNORDERED_RECEIVER;
-            case 12: return RANGE_SENDER;
+            case 12: return RANGE_PARTITION_SENDER;
             case 13: return SCREEN;
             case 14: return SELECTION_VECTOR_REMOVER;
             case 15: return STREAMING_AGGREGATE;
@@ -153,6 +154,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 54: return PARTITION_LIMIT;
             case 55: return PCAPNG_SUB_SCAN;
             case 56: return RUNTIME_FILTER;
+            case 57: return ROWKEY_JOIN;
             default: return null;
         }
     }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 16d44c3..843c6d8 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -300,7 +300,7 @@ enum CoreOperatorType {
   ORDERED_PARTITION_SENDER = 9;
   PROJECT = 10;
   UNORDERED_RECEIVER = 11;
-  RANGE_SENDER = 12;
+  RANGE_PARTITION_SENDER = 12;
   SCREEN = 13;
   SELECTION_VECTOR_REMOVER = 14;
   STREAMING_AGGREGATE = 15;
@@ -345,6 +345,7 @@ enum CoreOperatorType {
   PARTITION_LIMIT = 54;
   PCAPNG_SUB_SCAN = 55;
   RUNTIME_FILTER = 56;
+  ROWKEY_JOIN = 57;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.
