Repository: drill
Updated Branches:
  refs/heads/master 06e1522b5 -> 9df3403a0


DRILL-5375: Nested loop join: return correct result for left join closes #794


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8e19d61b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8e19d61b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8e19d61b

Branch: refs/heads/master
Commit: 8e19d61b07c2e2f5e197c2a255efe137398ed3bd
Parents: 06e1522
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Authored: Wed Mar 22 15:07:23 2017 +0000
Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com>
Committed: Fri Apr 7 15:23:58 2017 +0300

----------------------------------------------------------------------
 .../apache/drill/exec/expr/BatchReference.java  |  78 +++++++
 .../drill/exec/expr/EvaluationVisitor.java      |  35 +--
 .../exec/expr/ExpressionTreeMaterializer.java   |  88 ++++++--
 .../exec/expr/ValueVectorReadExpression.java    |  25 ++-
 .../exec/physical/config/NestedLoopJoinPOP.java |  28 +--
 .../exec/physical/impl/join/NestedLoopJoin.java |   7 +-
 .../physical/impl/join/NestedLoopJoinBatch.java | 129 ++++++++---
 .../impl/join/NestedLoopJoinTemplate.java       | 212 ++++++++++++-------
 .../apache/drill/exec/planner/PlannerPhase.java |  14 +-
 .../planner/logical/DrillConstExecutor.java     |   7 +-
 .../drill/exec/planner/logical/DrillOptiq.java  |  56 ++++-
 .../exec/planner/physical/JoinPruleBase.java    |  33 +--
 .../planner/physical/NestedLoopJoinPrel.java    |  49 +++--
 .../planner/physical/NestedLoopJoinPrule.java   |   8 +-
 .../exec/planner/physical/PlannerSettings.java  |  28 +++
 .../server/options/SystemOptionManager.java     |   1 +
 .../physical/impl/join/TestNestedLoopJoin.java  |  91 ++++++--
 .../exec/planner/logical/DrillOptiqTest.java    |   5 +-
 18 files changed, 638 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java
