Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/719

Change subject: ASTERIXDB-1343: support queries over nodegroup-based datasets.
......................................................................

ASTERIXDB-1343: support queries over nodegroup-based datasets.

Fixed operators to have the right NodeDomain.
Added several regerssion tests.

Change-Id: I776b80e5b824c83bcdf43c95fff99bb151506f84
---
M 
asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
M 
asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M 
asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
M 
asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
M 
asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
M 
asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
M 
asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
M 
asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
M 
asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
A asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql
A asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql
A asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql
A asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql
A asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
A asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan
A asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan
A asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan
A 
asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.aql
A 
asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.aql
A 
asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.aql
A 
asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.aql
A 
asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.sqlpp
A 
asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.sqlpp
A 
asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.sqlpp
A 
asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.sqlpp
M asterix-app/src/test/resources/runtimets/testsuite.xml
M asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M 
asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
M 
asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M 
asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
M 
asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
32 files changed, 1,055 insertions(+), 150 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/19/719/1

diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 21adf9f..69665a8 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -47,6 +48,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
@@ -68,10 +70,10 @@
     private final boolean isEqCondition;
     private Object implConfig;
 
-    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
boolean requiresBroadcast,
-            boolean isPrimaryIndex, boolean isEqCondition, 
List<LogicalVariable> lowKeyVarList,
-            List<LogicalVariable> highKeyVarList) {
-        super(idx, requiresBroadcast);
+    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
INodeDomain domain,
+            boolean requiresBroadcast, boolean isPrimaryIndex, boolean 
isEqCondition,
+            List<LogicalVariable> lowKeyVarList, List<LogicalVariable> 
highKeyVarList) {
+        super(idx, domain, requiresBroadcast);
         this.isPrimaryIndex = isPrimaryIndex;
         this.isEqCondition = isEqCondition;
         this.lowKeyVarList = lowKeyVarList;
@@ -94,7 +96,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractUnnestMapOperator unnestMap = (AbstractUnnestMapOperator) op;
         ILogicalExpression unnestExpr = 
unnestMap.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
@@ -140,7 +142,7 @@
 
     @Override
     public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
         if (requiresBroadcast) {
             // For primary indexes optimizing an equality condition we can 
reduce the broadcast requirement to hash partitioning.
             if (isPrimaryIndex && isEqCondition) {
@@ -155,16 +157,16 @@
                     orderColumns.add(new OrderColumn(orderVar, OrderKind.ASC));
                 }
                 propsLocal.add(new LocalOrderProperty(orderColumns));
-                pv[0] = new StructuralPropertiesVector(new 
UnorderedPartitionedProperty(searchKeyVars, null),
+                pv[0] = new StructuralPropertiesVector(new 
UnorderedPartitionedProperty(searchKeyVars, domain),
                         propsLocal);
                 return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
             } else {
                 StructuralPropertiesVector[] pv = new 
StructuralPropertiesVector[1];
-                pv[0] = new StructuralPropertiesVector(new 
BroadcastPartitioningProperty(null), null);
+                pv[0] = new StructuralPropertiesVector(new 
BroadcastPartitioningProperty(domain), null);
                 return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
             }
         } else {
-            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+            return super.getRequiredPropertiesForChildren(op, reqdByParent, 
context);
         }
     }
 }
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 2d1cf1e..1a26021 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -70,7 +70,7 @@
 
     @Override
     public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
         return emptyUnaryRequirements();
     }
 
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
index 1f5b47e..7db93e8 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -113,9 +113,9 @@
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context)
             throws AlgebricksException {
-        AqlDataSource ds = new DatasetDataSource(datasetId, 
datasetId.getDataverseName(), datasetId.getDatasourceName(),
-                recordType, null /*external dataset doesn't have meta 
records.*/, AqlDataSourceType.EXTERNAL_DATASET,
-                dataset.getDatasetDetails());
+        AqlDataSource ds = new DatasetDataSource(datasetId, dataset, 
recordType,
+                null /*external dataset doesn't have meta records.*/, 
AqlDataSourceType.EXTERNAL_DATASET,
+                dataset.getDatasetDetails(), 
context.getComputationNodeDomain());
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
         deliveredProperties = dspp.computePropertiesVector(as.getVariables());
@@ -167,14 +167,14 @@
 
     @Override
     public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
         if (requiresBroadcast) {
             StructuralPropertiesVector[] pv = new 
StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(new 
BroadcastPartitioningProperty(null), null);
             return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
 
         } else {
-            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+            return super.getRequiredPropertiesForChildren(op, reqdByParent, 
context);
         }
     }
 
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
index 78f626d..ec59dae 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -31,6 +31,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractScanPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -43,10 +44,13 @@
 
     protected final IDataSourceIndex<String, AqlSourceId> idx;
     protected final boolean requiresBroadcast;
