>From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19407 )
Change subject: WIP: unordered partitioning property
......................................................................
WIP: unordered partitioning property
Change-Id: I38ecaccf133028ff8c4854a02c635b44bdcefcc8
---
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
13 files changed, 135 insertions(+), 20 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/07/19407/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
index 513b5aa..7c368b5 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/SpatialJoinPOperator.java
@@ -31,12 +31,14 @@
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -121,19 +123,29 @@
keysLeftBranchTileId.add(keysLeftBranch.get(0));
List<LogicalVariable> keysRightBranchTileId = new ArrayList<>();
keysRightBranchTileId.add(keysRightBranch.get(0));
- IPartitioningProperty pp1 = UnorderedPartitionedProperty.of(new
ListSet<>(keysLeftBranchTileId),
- context.getComputationNodeDomain());
- IPartitioningProperty pp2 = UnorderedPartitionedProperty.of(new
ListSet<>(keysRightBranchTileId),
- context.getComputationNodeDomain());
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ IPartitioningProperty pp1;
+ IPartitioningProperty pp2;
+ if (partitionsMap == null) {
+ pp1 = UnorderedPartitionedProperty.of(new
ListSet<>(keysLeftBranchTileId), nodeDomain);
+ pp2 = UnorderedPartitionedProperty.of(new
ListSet<>(keysRightBranchTileId), nodeDomain);
+ } else {
+ pp1 = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(keysLeftBranchTileId), nodeDomain,
+ partitionsMap);
+ pp2 = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(keysRightBranchTileId), nodeDomain,
+ partitionsMap);
+ }
List<ILocalStructuralProperty> localProperties1 = new ArrayList<>();
- List<OrderColumn> orderColumns1 = new ArrayList<OrderColumn>();
+ List<OrderColumn> orderColumns1 = new ArrayList<>();
orderColumns1.add(new OrderColumn(keysLeftBranch.get(0),
OrderOperator.IOrder.OrderKind.ASC));
orderColumns1.add(new OrderColumn(keysLeftBranch.get(1),
OrderOperator.IOrder.OrderKind.ASC));
localProperties1.add(new LocalOrderProperty(orderColumns1));
List<ILocalStructuralProperty> localProperties2 = new ArrayList<>();
- List<OrderColumn> orderColumns2 = new ArrayList<OrderColumn>();
+ List<OrderColumn> orderColumns2 = new ArrayList<>();
orderColumns2.add(new OrderColumn(keysRightBranch.get(0),
OrderOperator.IOrder.OrderKind.ASC));
orderColumns2.add(new OrderColumn(keysRightBranch.get(1),
OrderOperator.IOrder.OrderKind.ASC));
localProperties2.add(new LocalOrderProperty(orderColumns2));
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
index 828715d..84bdcd46 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -24,4 +24,5 @@
AlgebricksAbsolutePartitionConstraint getClusterLocations();
+ int[][] getPartitionsMap();
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
index 238cf69..cee44f0 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
@@ -97,7 +97,12 @@
IOptimizationContext ctx) throws AlgebricksException {
switch (ds.getDatasourceType()) {
case DataSource.Type.INTERNAL_DATASET: {
- IPartitioningProperty pp = new
RandomPartitioningProperty(domain);
+ Set<LogicalVariable> pvars = new ListSet<>();
+ Dataset dataset = ((DatasetDataSource) ds).getDataset();
+ int[][] computeStorageMap = ((MetadataProvider)
ctx.getMetadataProvider())
+
.getPartitioningProperties(dataset).getComputeStorageMap();
+ IPartitioningProperty pp =
+ getInternalDatasetPartitioningProperty(ds, domain,
scanVariables, pvars, computeStorageMap);
List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
ds.computeLocalStructuralProperties(propsLocal, scanVariables);
return new StructuralPropertiesVector(pp, propsLocal);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 20299d5..6ed0af9 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -130,6 +131,7 @@
import
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
@@ -1874,6 +1876,20 @@
return new AsterixTupleFilterFactory(filterEvalFactory,
context.getBinaryBooleanInspectorFactory());
}
+ @Override
+ public int[][] getPartitionsMap(INodeDomain nodeDomain) {
+ if (!(nodeDomain instanceof DefaultNodeGroupDomain inputDomain)) {
+ return null;
+ }
+ String[] inputLocations = inputDomain.getNodes();
+ AlgebricksAbsolutePartitionConstraint locations =
dataPartitioningProvider.getClusterLocations();
+ String[] clusterLocations = locations.getLocations();
+ if (!Arrays.equals(inputLocations, clusterLocations)) {
+ return null;
+ }
+ return dataPartitioningProvider.getPartitionsMap();
+ }
+
private void validateRecordType(IAType itemType) throws
AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
throw new AlgebricksException("Only record types can be indexed.");
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 1a1c8ac..c87d725 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -70,4 +70,10 @@
public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
return clusterStateManager.getNodeSortedClusterLocations();
}
+
+ @Override
+ public int[][] getPartitionsMap() {
+ AlgebricksAbsolutePartitionConstraint locations =
clusterStateManager.getNodeSortedClusterLocations();
+ return getOneToOnePartitionsMap(getLocationsCount(locations));
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index 44141cb..2282a11 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -134,4 +134,10 @@
SplitComputeLocations locations =
getSplits(MetadataConstants.DEFAULT_DATABASE);
return (AlgebricksAbsolutePartitionConstraint)
locations.getConstraints();
}
+
+ @Override
+ public int[][] getPartitionsMap() {
+ StorageComputePartitionsMap partitionMap =
clusterStateManager.getStorageComputeMap();
+ return partitionMap.getComputeToStorageMap(false);
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 11c8b81..caac0ae 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
@@ -199,4 +200,6 @@
boolean isBlockingOperatorDisabled();
+ int[][] getPartitionsMap(INodeDomain nodeDomain);
+
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 7a5fc8a..1d18390 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -30,10 +30,12 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -99,14 +101,25 @@
// parent's partitioning requirements.
IPartitioningProperty pp1;
IPartitioningProperty pp2;
+ INodeDomain nodeDomain = ctx.getComputationNodeDomain();
switch (partitioningType) {
case PAIRWISE:
- pp1 = UnorderedPartitionedProperty.of(new
ListSet<>(keysLeftBranch), ctx.getComputationNodeDomain());
- pp2 = UnorderedPartitionedProperty.of(new
ListSet<>(keysRightBranch), ctx.getComputationNodeDomain());
+ IMetadataProvider<?, ?> mp = ctx.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ if (partitionsMap == null) {
+ pp1 = UnorderedPartitionedProperty.of(new
ListSet<>(keysLeftBranch), nodeDomain);
+ pp2 = UnorderedPartitionedProperty.of(new
ListSet<>(keysRightBranch), nodeDomain);
+ } else {
+ pp1 = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(keysLeftBranch), nodeDomain,
+ partitionsMap);
+ pp2 = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(keysRightBranch), nodeDomain,
+ partitionsMap);
+ }
+
break;
case BROADCAST:
- pp1 = new
RandomPartitioningProperty(ctx.getComputationNodeDomain());
- pp2 = new
BroadcastPartitioningProperty(ctx.getComputationNodeDomain());
+ pp1 = new RandomPartitioningProperty(nodeDomain);
+ pp2 = new BroadcastPartitioningProperty(nodeDomain);
break;
default:
throw new IllegalStateException();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index 2f02a61..dc3be5c 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -26,9 +26,11 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -65,7 +67,14 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() ==
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList),
context.getComputationNodeDomain());
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ if (partitionsMap == null) {
+ pp = UnorderedPartitionedProperty.of(new
ListSet<>(columnList), nodeDomain);
+ } else {
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(columnList), nodeDomain, partitionsMap);
+ }
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 969fd99..7ab7039 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -38,12 +38,14 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -230,7 +232,14 @@
IPartitioningProperty pp = null;
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
- pp = UnorderedPartitionedProperty.of(new ListSet<>(columnList),
context.getComputationNodeDomain());
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ if (partitionsMap == null) {
+ pp = UnorderedPartitionedProperty.of(new
ListSet<>(columnList), nodeDomain);
+ } else {
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(columnList), nodeDomain, partitionsMap);
+ }
}
pv[0] = new StructuralPropertiesVector(pp, localProps);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index 8ff605d..0b4d21d 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -37,11 +37,13 @@
import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -79,8 +81,15 @@
IPartitioningProperty pp;
switch (op.getExecutionMode()) {
case PARTITIONED:
- pp = UnorderedPartitionedProperty.of(new
ListSet<>(partitionColumns),
- context.getComputationNodeDomain());
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ if (partitionsMap == null) {
+ pp = UnorderedPartitionedProperty.of(new
ListSet<>(partitionColumns), nodeDomain);
+ } else {
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(partitionColumns), nodeDomain,
+ partitionsMap);
+ }
break;
case UNPARTITIONED:
pp = IPartitioningProperty.UNPARTITIONED;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 89e17ad..bd345e4 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -38,12 +38,14 @@
import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import
org.apache.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
@@ -108,9 +110,16 @@
AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
StructuralPropertiesVector[] pv = new
StructuralPropertiesVector[1];
- pv[0] = new StructuralPropertiesVector(
- UnorderedPartitionedProperty.of(new ListSet<>(columnList),
context.getComputationNodeDomain()),
- null);
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ UnorderedPartitionedProperty pp;
+ if (partitionsMap == null) {
+ pp = UnorderedPartitionedProperty.of(new
ListSet<>(columnList), nodeDomain);
+ } else {
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(columnList), nodeDomain, partitionsMap);
+ }
+ pv[0] = new StructuralPropertiesVector(pp, null);
return new PhysicalRequirements(pv,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
} else {
return emptyUnaryRequirements();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index b8c1f36..2384b4b 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -42,6 +42,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -95,8 +96,15 @@
IPartitioningProperty pp;
switch (op.getExecutionMode()) {
case PARTITIONED:
- pp = UnorderedPartitionedProperty.of(new
ListSet<>(partitionVariables),
- context.getComputationNodeDomain());
+ INodeDomain nodeDomain = context.getComputationNodeDomain();
+ IMetadataProvider<?, ?> mp = context.getMetadataProvider();
+ int[][] partitionsMap = mp.getPartitionsMap(nodeDomain);
+ if (partitionsMap == null) {
+ pp = UnorderedPartitionedProperty.of(new
ListSet<>(partitionVariables), nodeDomain);
+ } else {
+ pp = UnorderedPartitionedProperty.ofPartitionsMap(new
ListSet<>(partitionVariables), nodeDomain,
+ partitionsMap);
+ }
break;
case UNPARTITIONED:
pp = IPartitioningProperty.UNPARTITIONED;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19407
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I38ecaccf133028ff8c4854a02c635b44bdcefcc8
Gerrit-Change-Number: 19407
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange