From Ali Alsuliman <[email protected]>:

Ali Alsuliman has uploaded this change for review:
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19407


Change subject: WIP: unordered partitioning property
......................................................................

WIP: unordered partitioning property

Change-Id: I38ecaccf133028ff8c4854a02c635b44bdcefcc8
---
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
13 files changed, 135 insertions(+), 20 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/07/19407/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
index 513b5aa..7c368b5 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
@@ -31,12 +31,14 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -121,19 +123,29 @@
         keysLeftBranchTileId.add(keysLeftBranch.get(0));
         List<LogicalVariable> keysRightBranchTileId = new ArrayList<>();
         keysRightBranchTileId.add(keysRightBranch.get(0));
-        IPartitioningProperty pp1 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysLeftBranchTileId),
-                context.getComputationNodeDomain());
-        IPartitioningProperty pp2 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysRightBranchTileId),
-                context.getComputationNodeDomain());
+        INodeDomain nodeDomain = context.getComputationNodeDomain();
+        IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+        int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+        IPartitioningProperty pp1;
+        IPartitioningProperty pp2;
+        if (partitionsMap == null) {
+            pp1 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysLeftBranchTileId), nodeDomain);
+            pp2 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysRightBranchTileId), nodeDomain);
+        } else {
+            pp1 = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(keysLeftBranchTileId), nodeDomain,
+                    partitionsMap);
+            pp2 = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(keysRightBranchTileId), nodeDomain,
+                    partitionsMap);
+        }

         List<ILocalStructuralProperty> localProperties1 = new ArrayList<>();
-        List<OrderColumn> orderColumns1 = new ArrayList<OrderColumn>();
+        List<OrderColumn> orderColumns1 = new ArrayList<>();
         orderColumns1.add(new OrderColumn(keysLeftBranch.get(0), 
OrderOperator.IOrder.OrderKind.ASC));
         orderColumns1.add(new OrderColumn(keysLeftBranch.get(1), 
OrderOperator.IOrder.OrderKind.ASC));
         localProperties1.add(new LocalOrderProperty(orderColumns1));

         List<ILocalStructuralProperty> localProperties2 = new ArrayList<>();
-        List<OrderColumn> orderColumns2 = new ArrayList<OrderColumn>();
+        List<OrderColumn> orderColumns2 = new ArrayList<>();
         orderColumns2.add(new OrderColumn(keysRightBranch.get(0), 
OrderOperator.IOrder.OrderKind.ASC));
         orderColumns2.add(new OrderColumn(keysRightBranch.get(1), 
OrderOperator.IOrder.OrderKind.ASC));
         localProperties2.add(new LocalOrderProperty(orderColumns2));
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
index 828715d..84bdcd46 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -24,4 +24,5 @@

     AlgebricksAbsolutePartitionConstraint getClusterLocations();

+    int[][] getPartitionsMap();
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
index 238cf69..cee44f0 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
@@ -97,7 +97,12 @@
             IOptimizationContext ctx) throws AlgebricksException {
         switch (ds.getDatasourceType()) {
             case DataSource.Type.INTERNAL_DATASET: {
-                IPartitioningProperty pp = new 
RandomPartitioningProperty(domain);
+                Set<LogicalVariable> pvars = new ListSet<>();
+                Dataset dataset = ((DatasetDataSource) ds).getDataset();
+                int[][] computeStorageMap = ((MetadataProvider) 
ctx.getMetadataProvider())
+                        
.getPartitioningProperties(dataset).getComputeStorageMap();
+                IPartitioningProperty pp =
+                        getInternalDatasetPartitioningProperty(ds, domain, 
scanVariables, pvars, computeStorageMap);
                 List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
                 ds.computeLocalStructuralProperties(propsLocal, scanVariables);
                 return new StructuralPropertiesVector(pp, propsLocal);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 20299d5..6ed0af9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -130,6 +131,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
@@ -1874,6 +1876,20 @@
         return new AsterixTupleFilterFactory(filterEvalFactory, 
context.getBinaryBooleanInspectorFactory());
     }

+    @Override
+    public int[][] getPartitionsMap(INodeDomain nodeDomain) {
+        if (!(nodeDomain instanceof DefaultNodeGroupDomain inputDomain)) {
+            return null;
+        }
+        String[] inputLocations = inputDomain.getNodes();
+        AlgebricksAbsolutePartitionConstraint locations = 
dataPartitioningProvider.getClusterLocations();
+        String[] clusterLocations = locations.getLocations();
+        if (!Arrays.equals(inputLocations, clusterLocations)) {
+            return null;
+        }
+        return dataPartitioningProvider.getPartitionsMap();
+    }
+
     private void validateRecordType(IAType itemType) throws 
AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
             throw new AlgebricksException("Only record types can be indexed.");
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 1a1c8ac..c87d725 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -70,4 +70,10 @@
     public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
         return clusterStateManager.getNodeSortedClusterLocations();
     }
+
+    @Override
+    public int[][] getPartitionsMap() {
+        AlgebricksAbsolutePartitionConstraint locations = 
clusterStateManager.getNodeSortedClusterLocations();
+        return getOneToOnePartitionsMap(getLocationsCount(locations));
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index 44141cb..2282a11 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -134,4 +134,10 @@
         SplitComputeLocations locations = 
getSplits(MetadataConstants.DEFAULT_DATABASE);
         return (AlgebricksAbsolutePartitionConstraint) 
locations.getConstraints();
     }
+
+    @Override
+    public int[][] getPartitionsMap() {
+        StorageComputePartitionsMap partitionMap = 
clusterStateManager.getStorageComputeMap();
+        return partitionMap.getComputeToStorageMap(false);
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 11c8b81..caac0ae 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
@@ -199,4 +200,6 @@

     boolean isBlockingOperatorDisabled();

+    int[][] getPartitionsMap(INodeDomain nodeDomain);
+
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 7a5fc8a..1d18390 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -30,10 +30,12 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -99,14 +101,25 @@
         // parent's partitioning requirements.
         IPartitioningProperty pp1;
         IPartitioningProperty pp2;
+        INodeDomain nodeDomain = ctx.getComputationNodeDomain();
         switch (partitioningType) {
             case PAIRWISE:
-                pp1 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysLeftBranch), ctx.getComputationNodeDomain());
-                pp2 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysRightBranch), ctx.getComputationNodeDomain());
+                IMetadataProvider<?, ?> mp = ctx.getMetadataProvider();
+                int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+                if (partitionsMap == null) {
+                    pp1 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysLeftBranch), nodeDomain);
+                    pp2 = UnorderedPartitionedProperty.of(new 
ListSet<>(keysRightBranch), nodeDomain);
+                } else {
+                    pp1 = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(keysLeftBranch), nodeDomain,
+                            partitionsMap);
+                    pp2 = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(keysRightBranch), nodeDomain,
+                            partitionsMap);
+                }
+
                 break;
             case BROADCAST:
-                pp1 = new 
RandomPartitioningProperty(ctx.getComputationNodeDomain());
-                pp2 = new 
BroadcastPartitioningProperty(ctx.getComputationNodeDomain());
+                pp1 = new RandomPartitioningProperty(nodeDomain);
+                pp2 = new BroadcastPartitioningProperty(nodeDomain);
                 break;
             default:
                 throw new IllegalStateException();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index 2f02a61..dc3be5c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -26,9 +26,11 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -65,7 +67,14 @@
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), 
context.getComputationNodeDomain());
+            INodeDomain nodeDomain = context.getComputationNodeDomain();
+            IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+            int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+            if (partitionsMap == null) {
+                pp = UnorderedPartitionedProperty.of(new 
ListSet<>(columnList), nodeDomain);
+            } else {
+                pp = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(columnList), nodeDomain, partitionsMap);
+            }
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 969fd99..7ab7039 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -38,12 +38,14 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -230,7 +232,14 @@
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList), 
context.getComputationNodeDomain());
+            INodeDomain nodeDomain = context.getComputationNodeDomain();
+            IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+            int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+            if (partitionsMap == null) {
+                pp = UnorderedPartitionedProperty.of(new 
ListSet<>(columnList), nodeDomain);
+            } else {
+                pp = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(columnList), nodeDomain, partitionsMap);
+            }
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index 8ff605d..0b4d21d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -37,11 +37,13 @@
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -79,8 +81,15 @@
         IPartitioningProperty pp;
         switch (op.getExecutionMode()) {
             case PARTITIONED:
-                pp = UnorderedPartitionedProperty.of(new 
ListSet<>(partitionColumns),
-                        context.getComputationNodeDomain());
+                INodeDomain nodeDomain = context.getComputationNodeDomain();
+                IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+                int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+                if (partitionsMap == null) {
+                    pp = UnorderedPartitionedProperty.of(new 
ListSet<>(partitionColumns), nodeDomain);
+                } else {
+                    pp = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(partitionColumns), nodeDomain,
+                            partitionsMap);
+                }
                 break;
             case UNPARTITIONED:
                 pp = IPartitioningProperty.UNPARTITIONED;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 89e17ad..bd345e4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -38,12 +38,14 @@
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
@@ -108,9 +110,16 @@
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
             StructuralPropertiesVector[] pv = new 
StructuralPropertiesVector[1];
-            pv[0] = new StructuralPropertiesVector(
-                    UnorderedPartitionedProperty.of(new ListSet<>(columnList), 
context.getComputationNodeDomain()),
-                    null);
+            INodeDomain nodeDomain = context.getComputationNodeDomain();
+            IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+            int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+            UnorderedPartitionedProperty pp;
+            if (partitionsMap == null) {
+                pp = UnorderedPartitionedProperty.of(new 
ListSet<>(columnList), nodeDomain);
+            } else {
+                pp = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(columnList), nodeDomain, partitionsMap);
+            }
+            pv[0] = new StructuralPropertiesVector(pp, null);
             return new PhysicalRequirements(pv, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index b8c1f36..2384b4b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -42,6 +42,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -95,8 +96,15 @@
         IPartitioningProperty pp;
         switch (op.getExecutionMode()) {
             case PARTITIONED:
-                pp = UnorderedPartitionedProperty.of(new 
ListSet<>(partitionVariables),
-                        context.getComputationNodeDomain());
+                INodeDomain nodeDomain = context.getComputationNodeDomain();
+                IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+                int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+                if (partitionsMap == null) {
+                    pp = UnorderedPartitionedProperty.of(new 
ListSet<>(partitionVariables), nodeDomain);
+                } else {
+                    pp = UnorderedPartitionedProperty.ofPartitionsMap(new 
ListSet<>(partitionVariables), nodeDomain,
+                            partitionsMap);
+                }
                 break;
             case UNPARTITIONED:
                 pp = IPartitioningProperty.UNPARTITIONED;

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19407
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I38ecaccf133028ff8c4854a02c635b44bdcefcc8
Gerrit-Change-Number: 19407
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange

Reply via email to