+    protected final INodeDomain domain;
 
-    public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
boolean requiresBroadcast) {
+    public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
INodeDomain domain,
+            boolean requiresBroadcast) {
         this.idx = idx;
         this.requiresBroadcast = requiresBroadcast;
+        this.domain = domain;
     }
 
     @Override
@@ -73,14 +77,15 @@
         return keyIndexes;
     }
 
+    @Override
     public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
         if (requiresBroadcast) {
             StructuralPropertiesVector[] pv = new 
StructuralPropertiesVector[1];
-            pv[0] = new StructuralPropertiesVector(new 
BroadcastPartitioningProperty(null), null);
+            pv[0] = new StructuralPropertiesVector(new 
BroadcastPartitioningProperty(domain), null);
             return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
-            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+            return super.getRequiredPropertiesForChildren(op, reqdByParent, 
context);
         }
     }
 
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 8029166..fe364fa 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -63,6 +63,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -89,9 +90,9 @@
 public class InvertedIndexPOperator extends IndexSearchPOperator {
     private final boolean isPartitioned;
 
-    public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
boolean requiresBroadcast,
-            boolean isPartitioned) {
-        super(idx, requiresBroadcast);
+    public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
INodeDomain domain,
+            boolean requiresBroadcast, boolean isPartitioned) {
+        super(idx, domain, requiresBroadcast);
         this.isPartitioned = isPartitioned;
     }
 
@@ -107,7 +108,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractUnnestMapOperator unnestMapOp = (AbstractUnnestMapOperator) op;
         ILogicalExpression unnestExpr = 
unnestMapOp.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index ea06619..5d714ea 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -43,6 +43,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 
@@ -52,8 +53,9 @@
  */
 public class RTreeSearchPOperator extends IndexSearchPOperator {
 
-    public RTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
boolean requiresBroadcast) {
-        super(idx, requiresBroadcast);
+    public RTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, 
INodeDomain domain,
+            boolean requiresBroadcast) {
+        super(idx, domain, requiresBroadcast);
     }
 
     @Override
@@ -64,7 +66,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractUnnestMapOperator unnestMap = (AbstractUnnestMapOperator) op;
         ILogicalExpression unnestExpr = 
unnestMap.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index b0cf533..9382b2d 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.declared.AqlSourceId;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
 import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams;
@@ -58,6 +59,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
@@ -216,8 +218,11 @@
                         AqlMetadataProvider mp = (AqlMetadataProvider) 
context.getMetadataProvider();
                         AqlSourceId dataSourceId = new 
AqlSourceId(jobGenParams.getDataverseName(),
                                 jobGenParams.getDatasetName());
+                        Dataset dataset = 
mp.findDataset(jobGenParams.getDataverseName(),
+                                jobGenParams.getDatasetName());
                         IDataSourceIndex<String, AqlSourceId> dsi = 
mp.findDataSourceIndex(jobGenParams.getIndexName(),
                                 dataSourceId);
