Ali Alsuliman has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3005

Change subject: [NO ISSUE] Parallel sort leftover changes p.2
......................................................................

[NO ISSUE] Parallel sort leftover changes p.2

Change-Id: Ie37e03b6fc6e55fc21f8324c0f09a7fa05b51769
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
5 files changed, 113 insertions(+), 75 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/05/3005/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 4c58ad7..66c95ee 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -64,7 +64,7 @@
         COMPILER_SORT_SAMPLES(
                 INTEGER,
                 AlgebricksConfig.SORT_SAMPLES,
-                "The number of samples parallel sorting should " + "take from 
each partition");
+                "The number of samples which parallel sorting should take from 
each partition");
 
         private final IOptionType type;
         private final Object defaultValue;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index c967a94..d3e297d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -118,12 +118,12 @@
             @Override
             public IAggregateEvaluator createAggregateEvaluator(final 
IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new GlobalSamplingAggregateFunction(args, ctx, 
ascendingFlags, numOfPartitions, numOrderFields);
+                return new RangeMapFunction(args, ctx, ascendingFlags, 
numOfPartitions, numOrderFields);
             }
         };
     }
 
-    private class GlobalSamplingAggregateFunction implements 
IAggregateEvaluator {
+    private class RangeMapFunction implements IAggregateEvaluator {
         private final IScalarEvaluator localSamplesEval;
         private final IPointable localSamples;
         private final List<List<byte[]>> finalSamples;
@@ -138,7 +138,7 @@
         private final ArrayBackedValueStorage storage;
 
         @SuppressWarnings("unchecked")
-        private GlobalSamplingAggregateFunction(IScalarEvaluatorFactory[] 
args, IHyracksTaskContext context,
+        private RangeMapFunction(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext context,
                 boolean[] ascending, int numOfPartitions, int 
numOrderByFields) throws HyracksDataException {
             localSamples = new VoidPointable();
             localSamplesEval = args[0].createScalarEvaluator(context);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
index df0b446..a5851d6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,7 +30,11 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -52,25 +55,60 @@
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context)
-            throws AlgebricksException {
+    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
         AbstractLogicalOperator childOp = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> childLocalProps = 
childOp.getDeliveredPhysicalProperties().getLocalProperties();
-        List<ILocalStructuralProperty> localProperties;
-        if (childLocalProps != null) {
-            localProperties = new ArrayList<>(childLocalProps);
-        } else {
-            localProperties = new ArrayList<>(0);
+        IPartitioningProperty childGlobalProp = 
childOp.getDeliveredPhysicalProperties().getPartitioningProperty();
+        List<ILocalStructuralProperty> outputLocalProp = new ArrayList<>(0);
+        if (childLocalProps != null && childGlobalProp.getPartitioningType() 
== PartitioningType.ORDERED_PARTITIONED) {
+            // the child could have a local order property that matches its 
global order property
+            propagateChildProperties((OrderedPartitionedProperty) 
childGlobalProp, childLocalProps, outputLocalProp);
         }
 
-        deliveredProperties = new 
StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, 
localProperties);
+        deliveredProperties = new 
StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, 
outputLocalProp);
     }
 
     @Override
     public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> 
createConnectorDescriptor(
-            IConnectorDescriptorRegistry spec, ILogicalOperator op, 
IOperatorSchema opSchema, JobGenContext context)
-            throws AlgebricksException {
+            IConnectorDescriptorRegistry spec, ILogicalOperator op, 
IOperatorSchema opSchema, JobGenContext context) {
         IConnectorDescriptor connector = new 
MToOneSequentialMergingConnectorDescriptor(spec);
         return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE);
     }
+
+    /**
+     *
+     * @param childGlobalProp the global ordering property of the child 
represented by ORDERED_PARTITIONED partitioning
+     * @param childLocalProps the local properties inside the partitions
+     * @param outputLocalProp the local property of the connector that will be 
modified if propagating prop. happens
+     */
+    private void propagateChildProperties(OrderedPartitionedProperty 
childGlobalProp,
+            List<ILocalStructuralProperty> childLocalProps, 
List<ILocalStructuralProperty> outputLocalProp) {
+        List<OrderColumn> globalOrderColumns = 
childGlobalProp.getOrderColumns();
+        List<OrderColumn> outputOrderColumns = new ArrayList<>();
+        OrderColumn localOrderColumn;
+        ILocalStructuralProperty childLocalProp;
+        boolean done = false;
+        for (int j = 0; j < childLocalProps.size() && !done; j++) {
+            childLocalProp = childLocalProps.get(j);
+            if (childLocalProp.getPropertyType() == 
ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY) {
+                List<OrderColumn> localOrderColumns = ((LocalOrderProperty) 
childLocalProp).getOrderColumns();
+                // start matching the order columns
+                for (int i = 0; i < localOrderColumns.size() && i < 
globalOrderColumns.size(); i++) {
+                    localOrderColumn = localOrderColumns.get(i);
+                    if (localOrderColumn.equals(globalOrderColumns.get(i))) {
+                        outputOrderColumns.add(localOrderColumn);
+                    } else {
+                        // stop whenever the matching fails, end of prefix 
matching
+                        break;
+                    }
+                }
+                done = true;
+            }
+        }
+
+        if (!outputOrderColumns.isEmpty()) {
+            // found a prefix
+            outputLocalProp.add(new LocalOrderProperty(outputOrderColumns));
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index cdab2f4..630d877 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -167,60 +167,6 @@
         return changed;
     }
 
-    private boolean physOptimizePlan(ILogicalPlan plan, 
IPhysicalPropertiesVector pvector, boolean nestedPlan,
-            IOptimizationContext context) throws AlgebricksException {
-        boolean loggerTraceEnabled = 
AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
-        boolean changed = false;
-        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            if (physOptimizeOp(root, pvector, nestedPlan, context)) {
-                changed = true;
-            }
-            AbstractLogicalOperator op = (AbstractLogicalOperator) 
root.getValue();
-            op.computeDeliveredPhysicalProperties(context);
-            if (loggerTraceEnabled) {
-                AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural 
properties for " + op.getPhysicalOperator()
-                        + ": " + op.getDeliveredPhysicalProperties() + "\n");
-            }
-        }
-        return changed;
-    }
-
-    // Gets the index of a child to start top-down data property enforcement.
-    // If there is a partitioning-compatible child with the operator in opRef,
-    // start from this child; otherwise, start from child zero.
-    private int getStartChildIndex(AbstractLogicalOperator op, 
PhysicalRequirements pr, boolean nestedPlan,
-            IOptimizationContext context) throws AlgebricksException {
-        IPhysicalPropertiesVector[] reqdProperties = null;
-        if (pr != null) {
-            reqdProperties = pr.getRequiredProperties();
-        }
-
-        List<IPartitioningProperty> 
deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
-        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-            AbstractLogicalOperator child = (AbstractLogicalOperator) 
childRef.getValue();
-            deliveredPartitioningPropertiesFromChildren
-                    
.add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
-        }
-        int partitioningCompatibleChild = 0;
-        for (int i = 0; i < op.getInputs().size(); i++) {
-            IPartitioningProperty deliveredPropertyFromChild = 
deliveredPartitioningPropertiesFromChildren.get(i);
-            if (reqdProperties == null || reqdProperties[i] == null
-                    || reqdProperties[i].getPartitioningProperty() == null || 
deliveredPropertyFromChild == null
-                    || reqdProperties[i].getPartitioningProperty()
-                            .getPartitioningType() != 
deliveredPartitioningPropertiesFromChildren.get(i)
-                                    .getPartitioningType()) {
-                continue;
-            }
-            IPartitioningProperty requiredPropertyForChild = 
reqdProperties[i].getPartitioningProperty();
-            // If child i's delivered partitioning property already satisfies 
the required property, stop and return the child index.
-            if 
(PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, 
deliveredPropertyFromChild, true)) {
-                partitioningCompatibleChild = i;
-                break;
-            }
-        }
-        return partitioningCompatibleChild;
-    }
-
     private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, 