new file mode 100644
index 0000000..440f69f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/BatchReference.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holder class that contains batch name, batch index and record index. Batch 
index is used when the batch is a hyper container.
+ * Used to distinguish batches in non-equi conditions during expression 
materialization.
+ * Mostly used for nested loop join which allows non equi-join.
+ *
+ * BatchReference instance can be created during batch initialization
+ * (ex: instance of {@link org.apache.drill.exec.record.AbstractRecordBatch})
+ * since naming of batches used won't change during data processing.
+ * Though information from batch reference will be used during schema build 
(i.e. once per OK_NEW_SCHEMA).
+ *
+ * Example:
+ * BatchReference{batchName='leftBatch', batchIndex='leftIndex', 
recordIndex='leftIndex'}
+ * BatchReference{batchName='rightContainer', batchIndex='rightBatchIndex', 
recordIndex='rightRecordIndexWithinBatch'}
+ *
+ */
+public final class BatchReference {
+
+  private final String batchName;
+
+  private final String batchIndex;
+
+  private final String recordIndex;
+
+  public BatchReference(String batchName, String recordIndex) {
+    // when batch index is not indicated, record index value will be set 
instead
+    this(batchName, recordIndex, recordIndex);
+  }
+
+  public BatchReference(String batchName, String batchIndex, String 
recordIndex) {
+    Preconditions.checkNotNull(batchName, "Batch name should not be null.");
+    Preconditions.checkNotNull(batchIndex, "Batch index should not be null.");
+    Preconditions.checkNotNull(recordIndex, "Record index should not be 
null.");
+    this.batchName = batchName;
+    this.batchIndex = batchIndex;
+    this.recordIndex = recordIndex;
+  }
+
+  public String getBatchName() {
+    return batchName;
+  }
+
+  public String getBatchIndex() {
+    return batchIndex;
+  }
+
+  public String getRecordIndex() {
+    return recordIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "BatchReference{" +
+        "batchName='" + batchName + '\'' +
+        ", batchIndex='" + batchIndex + '\'' +
+        ", recordIndex='" + recordIndex + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 75b83c9..73a0363 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -399,15 +399,29 @@ public class EvaluationVisitor {
     private HoldingContainer 
visitValueVectorReadExpression(ValueVectorReadExpression e, ClassGenerator<?> 
generator)
         throws RuntimeException {
       // declare value vector
+      DirectExpression batchName;
+      JExpression batchIndex;
+      JExpression recordIndex;
+
+      // if value vector read expression has batch reference, use its values 
in generated code,
+      // otherwise use values provided by mapping set (which point to only one 
batch)
+      // primarily used for non-equi joins where expression conditions may refer 
to more than one batch
+      BatchReference batchRef = e.getBatchRef();
+      if (batchRef != null) {
+        batchName = DirectExpression.direct(batchRef.getBatchName());
+        batchIndex = DirectExpression.direct(batchRef.getBatchIndex());
+        recordIndex = DirectExpression.direct(batchRef.getRecordIndex());
+      } else {
+        batchName = generator.getMappingSet().getIncoming();
+        batchIndex = generator.getMappingSet().getValueReadIndex();
+        recordIndex = batchIndex;
+      }
 
-      JExpression vv1 = 
generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(),
-          e.getFieldId());
-      JExpression indexVariable = 
generator.getMappingSet().getValueReadIndex();
-
-      JExpression componentVariable = indexVariable.shrz(JExpr.lit(16));
+      JExpression vv1 = generator.declareVectorValueSetupAndMember(batchName, 
e.getFieldId());
+      JExpression componentVariable = batchIndex.shrz(JExpr.lit(16));
       if (e.isSuperReader()) {
         vv1 = (vv1.component(componentVariable));
-        indexVariable = indexVariable.band(JExpr.lit((int) 
Character.MAX_VALUE));
+        recordIndex = recordIndex.band(JExpr.lit((int) Character.MAX_VALUE));
       }
 
       // evaluation work.
@@ -418,14 +432,9 @@ public class EvaluationVisitor {
       final boolean repeated = Types.isRepeated(e.getMajorType());
       final boolean listVector = e.getTypedFieldId().isListVector();
 
-      int[] fieldIds = e.getFieldId().getFieldIds();
-      for (int i = 1; i < fieldIds.length; i++) {
-
-      }
-
       if (!hasReadPath && !complex) {
         JBlock eval = new JBlock();
-        GetSetVectorHelper.read(e.getMajorType(),  vv1, eval, out, 
generator.getModel(), indexVariable);
+        GetSetVectorHelper.read(e.getMajorType(),  vv1, eval, out, 
generator.getModel(), recordIndex);
         generator.getEvalBlock().add(eval);
 
       } else {
@@ -444,7 +453,7 @@ public class EvaluationVisitor {
 
         // position to the correct value.
         eval.add(expr.invoke("reset"));
-        eval.add(expr.invoke("setPosition").arg(indexVariable));
+        eval.add(expr.invoke("setPosition").arg(recordIndex));
         int listNum = 0;
 
         while (seg != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index b70ad26..b461b5c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,13 +22,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.BooleanOperator;
@@ -64,7 +64,6 @@ import 
org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
 import org.apache.drill.common.expression.fn.CastFunctions;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.drill.common.expression.visitors.ConditionalExprOptimizer;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.common.expression.visitors.ExpressionValidator;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -80,7 +79,6 @@ import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 import org.apache.drill.exec.expr.fn.ExceptionFunction;
 import org.apache.drill.exec.expr.fn.FunctionLookupContext;
 import org.apache.drill.exec.expr.stat.TypedFieldExpr;
-import org.apache.drill.exec.record.MaterializeVisitor;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.resolver.FunctionResolver;
@@ -100,7 +98,7 @@ public class ExpressionTreeMaterializer {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
 
   private ExpressionTreeMaterializer() {
-  };
+  }
 
   public static LogicalExpression materialize(LogicalExpression expr, 
VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext 
functionLookupContext) {
     return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, 
functionLookupContext, false, false);
@@ -126,9 +124,51 @@ public class ExpressionTreeMaterializer {
     return out;
   }
 
-  public static LogicalExpression materialize(LogicalExpression expr, 
VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext 
functionLookupContext,
-      boolean allowComplexWriterExpr, boolean unionTypeEnabled) {
-    LogicalExpression out =  expr.accept(new MaterializeVisitor(batch, 
errorCollector, allowComplexWriterExpr, unionTypeEnabled), 
functionLookupContext);
+  /**
+   * Materializes logical expression taking into account passed parameters.
+   * Is used to materialize logical expression that contains reference to one 
batch.
+   *
+   * @param expr logical expression to be materialized
+   * @param batch batch instance
+   * @param errorCollector error collector
+   * @param functionLookupContext context to find drill function holder
+   * @param allowComplexWriterExpr true if complex expressions are allowed
+   * @param unionTypeEnabled true if union type is enabled
+   * @return materialized logical expression
+   */
+  public static LogicalExpression materialize(LogicalExpression expr,
+                                              VectorAccessible batch,
+                                              ErrorCollector errorCollector,
+                                              FunctionLookupContext 
functionLookupContext,
+                                              boolean allowComplexWriterExpr,
+                                              boolean unionTypeEnabled) {
+    Map<VectorAccessible, BatchReference> batches = Maps.newHashMap();
+    batches.put(batch, null);
+    return materialize(expr, batches, errorCollector, functionLookupContext, 
allowComplexWriterExpr, unionTypeEnabled);
+  }
+
+  /**
+   * Materializes logical expression taking into account passed parameters.
+   * Is used to materialize logical expression that can contain several 
batches with or without custom batch reference.
+   *
+   * @param expr logical expression to be materialized
+   * @param batches one or more batch instances used in expression
+   * @param errorCollector error collector
+   * @param functionLookupContext context to find drill function holder
+   * @param allowComplexWriterExpr true if complex expressions are allowed
+   * @param unionTypeEnabled true if union type is enabled
+   * @return materialized logical expression
+   */
+  public static LogicalExpression materialize(LogicalExpression expr,
+                                              Map<VectorAccessible, 
BatchReference> batches,
+                                              ErrorCollector errorCollector,
+                                              FunctionLookupContext 
functionLookupContext,
+                                              boolean allowComplexWriterExpr,
+                                              boolean unionTypeEnabled) {
+
+    LogicalExpression out = expr.accept(
+        new MaterializeVisitor(batches, errorCollector, 
allowComplexWriterExpr, unionTypeEnabled),
+        functionLookupContext);
 
     if (!errorCollector.hasErrors()) {
       out = out.accept(ConditionalExprOptimizer.INSTANCE, null);
@@ -224,24 +264,40 @@ public class ExpressionTreeMaterializer {
     errorCollector.addGeneralError(call.getPosition(), sb.toString());
   }
 
+  /**
+   * Visitor that wraps schema path into value vector read expression
+   * if schema path is present in one of the batches,
+   * otherwise instance of null expression.
+   */
   private static class MaterializeVisitor extends AbstractMaterializeVisitor {
-    private final VectorAccessible batch;
 
-    public MaterializeVisitor(VectorAccessible batch, ErrorCollector 
errorCollector, boolean allowComplexWriter, boolean unionTypeEnabled) {
+    private final Map<VectorAccessible, BatchReference> batches;
+
+    public MaterializeVisitor(Map<VectorAccessible, BatchReference> batches,
+                              ErrorCollector errorCollector,
+                              boolean allowComplexWriter,
+                              boolean unionTypeEnabled) {
       super(errorCollector, allowComplexWriter, unionTypeEnabled);
-      this.batch = batch;
+      this.batches = batches;
     }
 
     @Override
-    public LogicalExpression visitSchemaPath(SchemaPath path, 
FunctionLookupContext functionLookupContext) {
-      //      logger.debug("Visiting schema path {}", path);
-      TypedFieldId tfId = batch.getValueVectorId(path);
+    public LogicalExpression visitSchemaPath(final SchemaPath path, 
FunctionLookupContext functionLookupContext) {
+      TypedFieldId tfId = null;
+      BatchReference batchRef = null;
+      for (Map.Entry<VectorAccessible, BatchReference> entry : 
batches.entrySet()) {
+        tfId = entry.getKey().getValueVectorId(path);
+        if (tfId != null) {
+          batchRef = entry.getValue();
+          break;
+        }
+      }
+
       if (tfId == null) {
         logger.warn("Unable to find value vector of path {}, returning null 
instance.", path);
         return NullExpression.INSTANCE;
       } else {
-        ValueVectorReadExpression e = new ValueVectorReadExpression(tfId);
-        return e;
+        return new ValueVectorReadExpression(tfId, batchRef);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index a556dc2..410c48a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.drill.exec.expr;
 
 import java.util.Iterator;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
@@ -26,16 +27,28 @@ import 
org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.record.TypedFieldId;
 
-import com.google.common.collect.Iterators;
-
-public class ValueVectorReadExpression implements LogicalExpression{
+/**
+ * Wraps a value vector field to be read, providing metadata about the field.
+ * Also may contain batch naming information to which this field belongs.
+ * If such information is absent, default naming from the mapping set will be used 
during materialization.
+ */
+public class ValueVectorReadExpression implements LogicalExpression {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
 
   private final TypedFieldId fieldId;
-
+  private final BatchReference batchRef;
 
   public ValueVectorReadExpression(TypedFieldId tfId){
+    this(tfId, null);
+  }
+
+  public ValueVectorReadExpression(TypedFieldId tfId, BatchReference batchRef){
     this.fieldId = tfId;
+    this.batchRef = batchRef;
+  }
+
+  public BatchReference getBatchRef() {
+    return batchRef;
   }
 
   public boolean hasReadPath(){
@@ -74,7 +87,7 @@ public class ValueVectorReadExpression implements 
LogicalExpression{
 
   @Override
   public Iterator<LogicalExpression> iterator() {
-    return Iterators.emptyIterator();
+    return ImmutableSet.<LogicalExpression>of().iterator();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
index fd584ea..1d747f7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/NestedLoopJoinPOP.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.config;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -33,7 +33,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 
 @JsonTypeName("nested-loop-join")
 public class NestedLoopJoinPOP extends AbstractBase {
@@ -42,27 +41,20 @@ public class NestedLoopJoinPOP extends AbstractBase {
 
   private final PhysicalOperator left;
   private final PhysicalOperator right;
-
-  /*
-   * Conditions and jointype are currently not used, since the condition is 
always true
-   * and we don't perform any special execution operation based on join type 
either. However
-   * when we enhance NLJ this would be used.
-   */
-  private final List<JoinCondition> conditions;
   private final JoinRelType joinType;
+  private final LogicalExpression condition;
 
   @JsonCreator
   public NestedLoopJoinPOP(
       @JsonProperty("left") PhysicalOperator left,
       @JsonProperty("right") PhysicalOperator right,
-      @JsonProperty("conditions") List<JoinCondition> conditions,
-      @JsonProperty("joinType") JoinRelType joinType
+      @JsonProperty("joinType") JoinRelType joinType,
+      @JsonProperty("condition") LogicalExpression condition
   ) {
     this.left = left;
     this.right = right;
-    this.conditions = conditions;
-    Preconditions.checkArgument(joinType != null, "Join type is missing!");
     this.joinType = joinType;
+    this.condition = condition;
   }
 
   @Override
@@ -72,8 +64,8 @@ public class NestedLoopJoinPOP extends AbstractBase {
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.size() == 2);
-    return new NestedLoopJoinPOP(children.get(0), children.get(1), conditions, 
joinType);
+    Preconditions.checkArgument(children.size() == 2, "Nested loop join should 
have two physical operators");
+    return new NestedLoopJoinPOP(children.get(0), children.get(1), joinType, 
condition);
   }
 
   @Override
@@ -93,9 +85,7 @@ public class NestedLoopJoinPOP extends AbstractBase {
     return joinType;
   }
 
-  public List<JoinCondition> getConditions() {
-    return conditions;
-  }
+  public LogicalExpression getCondition() { return condition; }
 
   @Override
   public int getOperatorType() {

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index 6cf07a2..f7d96ad 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -36,8 +37,8 @@ public interface NestedLoopJoin {
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing);
-  // Produce output records
-  public int outputRecords();
+  // Produce output records taking into account join type
+  public int outputRecords(JoinRelType joinType);
 
   // Project the record at offset 'leftIndex' in the left input batch into the 
output container at offset 'outIndex'
   public void emitLeft(int leftIndex, int outIndex);

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index bdd9f0e..8336e86 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -19,9 +19,16 @@ package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -29,8 +36,11 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.BatchReference;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -38,6 +48,7 @@ import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
 
@@ -45,6 +56,8 @@ import com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JVar;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
 /*
  * RecordBatch implementation for the nested loop join operator
@@ -86,7 +99,7 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
   // We accumulate all the batches on the right side in a hyper container.
   private ExpandableHyperContainer rightContainer = new 
ExpandableHyperContainer();
 
-  // Record count of the individual batches in the right hypoer container
+  // Record count of the individual batches in the right hyper container
   private LinkedList<Integer> rightCounts = new LinkedList<>();
 
 
@@ -132,7 +145,6 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
    * Method drains the right side input of the NLJ and accumulates the data
    * in a hyper container. Once we have all the data from the right side we
    * process the left side one batch at a time and produce the output batch
-   * which is a cross product of the two sides.
    * @return IterOutcome state of the nested loop join batch
    */
   @Override
@@ -179,7 +191,7 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
     allocateVectors();
 
     // invoke the runtime generated method to emit records in the output batch
-    outputRecords = nljWorker.outputRecords();
+    outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
 
     // Set the record count
     for (final VectorWrapper<?> vw : container) {
@@ -214,26 +226,59 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
 
   /**
    * Method generates the runtime code needed for NLJ. Other than the setup 
method to set the input and output value
-   * vector references we implement two more methods
-   * 1. emitLeft()  -> Project record from the left side
-   * 2. emitRight() -> Project record from the right side (which is a hyper 
container)
+   * vector references we implement three more methods
+   * 1. doEval() -> Evaluates if record from left side matches record from the 
right side
+   * 2. emitLeft() -> Project record from the left side
+   * 3. emitRight() -> Project record from the right side (which is a hyper 
container)
    * @return the runtime generated class that implements the NestedLoopJoin 
interface
-   * @throws IOException
-   * @throws ClassTransformationException
    */
-  private NestedLoopJoin setupWorker() throws IOException, 
ClassTransformationException {
-    final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = 
CodeGenerator.get(NestedLoopJoin.TEMPLATE_DEFINITION, 
context.getFunctionRegistry(), context.getOptions());
+  private NestedLoopJoin setupWorker() throws IOException, 
ClassTransformationException, SchemaChangeException {
+    final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get(
+        NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), 
context.getOptions());
     nLJCodeGenerator.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
 //    nLJCodeGenerator.saveCodeForDebugging(true);
     final ClassGenerator<NestedLoopJoin> nLJClassGenerator = 
nLJCodeGenerator.getRoot();
 
+    // generate doEval
+    final ErrorCollector collector = new ErrorCollectorImpl();
+
+    /*
+        Logical expression may contain fields from left and right batches. 
During code generation (materialization)
+        we need to indicate from which input field should be taken.
+
+        Non-equality joins can belong to one of below categories. For example:
+        1. Join on non-equality join predicates:
+        select * from t1 inner join t2 on (t1.c1 between t2.c1 AND t2.c2) AND 
(...)
+        2. Join with an OR predicate:
+        select * from t1 inner join t2 on t1.c1 = t2.c1 OR t1.c2 = t2.c2
+     */
+    Map<VectorAccessible, BatchReference> batches = ImmutableMap
+        .<VectorAccessible, BatchReference>builder()
+        .put(left, new BatchReference("leftBatch", "leftIndex"))
+        .put(rightContainer, new BatchReference("rightContainer", 
"rightBatchIndex", "rightRecordIndexWithinBatch"))
+        .build();
+
+    LogicalExpression materialize = ExpressionTreeMaterializer.materialize(
+        popConfig.getCondition(),
+        batches,
+        collector,
+        context.getFunctionRegistry(),
+        false,
+        false);
+
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException(String.format("Failure while trying to 
materialize join condition. Errors:\n %s.",
+          collector.toErrorString()));
+    }
+
+    nLJClassGenerator.addExpr(new ReturnValueExpression(materialize), 
ClassGenerator.BlkCreateMode.FALSE);
 
+    // generate emitLeft
     nLJClassGenerator.setMappingSet(emitLeftMapping);
     JExpression outIndex = JExpr.direct("outIndex");
     JExpression leftIndex = JExpr.direct("leftIndex");
 
-
     int fieldId = 0;
     int outputFieldId = 0;
     // Set the input and output value vector references corresponding to the 
left batch
@@ -243,8 +288,10 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
       // Add the vector to the output container
       container.addOrGet(field);
 
-      JVar inVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch", new 
TypedFieldId(fieldType, false, fieldId));
-      JVar outVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new 
TypedFieldId(fieldType, false, outputFieldId));
+      JVar inVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch",
+          new TypedFieldId(fieldType, false, fieldId));
+      JVar outVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+          new TypedFieldId(fieldType, false, outputFieldId));
 
       
nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
       nLJClassGenerator.rotateBlock();
@@ -252,6 +299,7 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
       outputFieldId++;
     }
 
+    // generate emitRight
     fieldId = 0;
     nLJClassGenerator.setMappingSet(emitRightMapping);
     JExpression batchIndex = JExpr.direct("batchIndex");
@@ -260,12 +308,22 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
     // Set the input and output value vector references corresponding to the 
right batch
     for (MaterializedField field : rightSchema) {
 
-      final TypeProtos.MajorType fieldType = field.getType();
-      // Add the vector to our output container
-      container.addOrGet(field);
+      final TypeProtos.MajorType inputType = field.getType();
+      TypeProtos.MajorType outputType;
+      // if join type is LEFT, make sure right batch output fields data mode 
is optional
+      if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() 
== TypeProtos.DataMode.REQUIRED) {
+        outputType = Types.overrideMode(inputType, 
TypeProtos.DataMode.OPTIONAL);
+      } else {
+        outputType = inputType;
+      }
+
+      MaterializedField newField = MaterializedField.create(field.getPath(), 
outputType);
+      container.addOrGet(newField);
 
-      JVar inVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer", new 
TypedFieldId(field.getType(), true, fieldId));
-      JVar outVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", new 
TypedFieldId(fieldType, false, outputFieldId));
+      JVar inVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("rightContainer",
+          new TypedFieldId(inputType, true, fieldId));
+      JVar outVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+          new TypedFieldId(outputType, false, outputFieldId));
       nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe")
           .arg(recordIndexWithinBatch)
           .arg(outIndex)
@@ -290,7 +348,7 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
   /**
    * Builds the output container's schema. Goes over the left and the right
    * batch and adds the corresponding vectors to the output container.
-   * @throws SchemaChangeException
+   * @throws SchemaChangeException if batch schema was changed during execution
    */
   @Override
   protected void buildSchema() throws SchemaChangeException {
@@ -314,28 +372,39 @@ public class NestedLoopJoinBatch extends 
AbstractRecordBatch<NestedLoopJoinPOP>
         for (final VectorWrapper<?> vw : left) {
           container.addOrGet(vw.getField());
         }
-
-        // if we have a schema batch, skip it
-        if (left.getRecordCount() == 0) {
-          leftUpstream = next(LEFT_INPUT, left);
-        }
       }
 
       if (rightUpstream != IterOutcome.NONE) {
-        rightSchema = right.getSchema();
-        for (final VectorWrapper<?> vw : right) {
-          container.addOrGet(vw.getField());
+        // make right input schema optional if we have LEFT join
+        for (final VectorWrapper<?> vectorWrapper : right) {
+          TypeProtos.MajorType inputType = vectorWrapper.getField().getType();
+          TypeProtos.MajorType outputType;
+          if (popConfig.getJoinType() == JoinRelType.LEFT && 
inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
+            outputType = Types.overrideMode(inputType, 
TypeProtos.DataMode.OPTIONAL);
+          } else {
+            outputType = inputType;
+          }
+          MaterializedField newField = 
MaterializedField.create(vectorWrapper.getField().getPath(), outputType);
+          ValueVector valueVector = container.addOrGet(newField);
+          if (valueVector instanceof AbstractContainerVector) {
+            vectorWrapper.getValueVector().makeTransferPair(valueVector);
+            valueVector.clear();
+          }
         }
+        rightSchema = right.getSchema();
         addBatchToHyperContainer(right);
       }
 
+      allocateVectors();
       nljWorker = setupWorker();
 
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
-      allocateVectors();
+      // if left batch is empty, fetch next
+      if (leftUpstream != IterOutcome.NONE && left.getRecordCount() == 0) {
+        leftUpstream = next(LEFT_INPUT, left);
+      }
 
       container.setRecordCount(0);
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException(e);

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index 842c891..bdd6f9d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -40,35 +41,32 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
   // Record count of the left batch currently being processed
   private int leftRecordCount = 0;
 
-  // List of record counts  per batch in the hyper container
+  // List of record counts per batch in the hyper container
   private List<Integer> rightCounts = null;
 
   // Output batch
   private NestedLoopJoinBatch outgoing = null;
 
-  // Next right batch to process
-  private int nextRightBatchToProcess = 0;
-
-  // Next record in the current right batch to process
-  private int nextRightRecordToProcess = 0;
-
-  // Next record in the left batch to process
-  private int nextLeftRecordToProcess = 0;
+  // Iteration status tracker
+  private IterationStatusTracker tracker = new IterationStatusTracker();
 
   /**
    * Method initializes necessary state and invokes the doSetup() to set the
-   * input and output value vector references
+   * input and output value vector references.
+   *
    * @param context Fragment context
    * @param left Current left input batch being processed
    * @param rightContainer Hyper container
+   * @param rightCounts Counts for each right container
    * @param outgoing Output batch
    */
-  public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
+  public void setupNestedLoopJoin(FragmentContext context,
+                                  RecordBatch left,
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing) {
     this.left = left;
-    leftRecordCount = left.getRecordCount();
+    this.leftRecordCount = left.getRecordCount();
     this.rightCounts = rightCounts;
     this.outgoing = outgoing;
 
@@ -76,96 +74,100 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
   }
 
   /**
-   * This method is the core of the nested loop join. For every record on the 
right we go over
-   * the left batch and produce the cross product output
+   * Main entry point for producing the output records. Thin wrapper around 
populateOutgoingBatch(), this method
+   * controls which left batch we are processing and fetches the next left 
input batch once we exhaust the current one.
+   *
+   * @param joinType join type (INNER or LEFT)
+   * @return the number of records produced in the output batch
+   */
+  public int outputRecords(JoinRelType joinType) {
+    int outputIndex = 0;
+    while (leftRecordCount != 0) {
+      outputIndex = populateOutgoingBatch(joinType, outputIndex);
+      if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+        break;
+      }
+      // reset state and get next left batch
+      resetAndGetNextLeft();
+    }
+    return outputIndex;
+  }
+
+  /**
+   * This method is the core of the nested loop join. For each left batch 
record it looks for a matching record
+   * from the list of right batches. Match is checked by calling {@link 
#doEval(int, int, int)} method.
+   * If matching record is found both left and right records are written into 
output batch,
+   * otherwise if join type is LEFT, then only left record is written, right 
batch record values will be null.
+   *
+   * @param joinType join type (INNER or LEFT)
    * @param outputIndex index to start emitting records at
    * @return final outputIndex after producing records in the output batch
    */
-  private int populateOutgoingBatch(int outputIndex) {
-
-    // Total number of batches on the right side
-    int totalRightBatches = rightCounts.size();
-
-    // Total number of records on the left
-    int localLeftRecordCount = leftRecordCount;
-
-    /*
-     * The below logic is the core of the NLJ. To have better performance we 
copy the instance members into local
-     * method variables, once we are done with the loop we need to update the 
instance variables to reflect the new
-     * state. To avoid code duplication of resetting the instance members at 
every exit point in the loop we are using
-     * 'goto'
-     */
-    int localNextRightBatchToProcess = nextRightBatchToProcess;
-    int localNextRightRecordToProcess = nextRightRecordToProcess;
-    int localNextLeftRecordToProcess = nextLeftRecordToProcess;
-
-    outer: {
-
-      for (; localNextRightBatchToProcess< totalRightBatches; 
localNextRightBatchToProcess++) { // for every batch on the right
-        int compositeIndexPart = localNextRightBatchToProcess << 16;
-        int rightRecordCount = rightCounts.get(localNextRightBatchToProcess);
-
-        for (; localNextRightRecordToProcess < rightRecordCount; 
localNextRightRecordToProcess++) { // for every record in this right batch
-          for (; localNextLeftRecordToProcess < localLeftRecordCount; 
localNextLeftRecordToProcess++) { // for every record in the left batch
-
+  private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
+    // copy index and match counters as local variables to speed up processing
+    int nextRightBatchToProcess = tracker.getNextRightBatchToProcess();
+    int nextRightRecordToProcess = tracker.getNextRightRecordToProcess();
+    int nextLeftRecordToProcess = tracker.getNextLeftRecordToProcess();
+    boolean rightRecordMatched = tracker.isRightRecordMatched();
+
+    outer:
+    // for every record in the left batch
+    for (; nextLeftRecordToProcess < leftRecordCount; 
nextLeftRecordToProcess++) {
+      // for every batch on the right
+      for (; nextRightBatchToProcess < rightCounts.size(); 
nextRightBatchToProcess++) {
+        int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
+        // for every record in right batch
+        for (; nextRightRecordToProcess < rightRecordCount; 
nextRightRecordToProcess++) {
+
+          if (doEval(nextLeftRecordToProcess, nextRightBatchToProcess, 
nextRightRecordToProcess)) {
             // project records from the left and right batches
-            emitLeft(localNextLeftRecordToProcess, outputIndex);
-            emitRight(localNextRightBatchToProcess, 
localNextRightRecordToProcess, outputIndex);
+            emitLeft(nextLeftRecordToProcess, outputIndex);
+            emitRight(nextRightBatchToProcess, nextRightRecordToProcess, 
outputIndex);
             outputIndex++;
+            rightRecordMatched = true;
 
-            // TODO: Optimization; We can eliminate this check and compute the 
limits before the loop
             if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
-              localNextLeftRecordToProcess++;
+              nextRightRecordToProcess++;
 
               // no more space left in the batch, stop processing
               break outer;
             }
           }
-          localNextLeftRecordToProcess = 0;
         }
-        localNextRightRecordToProcess = 0;
+        nextRightRecordToProcess = 0;
       }
-    }
-
-    // update the instance members
-    nextRightBatchToProcess = localNextRightBatchToProcess;
-    nextRightRecordToProcess = localNextRightRecordToProcess;
-    nextLeftRecordToProcess = localNextLeftRecordToProcess;
-
-    // done with the current left batch and there is space in the output batch 
continue processing
-    return outputIndex;
-  }
-
-  /**
-   * Main entry point for producing the output records. Thin wrapper around 
populateOutgoingBatch(), this method
-   * controls which left batch we are processing and fetches the next left 
input batch one we exhaust
-   * the current one.
-   * @return the number of records produced in the output batch
-   */
-  public int outputRecords() {
-    int outputIndex = 0;
-    while (leftRecordCount != 0) {
-      outputIndex = populateOutgoingBatch(outputIndex);
-      if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
-        break;
+      nextRightBatchToProcess = 0;
+      if (joinType == JoinRelType.LEFT && !rightRecordMatched) {
+        // project records from the left side only, records from right will be 
null
+        emitLeft(nextLeftRecordToProcess, outputIndex);
+        outputIndex++;
+        if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+          nextLeftRecordToProcess++;
+
+          // no more space left in the batch, stop processing
+          break;
+        }
+      } else {
+        // reset match indicator if matching record was found
+        rightRecordMatched = false;
       }
-      // reset state and get next left batch
-      resetAndGetNextLeft();
     }
+
+    // update iteration status tracker with actual index and match counters
+    tracker.update(nextRightBatchToProcess, nextRightRecordToProcess, 
nextLeftRecordToProcess, rightRecordMatched);
     return outputIndex;
   }
 
   /**
-   * Utility method to clear the memory in the left input batch once we have 
completed processing it. Resets some
-   * internal state which indicate the next records to process in the left and 
right batches. Also fetches the next
-   * left input batch.
+   * Utility method to clear the memory in the left input batch once we have 
completed processing it.
+   * Resets some internal state which indicates the next records to process in 
the left and right batches,
+   * also fetches the next left input batch.
    */
   private void resetAndGetNextLeft() {
-
     for (VectorWrapper<?> vw : left) {
       vw.getValueVector().clear();
     }
-    nextRightBatchToProcess = nextRightRecordToProcess = 
nextLeftRecordToProcess = 0;
+    tracker.reset();
     RecordBatch.IterOutcome leftOutcome = 
outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
     switch (leftOutcome) {
       case OK_NEW_SCHEMA:
@@ -191,5 +193,57 @@ public abstract class NestedLoopJoinTemplate implements 
NestedLoopJoin {
                                  @Named("recordIndexWithinBatch") int 
recordIndexWithinBatch,
                                  @Named("outIndex") int outIndex);
 
-  public abstract void emitLeft(@Named("leftIndex") int leftIndex, 
@Named("outIndex") int outIndex);
+  public abstract void emitLeft(@Named("leftIndex") int leftIndex,
+                                @Named("outIndex") int outIndex);
+
+  protected abstract boolean doEval(@Named("leftIndex") int leftIndex,
+                                    @Named("rightBatchIndex") int batchIndex,
+                                    @Named("rightRecordIndexWithinBatch") int 
recordIndexWithinBatch);
+
+  /**
+   * Helper class to track position of left and right batches during iteration
+   * and match status of record from the right batch.
+   */
+  private static class IterationStatusTracker {
+    // Next right batch to process
+    private int nextRightBatchToProcess;
+    // Next record in the current right batch to process
+    private int nextRightRecordToProcess;
+    // Next record in the left batch to process
+    private int nextLeftRecordToProcess;
+    // Flag to indicate if record from the left found matching record from the 
right, applicable during left join
+    private boolean rightRecordMatched;
+
+    int getNextRightBatchToProcess() {
+      return nextRightBatchToProcess;
+    }
+
+    boolean isRightRecordMatched() {
+      return rightRecordMatched;
+    }
+
+    int getNextLeftRecordToProcess() {
+      return nextLeftRecordToProcess;
+    }
+
+    int getNextRightRecordToProcess() {
+      return nextRightRecordToProcess;
+    }
+
+    void update(int nextRightBatchToProcess,
+                int nextRightRecordToProcess,
+                int nextLeftRecordToProcess,
+                boolean rightRecordMatchFound) {
+      this.nextRightBatchToProcess = nextRightBatchToProcess;
+      this.nextRightRecordToProcess = nextRightRecordToProcess;
+      this.nextLeftRecordToProcess = nextLeftRecordToProcess;
+      this.rightRecordMatched = rightRecordMatchFound;
+    }
+
+    void reset() {
+      nextRightBatchToProcess = nextRightRecordToProcess = 
nextLeftRecordToProcess = 0;
+      rightRecordMatched = false;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 1551040..513db9b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.volcano.AbstractConverter.ExpandConversionRule;
 import org.apache.calcite.rel.core.RelFactories;
@@ -126,11 +127,14 @@ public enum PlannerPhase {
 
   JOIN_PLANNING("LOPT Join Planning") {
     public RuleSet getRules(OptimizerRulesContext context, 
Collection<StoragePlugin> plugins) {
+      List<RelOptRule> rules = Lists.newArrayList();
+      if (context.getPlannerSettings().isJoinOptimizationEnabled()) {
+        rules.add(DRILL_JOIN_TO_MULTIJOIN_RULE);
+        rules.add(DRILL_LOPT_OPTIMIZE_JOIN_RULE);
+      }
+      rules.add(ProjectRemoveRule.INSTANCE);
       return PlannerPhase.mergedRuleSets(
-          RuleSets.ofList(
-              DRILL_JOIN_TO_MULTIJOIN_RULE,
-              DRILL_LOPT_OPTIMIZE_JOIN_RULE,
-              ProjectRemoveRule.INSTANCE),
+          RuleSets.ofList(rules),
           getStorageRules(context, plugins, this)
           );
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
index 4a94c71..19c7524 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -14,12 +14,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- 
******************************************************************************/
+*/
 package org.apache.drill.exec.planner.logical;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import io.netty.buffer.DrillBuf;
+import org.apache.calcite.rel.RelNode;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionStringBuilder;
@@ -120,7 +121,7 @@ public class DrillConstExecutor implements 
RelOptPlanner.Executor {
   @Override
   public void reduce(final RexBuilder rexBuilder, List<RexNode> constExps, 
final List<RexNode> reducedValues) {
     for (final RexNode newCall : constExps) {
-      LogicalExpression logEx = DrillOptiq.toDrill(new 
DrillParseContext(plannerSettings), null /* input rel */, newCall);
+      LogicalExpression logEx = DrillOptiq.toDrill(new 
DrillParseContext(plannerSettings), (RelNode) null /* input rel */, newCall);
 
       ErrorCollectorImpl errors = new ErrorCollectorImpl();
       final LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materialize(logEx, null, errors, funcImplReg);

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index 87b76ae..5a90787 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.GregorianCalendar;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.calcite.rel.logical.LogicalAggregate;
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
@@ -70,27 +70,65 @@ public class DrillOptiq {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillOptiq.class);
 
   /**
-   * Converts a tree of {@link RexNode} operators into a scalar expression in 
Drill syntax.
+   * Converts a tree of {@link RexNode} operators into a scalar expression in 
Drill syntax using one input.
+   *
+   * @param context parse context which contains planner settings
+   * @param input data input
+   * @param expr expression to be converted
+   * @return converted expression
    */
   public static LogicalExpression toDrill(DrillParseContext context, RelNode 
input, RexNode expr) {
-    final RexToDrill visitor = new RexToDrill(context, input);
+    return toDrill(context, Lists.newArrayList(input), expr);
+  }
+
+  /**
+   * Converts a tree of {@link RexNode} operators into a scalar expression in 
Drill syntax using multiple inputs.
+   *
+   * @param context parse context which contains planner settings
+   * @param inputs multiple data inputs
+   * @param expr expression to be converted
+   * @return converted expression
+   */
+  public static LogicalExpression toDrill(DrillParseContext context, 
List<RelNode> inputs, RexNode expr) {
+    final RexToDrill visitor = new RexToDrill(context, inputs);
     return expr.accept(visitor);
   }
 
   private static class RexToDrill extends RexVisitorImpl<LogicalExpression> {
-    private final RelNode input;
+    private final List<RelNode> inputs;
     private final DrillParseContext context;
+    private final List<RelDataTypeField> fieldList;
 
-    RexToDrill(DrillParseContext context, RelNode input) {
+    RexToDrill(DrillParseContext context, List<RelNode> inputs) {
       super(true);
       this.context = context;
-      this.input = input;
+      this.inputs = inputs;
+      this.fieldList = Lists.newArrayList();
+      /*
+         Fields are enumerated by their presence order in input. Details 
{@link org.apache.calcite.rex.RexInputRef}.
+         Thus we can merge field list from several inputs by adding them into 
the list in order of appearance.
+         Each field index in the list will match field index in the 
RexInputRef instance which will allow us
+         to retrieve field from field list by index in {@link 
#visitInputRef(RexInputRef)} method. Example:
+
+         Query: select t1.c1, t2.c1, t2.c2 from t1 inner join t2 on t1.c1 
between t2.c1 and t2.c2
+
+         Input 1: $0
+         Input 2: $1, $2
+
+         Result: $0, $1, $2
+       */
+      for (RelNode input : inputs) {
+        if (input != null) {
+          fieldList.addAll(input.getRowType().getFieldList());
+        }
+      }
     }
 
     @Override
     public LogicalExpression visitInputRef(RexInputRef inputRef) {
       final int index = inputRef.getIndex();
-      final RelDataTypeField field = 
input.getRowType().getFieldList().get(index);
+      final RelDataTypeField field = fieldList.get(index);
+      Preconditions.checkNotNull(field, "Unable to find field using input 
reference");
       return FieldReference.getWithQuotedRef(field.getName());
     }
 
@@ -129,7 +167,7 @@ public class DrillOptiq {
             return 
FunctionCallFactory.createExpression(call.getOperator().getName().toLowerCase(),
                 ExpressionPosition.UNKNOWN, arg);
           case MINUS_PREFIX:
-            final RexBuilder builder = input.getCluster().getRexBuilder();
+            final RexBuilder builder = 
inputs.get(0).getCluster().getRexBuilder();
             final List<RexNode> operands = Lists.newArrayList();
             operands.add(builder.makeExactLiteral(new BigDecimal(-1)));
             operands.add(call.getOperands().get(0));

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index dd74c92..80e8dda 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -34,8 +34,6 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 
 import com.google.common.collect.ImmutableList;
@@ -44,7 +42,7 @@ import com.google.common.collect.Lists;
 // abstract base class for the join physical rules
 public abstract class JoinPruleBase extends Prule {
 
-  protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, 
NESTEDLOOP_JOIN};
+  protected enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN}
 
   protected JoinPruleBase(RelOptRuleOperand operand, String description) {
     super(operand, description);
@@ -56,10 +54,7 @@ public abstract class JoinPruleBase extends Prule {
     List<Integer> rightKeys = Lists.newArrayList();
     List<Boolean> filterNulls = Lists.newArrayList();
     JoinCategory category = JoinUtils.getJoinCategory(left, right, 
join.getCondition(), leftKeys, rightKeys, filterNulls);
-    if (category == JoinCategory.CARTESIAN || category == 
JoinCategory.INEQUALITY) {
-      return false;
-    }
-    return true;
+    return !(category == JoinCategory.CARTESIAN || category == 
JoinCategory.INEQUALITY);
   }
 
   protected List<DistributionField> getDistributionField(List<Integer> keys) {
@@ -238,26 +233,14 @@ public abstract class JoinPruleBase extends Prule {
 
     } else {
       if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-        call.transformTo(new MergeJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
-            join.getJoinType()));
-
+        call.transformTo(new MergeJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft,
+            convertedRight, joinCondition, join.getJoinType()));
       } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-        call.transformTo(new HashJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
-            join.getJoinType()));
+        call.transformTo(new HashJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft,
+            convertedRight, joinCondition, join.getJoinType()));
       } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
-        if (joinCondition.isAlwaysTrue()) {
-          call.transformTo(new NestedLoopJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft, convertedRight, joinCondition,
-            join.getJoinType()));
-        } else {
-          RexBuilder builder = join.getCluster().getRexBuilder();
-          RexLiteral condition = builder.makeLiteral(true); // TRUE condition 
for the NLJ
-
-          FilterPrel newFilterRel = new FilterPrel(join.getCluster(), 
convertedLeft.getTraitSet(),
-              new NestedLoopJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft, convertedRight,
-                  condition, join.getJoinType()),
-              joinCondition);
-          call.transformTo(newFilterRel);
-        }
+        call.transformTo(new NestedLoopJoinPrel(join.getCluster(), 
convertedLeft.getTraitSet(), convertedLeft,
+            convertedRight, joinCondition, join.getJoinType()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
index 7c4798f..b184eab 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrel.java
@@ -18,13 +18,15 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.core.Join;
@@ -35,17 +37,13 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 
-import com.google.common.collect.Lists;
-
 public class NestedLoopJoinPrel  extends JoinPrel {
 
   public NestedLoopJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode 
left, RelNode right, RexNode condition,
                       JoinRelType joinType) throws InvalidRelException {
     super(cluster, traits, left, right, condition, joinType);
-    RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys, 
filterNulls);
   }
 
   @Override
@@ -71,8 +69,9 @@ public class NestedLoopJoinPrel  extends JoinPrel {
     double rightRowCount = mq.getRowCount(this.getRight());
     double nljFactor = 
PrelUtil.getSettings(getCluster()).getNestedLoopJoinFactor();
 
-    // cpu cost of evaluating each leftkey=rightkey join condition
-    double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * 
this.getLeftKeys().size();
+    // cpu cost of evaluating each expression in join condition
+    int exprNum = RelOptUtil.conjunctions(getCondition()).size() + 
RelOptUtil.disjunctions(getCondition()).size();
+    double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * exprNum;
 
     double cpuCost = joinConditionCost * (leftRowCount * rightRowCount) * 
nljFactor;
 
@@ -82,23 +81,29 @@ public class NestedLoopJoinPrel  extends JoinPrel {
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
-    final List<String> fields = getRowType().getFieldNames();
-    assert isUnique(fields);
-
-    final List<String> leftFields = left.getRowType().getFieldNames();
-    final List<String> rightFields = right.getRowType().getFieldNames();
-
     PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
     PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
 
-    JoinRelType jtype = this.getJoinType();
-
-    List<JoinCondition> conditions = Lists.newArrayList();
-
-    buildJoinConditions(conditions, leftFields, rightFields, leftKeys, 
rightKeys);
-
-    NestedLoopJoinPOP nljoin = new NestedLoopJoinPOP(leftPop, rightPop, 
conditions, jtype);
-    return creator.addMetadata(this, nljoin);
+    /*
+       Raw expression will be transformed into its logical representation. For 
example:
+       Query:
+         select t1.c1, t2.c1, t2.c2 from t1 inner join t2 on t1.c1 between 
t2.c1 and t2.c2
+       Raw expression:
+         AND(>=($0, $1), <=($0, $2))
+       Logical expression:
+         FunctionCall [func=booleanAnd,
+         args=[FunctionCall [func=greater_than_or_equal_to, args=[`i1`, 
`i10`]],
+               FunctionCall [func=less_than_or_equal_to, args=[`i1`, `i2`]]]
+
+       Both tables have the same column names, thus duplicated column names in the 
second table are renamed: i1 -> i10.
+    */
+    LogicalExpression condition = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getSettings(getCluster())),
+        getInputs(),
+        getCondition());
+
+    NestedLoopJoinPOP nlj = new NestedLoopJoinPOP(leftPop, rightPop, 
getJoinType(), condition);
+    return creator.addMetadata(this, nlj);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index bfb47d6..b98976b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -49,7 +49,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
       PlannerSettings settings) {
     JoinRelType type = join.getJoinType();
 
-    if (!(type == JoinRelType.INNER || (type == JoinRelType.LEFT && 
JoinUtils.hasScalarSubqueryInput(left, right)))) {
+    if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
       return false;
     }
 
@@ -63,11 +63,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
     }
 
     if (settings.isNlJoinForScalarOnly()) {
-      if (JoinUtils.hasScalarSubqueryInput(left, right)) {
-        return true;
-      } else {
-        return false;
-      }
+      return JoinUtils.hasScalarSubqueryInput(left, right);
     }
 
     return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index da5bc41..53d67c0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -110,6 +110,30 @@ public class PlannerSettings implements Context{
   public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new 
EnumeratedStringValidator(
       QUOTING_IDENTIFIERS_KEY, Quoting.BACK_TICK.string, 
Quoting.DOUBLE_QUOTE.string, Quoting.BRACKET.string);
 
+  /*
+     Enables rules that re-write query joins in the most optimal way.
+     Though it is turned on by default and its value in query optimization is 
undeniable, the user may want to turn off such
+     optimization to leave the join order indicated in the sql query unchanged.
+
+     For example:
+     Currently only nested loop join allows the use of non-equi join conditions.
+     During the planning stage, nested loop join will be chosen when a non-equi join 
is detected
+     and {@link #NLJOIN_FOR_SCALAR} is set to false. Though query performance may 
not be optimal in such a case,
+     the user may use this workaround to execute queries with non-equi joins.
+
+     Nested loop join allows only INNER and LEFT join usage and implies that 
the right input is smaller than the left input.
+     During a LEFT join, when join optimization is enabled and it is detected that 
the right input is larger than the left,
+     the join will be optimized: left and right inputs will be flipped and the LEFT 
join type will be changed to a RIGHT one.
+     If the query contains non-equi joins, after such optimization it will fail, 
since nested loop join does not allow
+     RIGHT join. In this case, if the user accepts the possibility of non-optimal 
performance, they may turn off join optimization.
+     Turning off join optimization makes sense only if the user is not sure that 
the right input is smaller than or equal to the left;
+     otherwise join optimization can be left turned on.
+
+     Note: once hash and merge joins allow non-equi join conditions,
+     the need to turn off join optimization may go away.
+   */
+  public static final BooleanValidator JOIN_OPTIMIZATION = new 
BooleanValidator("planner.enable_join_optimization", true);
+
   public OptionManager options = null;
   public FunctionImplementationRegistry functionImplementationRegistry = null;
 
@@ -282,6 +306,10 @@ public class PlannerSettings implements Context{
         .build(logger);
   }
 
+  public boolean isJoinOptimizationEnabled() {
+    return options.getOption(JOIN_OPTIMIZATION);
+  }
+
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 09c5259..6b8b49a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -92,6 +92,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING,
       PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD,
       PlannerSettings.QUOTING_IDENTIFIERS,
+      PlannerSettings.JOIN_OPTIMIZATION,
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
index 6210022..10a9372 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
@@ -21,11 +21,10 @@ package org.apache.drill.exec.physical.impl.join;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.util.TestTools;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
 
 public class TestNestedLoopJoin extends PlanTestBase {
 
@@ -33,16 +32,15 @@ public class TestNestedLoopJoin extends PlanTestBase {
   private static final String WORKING_PATH = TestTools.getWorkingPath();
   private static final String TEST_RES_PATH = WORKING_PATH + 
"/src/test/resources";
 
-  private static final String NLJ = "Alter session set 
`planner.enable_hashjoin` = false; " +
-      "alter session set `planner.enable_mergejoin` = false; " +
-      "alter session set `planner.enable_nljoin_for_scalar_only` = false; ";
-  private static final String SINGLE_NLJ = "alter session set 
`planner.disable_exchanges` = true; " + NLJ;
   private static final String DISABLE_HJ = "alter session set 
`planner.enable_hashjoin` = false";
   private static final String ENABLE_HJ = "alter session set 
`planner.enable_hashjoin` = true";
+  private static final String RESET_HJ = "alter session reset 
`planner.enable_hashjoin`";
   private static final String DISABLE_MJ = "alter session set 
`planner.enable_mergejoin` = false";
   private static final String ENABLE_MJ = "alter session set 
`planner.enable_mergejoin` = true";
   private static final String DISABLE_NLJ_SCALAR = "alter session set 
`planner.enable_nljoin_for_scalar_only` = false";
   private static final String ENABLE_NLJ_SCALAR = "alter session set 
`planner.enable_nljoin_for_scalar_only` = true";
+  private static final String DISABLE_JOIN_OPTIMIZATION = "alter session set 
`planner.enable_join_optimization` = false";
+  private static final String RESET_JOIN_OPTIMIZATION = "alter session reset 
`planner.enable_join_optimization`";
 
   // Test queries used by planning and execution tests
   private static final String testNlJoinExists_1 = "select r_regionkey from 
cp.`tpch/region.parquet` "
@@ -68,6 +66,15 @@ public class TestNestedLoopJoin extends PlanTestBase {
   private static final String testNlJoinInequality_3 = "select r_regionkey 
from cp.`tpch/region.parquet` "
       + " where r_regionkey > (select min(n_regionkey) * 2 from 
cp.`tpch/nation.parquet` )";
 
+  private static final String testNlJoinBetween = "select " +
+      "n.n_nationkey, length(r.r_name) r_name_len, length(r.r_comment) 
r_comment_len " +
+      "from (select * from cp.`tpch/nation.parquet` where n_regionkey = 1) n " 
+
+      "%s join (select * from cp.`tpch/region.parquet` where r_regionkey = 1) 
r " +
+      "on n.n_nationkey between length(r.r_name) and length(r.r_comment) " +
+      "order by n.n_nationkey";
+
+  private static final String testNlJoinWithLargeRightInput = "select * from 
cp.`tpch/region.parquet`r " +
+      "left join cp.`tpch/nation.parquet` n on r.r_regionkey <> n.n_regionkey";
 
   @Test
   public void testNlJoinExists_1_planning() throws Exception {
@@ -75,7 +82,6 @@ public class TestNestedLoopJoin extends PlanTestBase {
   }
 
   @Test
-  // @Ignore
   public void testNlJoinNotIn_1_planning() throws Exception {
     testPlanMatchingPatterns(testNlJoinNotIn_1, new String[]{nlpattern}, new 
String[]{});
   }
@@ -93,7 +99,6 @@ public class TestNestedLoopJoin extends PlanTestBase {
   }
 
   @Test
-  @Ignore // Re-test after CALCITE-695 is resolved
   public void testNlJoinInequality_3() throws Exception {
     test(DISABLE_NLJ_SCALAR);
     testPlanMatchingPatterns(testNlJoinInequality_3, new String[]{nlpattern}, 
new String[]{});
@@ -103,8 +108,8 @@ public class TestNestedLoopJoin extends PlanTestBase {
   @Test
   public void testNlJoinAggrs_1_planning() throws Exception {
     String query = "select total1, total2 from "
-       + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` 
where l_suppkey between 100 and 200), "
-       + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` 
where l_suppkey between 200 and 300)  ";
+        + "(select sum(l_quantity) as total1 from cp.`tpch/lineitem.parquet` 
where l_suppkey between 100 and 200), "
+        + "(select sum(l_quantity) as total2 from cp.`tpch/lineitem.parquet` 
where l_suppkey between 200 and 300)  ";
     testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
   }
 
@@ -207,7 +212,7 @@ public class TestNestedLoopJoin extends PlanTestBase {
 
   @Test
   public void testNLJWithEmptyBatch() throws Exception {
-    Long result = 0l;
+    long result = 0L;
 
     test(DISABLE_NLJ_SCALAR);
     test(DISABLE_HJ);
@@ -256,18 +261,68 @@ public class TestNestedLoopJoin extends PlanTestBase {
     test(ENABLE_MJ);
   }
 
+  @Test
+  public void testNlJoinInnerBetween() throws Exception {
+    try {
+      test(DISABLE_NLJ_SCALAR);
+      String query = String.format(testNlJoinBetween, "INNER");
+      testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("n_nationkey", "r_name_length", "r_comment_length")
+          .baselineValues(17, 7, 31)
+          .baselineValues(24, 7, 31)
+          .build();
+    } finally {
+      test(RESET_HJ);
+    }
+  }
+
+  @Test
+  public void testNlJoinLeftBetween() throws Exception {
+    try {
+      test(DISABLE_NLJ_SCALAR);
+      String query = String.format(testNlJoinBetween, "LEFT");
+      testPlanMatchingPatterns(query, new String[]{nlpattern}, new String[]{});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("n_nationkey", "r_name_length", "r_comment_length")
+          .baselineValues(1, null, null)
+          .baselineValues(2, null, null)
+          .baselineValues(3, null, null)
+          .baselineValues(17, 7, 31)
+          .baselineValues(24, 7, 31)
+          .build();
+    } finally {
+      test(RESET_HJ);
+    }
+  }
+
   @Test(expected = UserRemoteException.class)
-  public void testExceptionLeftNlJoin() throws Exception {
+  public void testNlJoinWithLargeRightInputFailure() throws Exception {
     try {
       test(DISABLE_NLJ_SCALAR);
-      test("select r.r_regionkey, n.n_nationkey from cp.`tpch/nation.parquet` 
n " +
-            " left join cp.`tpch/region.parquet` r on n.n_regionkey < 
r.r_regionkey where n.n_nationkey < 3");
+      test(testNlJoinWithLargeRightInput);
     } catch (UserRemoteException e) {
-      assertThat("No expected current \"UNSUPPORTED_OPERATION ERROR\"",
-        e.getMessage(), startsWith("UNSUPPORTED_OPERATION ERROR"));
+      assertThat(e.getMessage(), containsString("UNSUPPORTED_OPERATION ERROR: 
This query cannot be planned " +
+          "possibly due to either a cartesian join or an inequality join"));
       throw e;
     } finally {
-      test("alter session reset `planner.enable_nljoin_for_scalar_only`");
+      test(RESET_HJ);
+    }
+  }
+
+  @Test
+  public void testNlJoinWithLargeRightInputSuccess() throws Exception {
+    try {
+      test(DISABLE_NLJ_SCALAR);
+      test(DISABLE_JOIN_OPTIMIZATION);
+      testPlanMatchingPatterns(testNlJoinWithLargeRightInput, new 
String[]{nlpattern}, new String[]{});
+    } finally {
+      test(RESET_HJ);
+      test(RESET_JOIN_OPTIMIZATION);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8e19d61b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
index c3a9c20..6620585 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillOptiqTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.logical;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
@@ -52,7 +53,7 @@ public class DrillOptiqTest {
       // create a dummy RexOver object.
       RexNode window = rex.makeOver(anyType, SqlStdOperatorTable.AVG, 
emptyList, emptyList, e, null, null, true,
           false, false);
-      DrillOptiq.toDrill(null, null, window);
+      DrillOptiq.toDrill(null, (RelNode) null, window);
     } catch (UserException e) {
       if (e.getMessage().contains(DrillOptiq.UNSUPPORTED_REX_NODE_ERROR)) {
         // got expected error return

Reply via email to