+                        INodeDomain storageDomain = 
mp.findNodeDomain(dataset.getNodeGroupName());
                         if (dsi == null) {
                             throw new AlgebricksException("Could not find 
index " + jobGenParams.getIndexName()
                                     + " for dataset " + dataSourceId);
@@ -228,23 +233,25 @@
                             case BTREE: {
                                 BTreeJobGenParams btreeJobGenParams = new 
BTreeJobGenParams();
                                 
btreeJobGenParams.readFromFuncArgs(f.getArguments());
-                                op.setPhysicalOperator(new 
BTreeSearchPOperator(dsi, requiresBroadcast,
+                                op.setPhysicalOperator(new 
BTreeSearchPOperator(dsi, storageDomain, requiresBroadcast,
                                         btreeJobGenParams.isPrimaryIndex(), 
btreeJobGenParams.isEqCondition(),
                                         btreeJobGenParams.getLowKeyVarList(), 
btreeJobGenParams.getHighKeyVarList()));
                                 break;
                             }
                             case RTREE: {
-                                op.setPhysicalOperator(new 
RTreeSearchPOperator(dsi, requiresBroadcast));
+                                op.setPhysicalOperator(new 
RTreeSearchPOperator(dsi, storageDomain, requiresBroadcast));
                                 break;
                             }
                             case SINGLE_PARTITION_WORD_INVIX:
                             case SINGLE_PARTITION_NGRAM_INVIX: {
-                                op.setPhysicalOperator(new 
InvertedIndexPOperator(dsi, requiresBroadcast, false));
+                                op.setPhysicalOperator(
+                                        new InvertedIndexPOperator(dsi, 
storageDomain, requiresBroadcast, false));
                                 break;
                             }
                             case LENGTH_PARTITIONED_WORD_INVIX:
                             case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                                op.setPhysicalOperator(new 
InvertedIndexPOperator(dsi, requiresBroadcast, true));
+                                op.setPhysicalOperator(
+                                        new InvertedIndexPOperator(dsi, 
storageDomain, requiresBroadcast, true));
                                 break;
                             }
                             default: {
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 1f965d6..bcd5c7b 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -175,7 +175,7 @@
                 String csLocations = 
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
                 AqlDataSource dataSource = createFeedDataSource(asid, 
targetDataset, sourceFeedName,
                         subscriptionLocation, metadataProvider, policy, 
outputType,
-                        null /* TODO(Abdullah): to figure out the meta type 
name*/, csLocations);
+                        null /* TODO(Abdullah): to figure out the meta type 
name*/, csLocations, context);
                 DataSourceScanOperator scan = new DataSourceScanOperator(v, 
dataSource);
 
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
@@ -203,7 +203,8 @@
 
     private AqlDataSource createFeedDataSource(AqlSourceId aqlId, String 
targetDataset, String sourceFeedName,
             String subscriptionLocation, AqlMetadataProvider metadataProvider, 
FeedPolicyEntity feedPolicy,
-            String outputType, String outputMetaType, String locations) throws 
AlgebricksException {
+            String outputType, String outputMetaType, String locations, 
IOptimizationContext context)
+                    throws AlgebricksException {
         if 
(!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() == 
null ? null
                 : metadataProvider.getDefaultDataverse().getDataverseName())) {
             return null;
@@ -212,9 +213,10 @@
         IAType feedOutputMetaType = 
metadataProvider.findType(aqlId.getDataverseName(), outputMetaType);
         Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), 
sourceFeedName);
 
-        FeedDataSource feedDataSource = new FeedDataSource(aqlId, 
targetDataset, feedOutputType, feedOutputMetaType,
-                AqlDataSource.AqlDataSourceType.FEED, sourceFeed.getFeedId(), 
sourceFeed.getFeedType(),
-                ConnectionLocation.valueOf(subscriptionLocation), 
locations.split(","));
+        FeedDataSource feedDataSource = new FeedDataSource(aqlId, sourceFeed, 
targetDataset, feedOutputType,
+                feedOutputMetaType, AqlDataSource.AqlDataSourceType.FEED, 
sourceFeed.getFeedId(),
+                sourceFeed.getFeedType(), 
ConnectionLocation.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain());
         
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, 
feedPolicy);
         return feedDataSource;
     }
diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 4739b71..5c84894 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -137,6 +137,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.api.io.FileReference;
@@ -457,9 +458,9 @@
         IAType itemType = 
metadataProvider.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
         IAType metaItemType = 
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
                 dataset.getMetaItemTypeName());
-        DatasetDataSource dataSource = new DatasetDataSource(sourceId, 
dataset.getDataverseName(),
-                dataset.getDatasetName(), itemType, metaItemType, 
AqlDataSourceType.INTERNAL_DATASET,
-                dataset.getDatasetDetails());
+        INodeDomain domain = 
metadataProvider.findNodeDomain(dataset.getNodeGroupName());
+        DatasetDataSource dataSource = new DatasetDataSource(sourceId, 
dataset, itemType, metaItemType,
+                AqlDataSourceType.INTERNAL_DATASET, 
dataset.getDatasetDetails(), domain);
         return dataSource;
     }
 
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java 
b/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index c0613ed..42b1cc3 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -161,10 +161,10 @@
                 IExpressionEvalSizeComputer expressionEvalSizeComputer,
                 IMergeAggregationExpressionFactory 
mergeAggregationExpressionFactory,
                 IExpressionTypeComputer expressionTypeComputer, 