IPhysicalPropertiesVector required,
             boolean nestedPlan, IOptimizationContext context) throws 
AlgebricksException {
 
@@ -357,6 +303,60 @@
             physOptimizeOp(opRef, required, nestedPlan, context);
         }
         return changed;
+    }
+
+    private boolean physOptimizePlan(ILogicalPlan plan, 
IPhysicalPropertiesVector pvector, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        boolean loggerTraceEnabled = 
AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
+        boolean changed = false;
+        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+            if (physOptimizeOp(root, pvector, nestedPlan, context)) {
+                changed = true;
+            }
+            AbstractLogicalOperator op = (AbstractLogicalOperator) 
root.getValue();
+            op.computeDeliveredPhysicalProperties(context);
+            if (loggerTraceEnabled) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural 
properties for " + op.getPhysicalOperator()
+                        + ": " + op.getDeliveredPhysicalProperties() + "\n");
+            }
+        }
+        return changed;
+    }
+
+    // Gets the index of a child to start top-down data property enforcement.
+    // If there is a partitioning-compatible child with the operator in opRef,
+    // start from this child; otherwise, start from child zero.
+    private int getStartChildIndex(AbstractLogicalOperator op, 
PhysicalRequirements pr, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        IPhysicalPropertiesVector[] reqdProperties = null;
+        if (pr != null) {
+            reqdProperties = pr.getRequiredProperties();
+        }
+
+        List<IPartitioningProperty> 
deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) 
childRef.getValue();
+            deliveredPartitioningPropertiesFromChildren
+                    
.add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
+        }
+        int partitioningCompatibleChild = 0;
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            IPartitioningProperty deliveredPropertyFromChild = 
deliveredPartitioningPropertiesFromChildren.get(i);
+            if (reqdProperties == null || reqdProperties[i] == null
+                    || reqdProperties[i].getPartitioningProperty() == null || 
deliveredPropertyFromChild == null
+                    || reqdProperties[i].getPartitioningProperty()
+                            .getPartitioningType() != 
deliveredPartitioningPropertiesFromChildren.get(i)
+                                    .getPartitioningType()) {
+                continue;
+            }
+            IPartitioningProperty requiredPropertyForChild = 
reqdProperties[i].getPartitioningProperty();
+            // If child i's delivered partitioning property already satisfies 
the required property, stop and return the child index.
+            if 
(PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, 
deliveredPropertyFromChild, true)) {
+                partitioningCompatibleChild = i;
+                break;
+            }
+        }
+        return partitioningCompatibleChild;
     }
 
     private IPhysicalPropertiesVector 
