Ali Alsuliman has submitted this change and it was merged. Change subject: [ASTERIXDB-2286][COMP] Parallel sort changes p.2 ......................................................................
[ASTERIXDB-2286][COMP] Parallel sort changes p.2 - user model changes: no - storage format changes: no - interface changes: no details: This patch changes the way the SequentialMergeExchangePOperator connector computes its local property, instead of blindly propagating the child's local property. The patch also includes minor code clean-ups (moved some methods down) Change-Id: Ie37e03b6fc6e55fc21f8324c0f09a7fa05b51769 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3005 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> --- A asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp A asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java 9 files changed, 206 insertions(+), 78 deletions(-) Approvals: Jenkins: Verified; ; Verified Dmitry Lychagin: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp new file mode 100644 index 0000000..473a52a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Description: testing a sequential merge when parallel sort has redistributed the data across partitions and one of + * the next operators requires merging the data. The local order property is not present, but ORDERED_PARTITIONED is + * present, and a sequential merge connector will be introduced instead of a random merge.
+ */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create type TestType as +{ + id: int, + f1: int +}; + +create dataset TestDS(TestType) primary key id; + +set `compiler.sort.parallel` "true"; + +[(select * from TestDS v order by v.f1, v.id)]; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan new file mode 100644 index 0000000..d3d1d85 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan @@ -0,0 +1,34 @@ +-- DISTRIBUTE_RESULT |UNPARTITIONED| + -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED| + -- STREAM_PROJECT |UNPARTITIONED| + -- ASSIGN |UNPARTITIONED| + -- AGGREGATE |UNPARTITIONED| + -- SEQUENTIAL_MERGE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$18(ASC), $$17(ASC)] |PARTITIONED| + -- RANGE_PARTITION_EXCHANGE [$$18(ASC), $$17(ASC)] |PARTITIONED| + -- FORWARD |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- REPLICATE |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- BROADCAST_EXCHANGE |PARTITIONED| + -- AGGREGATE |UNPARTITIONED| + -- RANDOM_MERGE_EXCHANGE |PARTITIONED| + -- AGGREGATE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- REPLICATE |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp index af54590..9f45ee3 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp @@ -19,7 +19,8 @@ /* * Description: testing a sequential merge when parallel sort has redistributed the data across partitions and one of - * the next operators requires merging the sorted data. + * the next operators requires merging the data. The local order property is not present, but ORDERED_PARTITIONED is + * present, and a sequential merge connector will be introduced instead of a random merge.
*/ drop dataverse test if exists; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index 4c58ad7..66c95ee 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@ -64,7 +64,7 @@ COMPILER_SORT_SAMPLES( INTEGER, AlgebricksConfig.SORT_SAMPLES, - "The number of samples parallel sorting should " + "take from each partition"); + "The number of samples which parallel sorting should take from each partition"); private final IOptionType type; private final Object defaultValue; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java index c967a94..d33a6f7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java @@ -118,12 +118,12 @@ @Override public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { - return new GlobalSamplingAggregateFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields); + return new RangeMapFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields); } }; } - private class GlobalSamplingAggregateFunction implements IAggregateEvaluator { + private class RangeMapFunction implements IAggregateEvaluator { private final IScalarEvaluator localSamplesEval; private final IPointable localSamples; private final List<List<byte[]>> finalSamples; @@ -138,8 +138,8 @@ private final ArrayBackedValueStorage storage; @SuppressWarnings("unchecked") - private GlobalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, - boolean[] ascending, int numOfPartitions, int numOrderByFields) throws HyracksDataException { + private RangeMapFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean[] ascending, + int numOfPartitions, int numOrderByFields) throws HyracksDataException { localSamples = new VoidPointable(); localSamplesEval = args[0].createScalarEvaluator(context); finalSamples = new ArrayList<>(); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java index df0b446..df9141b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import 
org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -31,7 +30,11 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; @@ -39,6 +42,9 @@ import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor; +/** + * A merging connector that merges the tuples sequentially from the partitions starting from the partition at index 0. + */ public class SequentialMergeExchangePOperator extends AbstractExchangePOperator { @Override public PhysicalOperatorTag getOperatorTag() { @@ -47,30 +53,75 @@ @Override public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { + IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) { return emptyUnaryRequirements(); } + /** + * <Pre> + * The local properties delivered by this connector are either: + * 1. nothing if the child doesn't deliver any special property + * 2. 
order property if the child is locally ordered and globally ordered on the same prefix + * + * The partitioning property is always UNPARTITIONED since it's a merging connector + * </Pre> + * @param op the logical operator of this physical operator + * @param context optimization context, not used here + */ @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) - throws AlgebricksException { + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); List<ILocalStructuralProperty> childLocalProps = childOp.getDeliveredPhysicalProperties().getLocalProperties(); - List<ILocalStructuralProperty> localProperties; - if (childLocalProps != null) { - localProperties = new ArrayList<>(childLocalProps); - } else { - localProperties = new ArrayList<>(0); + IPartitioningProperty childPartitioning = childOp.getDeliveredPhysicalProperties().getPartitioningProperty(); + List<ILocalStructuralProperty> outputLocalProp = new ArrayList<>(0); + if (childLocalProps != null && !childLocalProps.isEmpty() && childPartitioning != null + && childPartitioning.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) { + // the child could have a local order property that matches its global order property + propagateChildProperties((OrderedPartitionedProperty) childPartitioning, childLocalProps, outputLocalProp); } - deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProperties); + deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, outputLocalProp); } @Override public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> createConnectorDescriptor( - IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) - throws AlgebricksException { + IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) { IConnectorDescriptor connector = new MToOneSequentialMergingConnectorDescriptor(spec); return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE); } + + /** + * Matches prefix of the child's local order property & global order property. If a prefix is determined, the + * local order property is propagated through this connector. In essence, the connector says it maintains the + * order originally present in the child. + * @param childPartitioning the global ordering property of the child made by ORDERED_PARTITIONED partitioning + * @param childLocalProps the local properties inside the partitions + * @param outputLocalProp the local property of the connector that will be modified if propagating prop. 
happens + */ + private void propagateChildProperties(OrderedPartitionedProperty childPartitioning, + List<ILocalStructuralProperty> childLocalProps, List<ILocalStructuralProperty> outputLocalProp) { + ILocalStructuralProperty childLocalProp = childLocalProps.get(0); + // skip if the first property is a grouping property + if (childLocalProp.getPropertyType() == ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY) { + OrderColumn localOrderColumn; + List<OrderColumn> outputOrderColumns = new ArrayList<>(); + List<OrderColumn> globalOrderColumns = childPartitioning.getOrderColumns(); + List<OrderColumn> localOrderColumns = ((LocalOrderProperty) childLocalProp).getOrderColumns(); + // start matching the order columns + for (int i = 0; i < localOrderColumns.size() && i < globalOrderColumns.size(); i++) { + localOrderColumn = localOrderColumns.get(i); + if (localOrderColumn.equals(globalOrderColumns.get(i))) { + outputOrderColumns.add(localOrderColumn); + } else { + // stop whenever the matching fails, end of prefix matching + break; + } + } + + if (!outputOrderColumns.isEmpty()) { + // found a prefix + outputLocalProp.add(new LocalOrderProperty(outputOrderColumns)); + } + } + } } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 52b3f59..96e2e53 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -167,60 +167,6 @@ return changed; } - private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan, - IOptimizationContext context) throws AlgebricksException { - boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled(); - boolean changed = false; - for (Mutable<ILogicalOperator> root : plan.getRoots()) { - if (physOptimizeOp(root, pvector, nestedPlan, context)) { - changed = true; - } - AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue(); - op.computeDeliveredPhysicalProperties(context); - if (loggerTraceEnabled) { - AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural properties for " + op.getPhysicalOperator() - + ": " + op.getDeliveredPhysicalProperties() + "\n"); - } - } - return changed; - } - - // Gets the index of a child to start top-down data property enforcement. - // If there is a partitioning-compatible child with the operator in opRef, - // start from this child; otherwise, start from child zero. 
- private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan, - IOptimizationContext context) throws AlgebricksException { - IPhysicalPropertiesVector[] reqdProperties = null; - if (pr != null) { - reqdProperties = pr.getRequiredProperties(); - } - - List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>(); - for (Mutable<ILogicalOperator> childRef : op.getInputs()) { - AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); - deliveredPartitioningPropertiesFromChildren - .add(child.getDeliveredPhysicalProperties().getPartitioningProperty()); - } - int partitioningCompatibleChild = 0; - for (int i = 0; i < op.getInputs().size(); i++) { - IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i); - if (reqdProperties == null || reqdProperties[i] == null - || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null - || reqdProperties[i].getPartitioningProperty() - .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i) - .getPartitioningType()) { - continue; - } - IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty(); - // If child i's delivered partitioning property already satisfies the required property, stop and return the child index. - if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) { - partitioningCompatibleChild = i; - break; - } - } - return partitioningCompatibleChild; - } - private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required, boolean nestedPlan, IOptimizationContext context) throws AlgebricksException { @@ -357,6 +303,60 @@ physOptimizeOp(opRef, required, nestedPlan, context); } return changed; + } + + private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan, + IOptimizationContext context) throws AlgebricksException { + boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled(); + boolean changed = false; + for (Mutable<ILogicalOperator> root : plan.getRoots()) { + if (physOptimizeOp(root, pvector, nestedPlan, context)) { + changed = true; + } + AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue(); + op.computeDeliveredPhysicalProperties(context); + if (loggerTraceEnabled) { + AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural properties for " + op.getPhysicalOperator() + + ": " + op.getDeliveredPhysicalProperties() + "\n"); + } + } + return changed; + } + + // Gets the index of a child to start top-down data property enforcement. + // If there is a partitioning-compatible child with the operator in opRef, + // start from this child; otherwise, start from child zero. 
+ private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan, + IOptimizationContext context) throws AlgebricksException { + IPhysicalPropertiesVector[] reqdProperties = null; + if (pr != null) { + reqdProperties = pr.getRequiredProperties(); + } + + List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>(); + for (Mutable<ILogicalOperator> childRef : op.getInputs()) { + AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); + deliveredPartitioningPropertiesFromChildren + .add(child.getDeliveredPhysicalProperties().getPartitioningProperty()); + } + int partitioningCompatibleChild = 0; + for (int i = 0; i < op.getInputs().size(); i++) { + IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i); + if (reqdProperties == null || reqdProperties[i] == null + || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null + || reqdProperties[i].getPartitioningProperty() + .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i) + .getPartitioningType()) { + continue; + } + IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty(); + // If child i's delivered partitioning property already satisfies the required property, stop and return the child index. + if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) { + partitioningCompatibleChild = i; + break; + } + } + return partitioningCompatibleChild; } private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild, @@ -888,13 +888,13 @@ return forwardOperator; } - private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) { - for (ILocalStructuralProperty lsp : cldLocals) { - if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) { + private boolean allAreOrderProps(List<ILocalStructuralProperty> childLocalProperties) { + for (ILocalStructuralProperty childLocalProperty : childLocalProperties) { + if (childLocalProperty.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) { return false; } } - return !cldLocals.isEmpty(); + return !childLocalProperties.isEmpty(); } private void printOp(AbstractLogicalOperator op) throws AlgebricksException { @@ -927,7 +927,7 @@ throws AlgebricksException { ILogicalOperator oldOp = opRef.getValue(); opRef.setValue(newOp); - newOp.getInputs().add(new MutableObject<ILogicalOperator>(oldOp)); + newOp.getInputs().add(new MutableObject<>(oldOp)); newOp.recomputeSchema(); newOp.computeDeliveredPhysicalProperties(context); context.computeAndSetTypeEnvironmentForOperator(newOp); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java index c437619..0b1c0a2 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java @@ -69,7 +69,7 @@ } } - private synchronized boolean allPartitionsAdded() { + private boolean allPartitionsAdded() { for (int i = 0; i < partitions.length; i++) { if (partitions[i] == null) { return 
false; diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java index 2646c94..6aa305b 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.comm.IFrameReader; import org.apache.hyracks.api.exceptions.HyracksDataException; +// TODO(ali): consider sort-concat-merge as an alternative. public class SequentialMergeFrameReader implements IFrameReader { private final int numSenders; private final IPartitionBatchManager partitionBatchManager; -- To view, visit https://asterix-gerrit.ics.uci.edu/3005 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie37e03b6fc6e55fc21f8324c0f09a7fa05b51769 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]>
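Illustrative note (not part of the merged change): the sketch below shows, with simplified placeholder types rather than the actual Algebricks classes (OrderColumn, LocalOrderProperty, OrderedPartitionedProperty), the prefix matching that propagateChildProperties in SequentialMergeExchangePOperator performs between the child's local order columns and the columns of its ORDERED_PARTITIONED partitioning property; only the matched prefix is delivered as the connector's local order property.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public final class OrderPrefixSketch {

    // Simplified stand-in for an Algebricks OrderColumn: a column name plus sort direction.
    record OrderKey(String column, boolean ascending) { }

    // Longest common prefix of the child's local order columns and the global
    // (ORDERED_PARTITIONED) order columns; an empty result means the sequential
    // merge connector delivers no local order property.
    static List<OrderKey> commonOrderPrefix(List<OrderKey> localOrder, List<OrderKey> globalOrder) {
        List<OrderKey> prefix = new ArrayList<>();
        for (int i = 0; i < localOrder.size() && i < globalOrder.size(); i++) {
            if (!Objects.equals(localOrder.get(i), globalOrder.get(i))) {
                break; // stop at the first mismatch: only the leading matched columns are kept
            }
            prefix.add(localOrder.get(i));
        }
        return prefix;
    }

    public static void main(String[] args) {
        // Mirrors the p_sort_seq_merge test: each partition is sorted on (f1 ASC, id ASC)
        // and the data is range-partitioned on the same columns.
        List<OrderKey> local = List.of(new OrderKey("f1", true), new OrderKey("id", true));
        List<OrderKey> global = List.of(new OrderKey("f1", true), new OrderKey("id", true));
        // Full match, so the sequentially merged output remains ordered on (f1 ASC, id ASC).
        System.out.println(commonOrderPrefix(local, global));
    }
}

Since the connector pulls whole partitions one after another starting from partition 0, any order that holds both inside each partition and across the partition ranges (the matched prefix above) still holds in the merged stream; when nothing matches, the connector delivers UNPARTITIONED output with no local properties.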