INullableTypeComputer nullableTypeComputer,
-                PhysicalOptimizationConfig physicalOptimizationConfig) {
+                PhysicalOptimizationConfig physicalOptimizationConfig, 
AlgebricksPartitionConstraint clusterLocations) {
             return new AlgebricksOptimizationContext(varCounter, 
expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, 
nullableTypeComputer,
-                    physicalOptimizationConfig);
+                    physicalOptimizationConfig, clusterLocations);
         }
 
     }
@@ -283,13 +283,15 @@
         builder.setPartialAggregationTypeComputer(new 
AqlPartialAggregationTypeComputer());
         builder.setExpressionTypeComputer(AqlExpressionTypeComputer.INSTANCE);
         builder.setNullableTypeComputer(AqlNullableTypeComputer.INSTANCE);
+        
builder.setClusterLocations(queryMetadataProvider.getClusterLocations());
 
         ICompiler compiler = compilerFactory.createCompiler(plan, 
queryMetadataProvider, t.getVarCounter());
         if (conf.isOptimize()) {
             compiler.optimize();
             //plot optimized logical plan
-            if (plot)
+            if (plot) {
                 PlanPlotter.printOptimizedLogicalPlan(plan);
+            }
             if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
                 if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
                     // For Optimizer tests.
@@ -321,10 +323,8 @@
             return null;
         }
 
-        AlgebricksPartitionConstraint clusterLocs = 
queryMetadataProvider.getClusterLocations();
         
builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
         
builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
-        builder.setClusterLocations(clusterLocs);
         
builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
         builder.setExpressionRuntimeProvider(
                 new 
LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(QueryLogicalExpressionJobGen.INSTANCE));
diff --git 
a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql
new file mode 100644
index 0000000..928beff
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-2.aql
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test on
+    asterix_nc2,
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and 
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01' 
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
+
diff --git 
a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql
new file mode 100644
index 0000000..7572cbb
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-3.aql
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test on
+    asterix_nc1,
+    asterix_nc2;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and 
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01' 
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
diff --git 
a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql
new file mode 100644
index 0000000..425394c
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343-4.aql
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+create  nodegroup group_test  on
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+create index lineitem_shipdateIx on LineItem (l_shipdate);
+create index lineitem_receiptdateIx on LineItem (l_receiptdate);
+create index lineitem_fk_orders on LineItem (l_orderkey);
+create index lineitem_fk_part on LineItem (l_partkey);
+create index lineitem_fk_supplier on LineItem (l_suppkey);
+create index orders_fk_customer on Orders (o_custkey);
+create index orders_orderdateIx on Orders (o_orderdate);
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and 
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01' 
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
\ No newline at end of file
diff --git 
a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql
new file mode 100644
index 0000000..f45adb1
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1343.aql
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+create  nodegroup group_test  on
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and 
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01' 
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
+
+drop dataverse tpch if exists;
+drop nodegroup group_test if exists;
\ No newline at end of file
diff --git 
a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
new file mode 100644
index 0000000..1dac8b2
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  
|PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] 
HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan
 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan
new file mode 100644
index 0000000..b084028
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-3.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  
|PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] 
HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan
 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan
new file mode 100644
index 0000000..4a3162f
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-4.plan
@@ -0,0 +1,46 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  
|PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STABLE_SORT [$$43(ASC)]  
|PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] 
HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan
new file mode 100644
index 0000000..1dac8b2
--- /dev/null
+++ 
b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  
|PARTITIONED|
+              -- SORT_GROUP_BY[$$27]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] 
HASH:[$$31]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.aql
 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.aql