newPropertiesDiff(AbstractLogicalOperator newChild,
@@ -888,13 +888,13 @@
         return forwardOperator;
     }
 
-    private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) 
{
-        for (ILocalStructuralProperty lsp : cldLocals) {
-            if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
+    private boolean allAreOrderProps(List<ILocalStructuralProperty> 
childLocalProperties) {
+        for (ILocalStructuralProperty childLocalProperty : 
childLocalProperties) {
+            if (childLocalProperty.getPropertyType() != 
PropertyType.LOCAL_ORDER_PROPERTY) {
                 return false;
             }
         }
-        return !cldLocals.isEmpty();
+        return !childLocalProperties.isEmpty();
     }
 
     private void printOp(AbstractLogicalOperator op) throws 
AlgebricksException {
@@ -927,7 +927,7 @@
             throws AlgebricksException {
         ILogicalOperator oldOp = opRef.getValue();
         opRef.setValue(newOp);
-        newOp.getInputs().add(new MutableObject<ILogicalOperator>(oldOp));
+        newOp.getInputs().add(new MutableObject<>(oldOp));
         newOp.recomputeSchema();
         newOp.computeDeliveredPhysicalProperties(context);
         context.computeAndSetTypeEnvironmentForOperator(newOp);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
index c437619..0b1c0a2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
@@ -69,7 +69,7 @@
         }
     }
 
-    private synchronized boolean allPartitionsAdded() {
+    private boolean allPartitionsAdded() {
         for (int i = 0; i < partitions.length; i++) {
             if (partitions[i] == null) {
                 return false;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3005
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie37e03b6fc6e55fc21f8324c0f09a7fa05b51769
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Ali Alsuliman <[email protected]>

Reply via email to