new file mode 100644
index 0000000..bffda72
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.aql
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use dataverse tpch;
+
+create type LineItemType as closed {
+  l_orderkey: int64,
+  l_partkey: int64,
+  l_suppkey: int64,
+  l_linenumber: int64,
+  l_quantity: int64,
+  l_extendedprice: double,
+  l_discount: double,
+  l_tax: double,
+  l_returnflag: string,
+  l_linestatus: string,
+  l_shipdate: string,
+  l_commitdate: string,
+  l_receiptdate: string,
+  l_shipinstruct: string,
+  l_shipmode: string,
+  l_comment: string
+}
+
+create type OrderType as closed {
+  o_orderkey: int64,
+  o_custkey: int64,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int64,
+  o_comment: string
+}
+
+create type CustomerType as closed {
+  c_custkey: int64,
+  c_name: string,
+  c_address: string,
+  c_nationkey: int64,
+  c_phone: string,
+  c_acctbal: double,
+  c_mktsegment: string,
+  c_comment: string
+}
+
+create type SupplierType as closed {
+  s_suppkey: int64,
+  s_name: string,
+  s_address: string,
+  s_nationkey: int64,
+  s_phone: string,
+  s_acctbal: double,
+  s_comment: string
+}
+
+create type NationType as closed {
+  n_nationkey: int64,
+  n_name: string,
+  n_regionkey: int64,
+  n_comment: string
+}
+
+create type RegionType as closed {
+    r_regionkey: int64,
+    r_name: string,
+    r_comment: string
+}
+
+create type PartType as closed {
+  p_partkey: int64,
+  p_name: string,
+  p_mfgr: string,
+  p_brand: string,
+  p_type: string,
+  p_size: int64,
+  p_container: string,
+  p_retailprice: double,
+  p_comment: string
+}
+
+create type PartSuppType as closed {
+  ps_partkey: int64,
+  ps_suppkey: int64,
+  ps_availqty: int64,
+  ps_supplycost: double,
+  ps_comment: string
+}
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test  on
+    asterix_nc1;
+
+create dataset LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create dataset Orders(OrderType)
+  primary key o_orderkey on group_test;
diff --git 
a/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.aql
 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.aql
new file mode 100644
index 0000000..51c4ce7
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.aql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use dataverse tpch;
+
+load dataset LineItem 
+using localfs
+(("path"="asterix_nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|"));
+
+load dataset Orders 
+using localfs
+(("path"="asterix_nc1://data/tpch0.001/orders.tbl"),("format"="delimited-text"),("delimiter"="|"));
diff --git 
a/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.aql
 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.aql
new file mode 100644
index 0000000..2459659
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.aql
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use dataverse tpch;
+
+declare function tmp()
+{
+  for $l in dataset('LineItem')
+  where $l.l_commitdate < $l.l_receiptdate
+  distinct by $l.l_orderkey
+  return { "o_orderkey": $l.l_orderkey }
+}
+
+for $o in dataset('Orders')
+for $t in tmp()
+where $o.o_orderkey = $t.o_orderkey and 
+  $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01' 
+group by $o_orderpriority := $o.o_orderpriority with $o
+order by $o_orderpriority
+return {
+  "order_priority": $o_orderpriority,
+  "count": count($o)
+}
+
diff --git 
a/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.aql
 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.aql
new file mode 100644
index 0000000..cd8adc2
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries/tpch/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse tpch if exists;
+
+drop nodegroup group_test if exists;
\ No newline at end of file
diff --git 
a/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.sqlpp
 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.sqlpp
new file mode 100644
index 0000000..3ae4b82
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.1.ddl.sqlpp
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  database tpch if exists;
+create  database tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : int64,
+  l_partkey : int64,
+  l_suppkey : int64,
+  l_linenumber : int64,
+  l_quantity : int64,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create type tpch.OrderType as
+ closed {
+  o_orderkey : int64,
+  o_custkey : int64,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : int64,
+  o_comment : string
+}
+
+create type tpch.CustomerType as
+ closed {
+  c_custkey : int64,
+  c_name : string,
+  c_address : string,
+  c_nationkey : int64,
+  c_phone : string,
+  c_acctbal : double,
+  c_mktsegment : string,
+  c_comment : string
+}
+
+create type tpch.SupplierType as
+ closed {
+  s_suppkey : int64,
+  s_name : string,
+  s_address : string,
+  s_nationkey : int64,
+  s_phone : string,
+  s_acctbal : double,
+  s_comment : string
+}
+
+create type tpch.NationType as
+ closed {
+  n_nationkey : int64,
+  n_name : string,
+  n_regionkey : int64,
+  n_comment : string
+}
+
+create type tpch.RegionType as
+ closed {
+  r_regionkey : int64,
+  r_name : string,
+  r_comment : string
+}
+
+create type tpch.PartType as
+ closed {
+  p_partkey : int64,
+  p_name : string,
+  p_mfgr : string,
+  p_brand : string,
+  p_type : string,
+  p_size : int64,
+  p_container : string,
+  p_retailprice : double,
+  p_comment : string
+}
+
+create type tpch.PartSuppType as
+ closed {
+  ps_partkey : int64,
+  ps_suppkey : int64,
+  ps_availqty : int64,
+  ps_supplycost : double,
+  ps_comment : string
+}
+
+drop nodegroup group_test if exists;
+create  nodegroup group_test  on
+    asterix_nc1;
+
+create table LineItem(LineItemType)
+  primary key l_orderkey, l_linenumber on group_test;
+create table Orders(OrderType)
+  primary key o_orderkey on group_test;
+
+create index lineitem_shipdateIx on LineItem (l_shipdate);
+create index lineitem_receiptdateIx on LineItem (l_receiptdate);
+create index lineitem_fk_orders on LineItem (l_orderkey);
+create index lineitem_fk_part on LineItem (l_partkey);
+create index lineitem_fk_supplier on LineItem (l_suppkey);
+create index orders_fk_customer on Orders (o_custkey);
+create index orders_orderdateIx on Orders (o_orderdate);
diff --git 
a/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.sqlpp
 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.sqlpp
new file mode 100644
index 0000000..24eae5e
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.2.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  table LineItem using 
"org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter" 
(("path"="asterix_nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|"));
+
+load  table Orders using 
"org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter" 
(("path"="asterix_nc1://data/tpch0.001/orders.tbl"),("format"="delimited-text"),("delimiter"="|"));
diff --git 
a/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.sqlpp
 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.sqlpp
new file mode 100644
index 0000000..f693ba3
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.3.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+declare function tmp() {
+(
+    select distinct element {'o_orderkey':l.l_orderkey}
+    from  LineItem as l
+    where (l.l_commitdate < l.l_receiptdate)
+)
+};
+select element {'order_priority':o_orderpriority,'count':count(o)}
+from  Orders as o join
+      tpch.tmp() as t on o.o_orderkey = t.o_orderkey
+where o.o_orderdate >= '1993-07-01' and o.o_orderdate < '1993-10-01'
+group by o.o_orderpriority as o_orderpriority
+order by o_orderpriority
+;
diff --git 
a/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.sqlpp
 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.sqlpp
new file mode 100644
index 0000000..7f661fe
--- /dev/null
+++ 
b/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-with-index/q04_order_priority_with_nodegroup/q04_order_priority_with_nodegroup.4.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop database tpch if exists;
+
+drop nodegroup group_test if exists;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml 
b/asterix-app/src/test/resources/runtimets/testsuite.xml
index d50a0a6..e55b20f 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -5552,6 +5552,11 @@
             </compilation-unit>
         </test-case>
         <test-case FilePath="tpch">
+            <compilation-unit name="q04_order_priority_with_nodegroup">
+                <output-dir compare="Text">q04_order_priority</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="tpch">
             <compilation-unit name="q05_local_supplier_volume">
                 <output-dir 
compare="Text">q05_local_supplier_volume</output-dir>
             </compilation-unit>
diff --git a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2d994df..137d7a0 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5401,6 +5401,11 @@
                 <output-dir compare="Text">q04_order_priority</output-dir>
             </compilation-unit>
         </test-case>
+         <test-case FilePath="tpch-with-index">
+            <compilation-unit name="q04_order_priority_with_nodegroup">
+                <output-dir compare="Text">q04_order_priority</output-dir>
+            </compilation-unit>
+        </test-case>
         <test-case FilePath="tpch-with-index">
             <compilation-unit name="q05_local_supplier_volume">
                 <output-dir 
compare="Text">q05_local_supplier_volume</output-dir>
diff --git 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
index e2605ec..60279c4 100644
--- 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
+++ 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
@@ -74,6 +74,10 @@
         return schemaTypes;
     }
 
+    public INodeDomain getNodeDomain() {
+        return domain;
+    }
+
     public void 
computeLocalStructuralProperties(List<ILocalStructuralProperty> localProps,
             List<LogicalVariable> variables) {
         // do nothing
@@ -138,14 +142,7 @@
                         pp = new RandomPartitioningProperty(domain);
                     } else {
                         Set<LogicalVariable> pvars = new 
ListSet<LogicalVariable>();
-                        int i = 0;
-                        for (LogicalVariable v : scanVariables) {
-                            pvars.add(v);
-                            ++i;
-                            if (i >= n - 1) {
-                                break;
-                            }
-                        }
+                        pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
                         pp = new UnorderedPartitionedProperty(pvars, domain);
                     }
                     propsLocal = new ArrayList<ILocalStructuralProperty>();
@@ -154,24 +151,17 @@
 
                 case INTERNAL_DATASET:
                     n = scanVariables.size();
+                    Set<LogicalVariable> pvars = new 
ListSet<LogicalVariable>();
                     if (n < 2) {
                         pp = new RandomPartitioningProperty(domain);
                     } else {
-                        Set<LogicalVariable> pvars = new 
ListSet<LogicalVariable>();
-                        int i = 0;
-                        for (LogicalVariable v : scanVariables) {
-                            pvars.add(v);
-                            ++i;
-                            if (i >= n - 1) {
-                                break;
-                            }
-                        }
+                        pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
                         pp = new UnorderedPartitionedProperty(pvars, domain);
                     }
                     propsLocal = new ArrayList<ILocalStructuralProperty>();
                     List<OrderColumn> orderColumns = new 
ArrayList<OrderColumn>();
-                    for (int i = 0; i < n - 1; i++) {
-                        orderColumns.add(new OrderColumn(scanVariables.get(i), 
OrderKind.ASC));
+                    for (LogicalVariable pkVar : pvars) {
+                        orderColumns.add(new OrderColumn(pkVar, 
OrderKind.ASC));
                     }
                     propsLocal.add(new LocalOrderProperty(orderColumns));
                     propsVector = new StructuralPropertiesVector(pp, 
propsLocal);
diff --git 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index e0084f8..1b504a4 100644
--- 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -86,6 +86,7 @@
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
@@ -132,6 +133,8 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
@@ -981,10 +984,11 @@
         }
         IAType itemType = findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
         IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
+        INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
         AqlDataSourceType datasourceType = 
dataset.getDatasetType().equals(DatasetType.EXTERNAL)
                 ? AqlDataSourceType.EXTERNAL_DATASET : 
AqlDataSourceType.INTERNAL_DATASET;
-        return new DatasetDataSource(aqlId, aqlId.getDataverseName(), 
aqlId.getDatasourceName(), itemType, metaItemType,
-                datasourceType, dataset.getDatasetDetails());
+        return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, 
datasourceType,
+                dataset.getDatasetDetails(), domain);
     }
 
     @Override
@@ -2112,6 +2116,18 @@
         }
     }
 
+    public INodeDomain findNodeDomain(String nodeGroupName) throws 
AlgebricksException {
+        NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
nodeGroupName);
+        List<String> partitions = new ArrayList<>();
+        for (String location : nodeGroup.getNodeNames()) {
+            int numPartitions = 
AsterixClusterProperties.INSTANCE.getNodePartitionsCount(location);
+            for (int i = 0; i < numPartitions; i++) {
+                partitions.add(location);
+            }
+        }
+        return new DefaultNodeGroupDomain(partitions);
+    }
+
     public IAType findType(String dataverse, String typeName) throws 
AlgebricksException {
         if (dataverse == null || typeName == null) {
             return null;
diff --git 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 5483d49..cf6912a 100644
--- 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -18,67 +18,42 @@
  */
 package org.apache.asterix.metadata.declared;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.asterix.metadata.IDatasetDetails;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.utils.KeyFieldTypeUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 
 public class DatasetDataSource extends AqlDataSource {
 
     private Dataset dataset;
 
-    public DatasetDataSource(AqlSourceId id, String datasourceDataverse, 
String datasourceName, IAType itemType,
-            IAType metaItemType, AqlDataSourceType datasourceType, 
IDatasetDetails datasetDetails)
+    public DatasetDataSource(AqlSourceId id, Dataset dataset, IAType itemType, 
IAType metaItemType,
+            AqlDataSourceType datasourceType, IDatasetDetails datasetDetails, 
INodeDomain datasetDomain)
                     throws AlgebricksException {
         super(id, itemType, metaItemType, datasourceType);
-        MetadataTransactionContext ctx = null;
-        try {
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            dataset = MetadataManager.INSTANCE.getDataset(ctx, 
id.getDataverseName(), id.getDatasourceName());
-            if (dataset == null) {
-                throw new AlgebricksException(
-                        "Unknown dataset " + datasourceName + " in dataverse " 
+ datasourceDataverse);
-            }
-            MetadataManager.INSTANCE.commitTransaction(ctx);
-            switch (dataset.getDatasetType()) {
-                case INTERNAL:
-                    initInternalDataset(itemType, metaItemType, 
datasetDetails);
-                    break;
-                case EXTERNAL:
-                    initExternalDataset(itemType);
-                    break;
-
-            }
-        } catch (Exception e) {
-            if (ctx != null) {
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    throw new IllegalStateException("Unable to abort " + 
e2.getMessage());
-                }
-            }
-
+        this.dataset = dataset;
+        switch (dataset.getDatasetType()) {
+            case INTERNAL:
+                initInternalDataset(itemType, metaItemType, datasetDetails, 
datasetDomain);
+                break;
+            case EXTERNAL:
+                initExternalDataset(itemType, datasetDomain);
+                break;
         }
-
     }
 
     public Dataset getDataset() {
         return dataset;
     }
 
-    private void initInternalDataset(IAType itemType, IAType metaItemType, 
IDatasetDetails datasetDetails)
-            throws IOException, AlgebricksException {
+    private void initInternalDataset(IAType itemType, IAType metaItemType, 
IDatasetDetails datasetDetails,
+            INodeDomain domainForInternalData) throws AlgebricksException {
         InternalDatasetDetails internalDatasetDetails = 
(InternalDatasetDetails) datasetDetails;
         ARecordType recordType = (ARecordType) itemType;
         ARecordType metaRecordType = (ARecordType) metaItemType;
@@ -93,23 +68,12 @@
         if (metaItemType != null) {
             schemaTypes[n + 1] = metaItemType;
         }
-        domain = new DefaultNodeGroupDomain(dataset.getNodeGroupName());
+        domain = domainForInternalData;
     }
 
-    private void initExternalDataset(IAType itemType) {
+    private void initExternalDataset(IAType itemType, INodeDomain 
domainForExternalData) {
         schemaTypes = new IAType[1];
         schemaTypes[0] = itemType;
-        INodeDomain domainForExternalData = new INodeDomain() {
-            @Override
-            public Integer cardinality() {
-                return null;
-            }
-
-            @Override
-            public boolean sameAs(INodeDomain domain) {
-                return domain == this;
-            }
-        };
         domain = domainForExternalData;
     }
 
diff --git 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 46e3007..e95ac09 100644
--- 
a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ 
b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -21,8 +21,6 @@
 import org.apache.asterix.external.feed.api.IFeed;
 import 
org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixClusterProperties;
@@ -39,9 +37,9 @@
     private final String[] locations;
     private final int computeCardinality;
 
-    public FeedDataSource(AqlSourceId id, String targetDataset, IAType 
itemType, IAType metaItemType,
+    public FeedDataSource(AqlSourceId id, Feed feed, String targetDataset, 
IAType itemType, IAType metaItemType,
             AqlDataSourceType dataSourceType, FeedId sourceFeedId, 
IFeed.FeedType sourceFeedType,
-            ConnectionLocation location, String[] locations) throws 
AlgebricksException {
+            ConnectionLocation location, String[] locations, INodeDomain 
domain) throws AlgebricksException {
         super(id, itemType, metaItemType, dataSourceType);
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
@@ -49,26 +47,9 @@
         this.location = location;
         this.locations = locations;
         this.computeCardinality = 
AsterixClusterProperties.INSTANCE.getParticipantNodes().size();
-        MetadataTransactionContext ctx = null;
-        try {
-            MetadataManager.INSTANCE.acquireReadLatch();
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            this.feed = MetadataManager.INSTANCE.getFeed(ctx, 
id.getDataverseName(), id.getDatasourceName());
-            MetadataManager.INSTANCE.commitTransaction(ctx);
-            initFeedDataSource(itemType);
-        } catch (Exception e) {
-            if (ctx != null) {
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    throw new IllegalStateException("Unable to abort " + 
e2.getMessage());
-                }
-            }
-
-        } finally {
-            MetadataManager.INSTANCE.releaseReadLatch();
-        }
+        this.feed = feed;
+        this.domain = domain;
+        initFeedDataSource(itemType);
     }
 
     public Feed getFeed() {
@@ -99,18 +80,6 @@
     private void initFeedDataSource(IAType itemType) {
         schemaTypes = new IAType[1];
         schemaTypes[0] = itemType;
-        INodeDomain domainForExternalData = new INodeDomain() {
-            @Override
-            public Integer cardinality() {
-                return null;
-            }
-
-            @Override
-            public boolean sameAs(INodeDomain domain) {
-                return domain == this;
-            }
-        };
-        domain = domainForExternalData;
     }
 
     public IFeed.FeedType getSourceFeedType() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/719
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I776b80e5b824c83bcdf43c95fff99bb151506f84
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to