working mega overlapping plan.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0f533e8b Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0f533e8b Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0f533e8b Branch: refs/heads/ecarm002/interval_join_merge Commit: 0f533e8bbbac7cf633d87013d71370114970c959 Parents: aea7fe8 Author: Preston Carman <prest...@apache.org> Authored: Tue Jul 5 17:56:26 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Tue Jul 5 17:56:26 2016 -0700 ---------------------------------------------------------------------- .../IntervalLocalRangeSplitterOperator.java | 2 +- .../IntervalLocalRangeOperatorDescriptor.java | 13 +- .../IntervalLocalRangeSplitterPOperator.java | 38 ++-- .../asterix/optimizer/base/RuleCollections.java | 1 + .../rules/IntervalSplitPartitioningRule.java | 201 ++++++++++++------- .../physical/AbstractExchangePOperator.java | 1 + .../physical/OneToOneExchangePOperator.java | 5 +- .../algebricks/core/jobgen/impl/JobBuilder.java | 28 +-- 8 files changed, 173 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java index 9ae9f7d..181e0fa 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java @@ -54,7 +54,7 @@ public class IntervalLocalRangeSplitterOperator extends AbstractExtensibleLogica @Override public String toString() { - return "IntervalLocalRangeSplitterOperator"; + return "IntervalLocalRangeSplitterOperator " + joinKeyLogicalVars; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java index 392bf43..ca44f78 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java @@ -45,20 +45,21 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri private static final long serialVersionUID = 1L; private static final int PARTITION_ACTIVITY_ID = 0; + private static final int OUTPUT_ARITY = 3; + private static final int INPUT_STARTS = 0; private static final int INPUT_COVERS = 2; private static final int INPUT_ENDS = 1; -// private static final int INPUT_STARTS = 0; -// private static final int INPUT_COVERS = 0; -// private static final int INPUT_ENDS = 0; - private final int key; private final IRangeMap rangeMap; public IntervalLocalRangeOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys, RecordDescriptor recordDescriptor, IRangeMap rangeMap) { - super(spec, 1, 3); + super(spec, 1, OUTPUT_ARITY); + for (int i = 0; i < outputArity; i++) { + recordDescriptors[i] = recordDescriptor; + } key = keys[0]; this.rangeMap = rangeMap; } @@ -105,7 +106,7 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri @Override public void flush() throws HyracksDataException { for (int i = 0; i < getOutputArity(); i++) { - resultAppender[i].flush(writers[i]); + FrameUtils.flushFrame(resultAppender[i].getBuffer(), writers[i]); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java index 1150b91..ced06aa 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java @@ -21,6 +21,7 @@ package org.apache.asterix.algebra.operators.physical; import java.util.List; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -28,6 +29,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; @@ -55,9 +57,13 @@ public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperato } @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); + public String toString() { + return "IntervalLocalRangeSplitterPOperator " + intervalFields; + } + + @Override + public boolean isMicroOperator() { + return false; } @Override @@ -67,35 +73,37 @@ public class IntervalLocalRangeSplitterPOperator extends AbstractPhysicalOperato } @Override - public String toString() { - return "IntervalLocalRangeSplitterPOperator " + intervalFields; + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone(); } @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, - IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { - int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]); - IOperatorDescriptorRegistry spec = builder.getJobSpec(); - RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, - context); + RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), + propagatedSchema, context); - IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recordDescriptor, rangeMap); + int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, inputSchemas[0]); + + IOperatorDescriptor opDesc = new IntervalLocalRangeOperatorDescriptor(spec, keys, recDescriptor, rangeMap); contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); - // and contribute one edge from its child ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); } @Override - public boolean isMicroOperator() { - return false; + public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { + ReplicateOperator rop = (ReplicateOperator) op; + int[] inputDependencyLabels = new int[] { 0 }; + int[] outputDependencyLabels = new int[rop.getOutputArity()]; // filled with 0's + return new Pair<>(inputDependencyLabels, outputDependencyLabels); } @Override public boolean expensiveThanMaterialization() { return false; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java index 5fcfc94..9a3a125 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java @@ -342,4 +342,5 @@ public final class RuleCollections { prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions()); return prepareForJobGenRewrites; } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java index a6f49f9..2772e68 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java @@ -51,12 +51,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogi import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator; @@ -122,17 +124,19 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { ILogicalOperator op = opRef.getValue(); + if (context.checkIfInDontApplySet(this, op)) { + return false; + } if (!isIntervalJoin(op)) { return false; } - InnerJoinOperator startsJoin = (InnerJoinOperator) op; - ExecutionMode mode = startsJoin.getExecutionMode(); - Mutable<ILogicalOperator> startsJoinRef = opRef; + InnerJoinOperator originalIntervalJoin = (InnerJoinOperator) op; + ExecutionMode mode = originalIntervalJoin.getExecutionMode(); Set<LogicalVariable> localLiveVars = new ListSet<>(); - VariableUtilities.getLiveVariables(op, localLiveVars); + VariableUtilities.getLiveVariables(originalIntervalJoin, localLiveVars); - Mutable<ILogicalOperator> leftSortedInput = op.getInputs().get(0); - Mutable<ILogicalOperator> rightSortedInput = op.getInputs().get(1); + Mutable<ILogicalOperator> leftSortedInput = originalIntervalJoin.getInputs().get(0); + Mutable<ILogicalOperator> rightSortedInput = originalIntervalJoin.getInputs().get(1); if (leftSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE && rightSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { return false; @@ -160,14 +164,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { // TODO check physical join // Interval local partition operators - LogicalVariable leftJoinKey = getJoinKey(startsJoin.getCondition().getValue(), LEFT); - LogicalVariable rightJoinKey = getJoinKey(startsJoin.getCondition().getValue(), RIGHT); + LogicalVariable leftJoinKey = getJoinKey(originalIntervalJoin.getCondition().getValue(), LEFT); + LogicalVariable rightJoinKey = getJoinKey(originalIntervalJoin.getCondition().getValue(), RIGHT); if (leftJoinKey == null || rightJoinKey == null) { return false; } - ILogicalOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, rightRangeMap, mode); + ReplicateOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, leftRangeMap, mode); Mutable<ILogicalOperator> leftIntervalSplitRef = new MutableObject<>(leftIntervalSplit); - ILogicalOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode); + ReplicateOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode); Mutable<ILogicalOperator> rightIntervalSplitRef = new MutableObject<>(rightIntervalSplit); // Replicate operators @@ -177,20 +181,36 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { Mutable<ILogicalOperator> rightStartsSplitRef = new MutableObject<>(rightStartsSplit); // Covers Join Operator - ILogicalOperator leftCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode); + ILogicalOperator leftCoversJoin = getNestedLoop(originalIntervalJoin.getCondition(), context, mode); Mutable<ILogicalOperator> leftCoversJoinRef = new MutableObject<>(leftCoversJoin); - ILogicalOperator rightCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode); + ILogicalOperator rightCoversJoin = getNestedLoop(originalIntervalJoin.getCondition(), context, mode); Mutable<ILogicalOperator> rightCoversJoinRef = new MutableObject<>(rightCoversJoin); // Ends Join Operator - ILogicalOperator leftEndsJoin = getIntervalJoin(startsJoin, context, mode); - ILogicalOperator rightEndsJoin = getIntervalJoin(startsJoin, context, mode); - if (leftEndsJoin == null || rightEndsJoin == null) { + ILogicalOperator startsJoin = getIntervalJoin(originalIntervalJoin, context, mode); + ILogicalOperator leftEndsJoin = getIntervalJoin(originalIntervalJoin, context, mode); + ILogicalOperator rightEndsJoin = getIntervalJoin(originalIntervalJoin, context, mode); + if (startsJoin == null || leftEndsJoin == null || rightEndsJoin == null) { return false; } + Mutable<ILogicalOperator> startsJoinRef = new MutableObject<>(startsJoin); Mutable<ILogicalOperator> leftEndsJoinRef = new MutableObject<>(leftEndsJoin); Mutable<ILogicalOperator> rightEndsJoinRef = new MutableObject<>(rightEndsJoin); + // Materialize Operator + ILogicalOperator leftMaterialize0 = getMaterializeOperator(mode); + Mutable<ILogicalOperator> leftMaterialize0Ref = new MutableObject<>(leftMaterialize0); + ILogicalOperator leftMaterialize1 = getMaterializeOperator(mode); + Mutable<ILogicalOperator> leftMaterialize1Ref = new MutableObject<>(leftMaterialize1); + ILogicalOperator leftMaterialize2 = getMaterializeOperator(mode); + Mutable<ILogicalOperator> leftMaterialize2Ref = new MutableObject<>(leftMaterialize2); + ILogicalOperator rightMaterialize0 = getMaterializeOperator(mode); + Mutable<ILogicalOperator> rightMaterialize0Ref = new MutableObject<>(rightMaterialize0); + ILogicalOperator rightMaterialize1 = getMaterializeOperator(mode); + Mutable<ILogicalOperator> rightMaterialize1Ref = new MutableObject<>(rightMaterialize1); + ILogicalOperator rightMaterialize2 = getMaterializeOperator(mode); + Mutable<ILogicalOperator> rightMaterialize2Ref = new MutableObject<>(rightMaterialize2); + // Union All Operator ILogicalOperator union1 = getUnionOperator(localLiveVars, mode); Mutable<ILogicalOperator> union1Ref = new MutableObject<>(union1); @@ -201,59 +221,84 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { ILogicalOperator union4 = getUnionOperator(localLiveVars, mode); Mutable<ILogicalOperator> union4Ref = new MutableObject<>(union4); + // Remove old path + originalIntervalJoin.getInputs().clear(); + // Connect main path connectOperators(leftIntervalSplitRef, leftSortedInput, context); - context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplit); - connectOperators(leftStartsSplitRef, leftIntervalSplitRef, context); - context.computeAndSetTypeEnvironmentForOperator(leftStartsSplit); + context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplitRef.getValue()); + connectOperators(leftMaterialize0Ref, leftIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftMaterialize0Ref.getValue()); + connectOperators(leftMaterialize1Ref, leftIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftMaterialize1Ref.getValue()); + connectOperators(leftMaterialize2Ref, leftIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftMaterialize2Ref.getValue()); + + connectOperators(leftStartsSplitRef, leftMaterialize0Ref, context); + context.computeAndSetTypeEnvironmentForOperator(leftStartsSplitRef.getValue()); + connectOperators(rightIntervalSplitRef, rightSortedInput, context); - context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplit); - connectOperators(rightStartsSplitRef, rightIntervalSplitRef, context); - context.computeAndSetTypeEnvironmentForOperator(rightStartsSplit); - updateConnections(startsJoinRef, leftStartsSplitRef, context, LEFT); - updateConnections(startsJoinRef, rightStartsSplitRef, context, RIGHT); - context.computeAndSetTypeEnvironmentForOperator(startsJoin); - leftStartsSplit.getOutputs().add(startsJoinRef); - rightStartsSplit.getOutputs().add(startsJoinRef); + context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplitRef.getValue()); + connectOperators(rightMaterialize0Ref, rightIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(rightMaterialize0Ref.getValue()); + connectOperators(rightMaterialize1Ref, rightIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(rightMaterialize1Ref.getValue()); + connectOperators(rightMaterialize2Ref, rightIntervalSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(rightMaterialize2Ref.getValue()); + + connectOperators(rightStartsSplitRef, rightMaterialize0Ref, context); + context.computeAndSetTypeEnvironmentForOperator(rightStartsSplitRef.getValue()); + + // Connect left and right starts path + connectOperators(startsJoinRef, leftStartsSplitRef, context); + connectOperators(startsJoinRef, rightStartsSplitRef, context); + context.computeAndSetTypeEnvironmentForOperator(startsJoinRef.getValue()); // Connect left ends path - connectOperators(leftEndsJoinRef, leftIntervalSplitRef, context); + connectOperators(leftEndsJoinRef, leftMaterialize1Ref, context); connectOperators(leftEndsJoinRef, rightStartsSplitRef, context); - context.computeAndSetTypeEnvironmentForOperator(leftEndsJoin); - connectOperators(union1Ref, leftEndsJoinRef, context); + context.computeAndSetTypeEnvironmentForOperator(leftEndsJoinRef.getValue()); connectOperators(union1Ref, startsJoinRef, context); - context.computeAndSetTypeEnvironmentForOperator(union1); - rightStartsSplit.getOutputs().add(leftEndsJoinRef); + connectOperators(union1Ref, leftEndsJoinRef, context); + context.computeAndSetTypeEnvironmentForOperator(union1Ref.getValue()); // Connect left covers path - connectOperators(leftCoversJoinRef, leftIntervalSplitRef, context); + connectOperators(leftCoversJoinRef, leftMaterialize2Ref, context); connectOperators(leftCoversJoinRef, rightStartsSplitRef, context); - context.computeAndSetTypeEnvironmentForOperator(leftCoversJoin); + context.computeAndSetTypeEnvironmentForOperator(leftCoversJoinRef.getValue()); connectOperators(union2Ref, union1Ref, context); connectOperators(union2Ref, leftCoversJoinRef, context); - context.computeAndSetTypeEnvironmentForOperator(union2); - rightStartsSplit.getOutputs().add(leftCoversJoinRef); + context.computeAndSetTypeEnvironmentForOperator(union2Ref.getValue()); // Connect right ends path connectOperators(rightEndsJoinRef, leftStartsSplitRef, context); - connectOperators(rightEndsJoinRef, rightIntervalSplitRef, context); - context.computeAndSetTypeEnvironmentForOperator(rightEndsJoin); + connectOperators(rightEndsJoinRef, rightMaterialize1Ref, context); + context.computeAndSetTypeEnvironmentForOperator(rightEndsJoinRef.getValue()); connectOperators(union3Ref, union2Ref, context); connectOperators(union3Ref, rightEndsJoinRef, context); - context.computeAndSetTypeEnvironmentForOperator(union3); - leftStartsSplit.getOutputs().add(rightEndsJoinRef); + context.computeAndSetTypeEnvironmentForOperator(union3Ref.getValue()); // Connect right covers path connectOperators(rightCoversJoinRef, leftStartsSplitRef, context); - connectOperators(rightCoversJoinRef, rightIntervalSplitRef, context); - context.computeAndSetTypeEnvironmentForOperator(rightCoversJoin); + connectOperators(rightCoversJoinRef, rightMaterialize2Ref, context); + context.computeAndSetTypeEnvironmentForOperator(rightCoversJoinRef.getValue()); connectOperators(union4Ref, union3Ref, context); connectOperators(union4Ref, rightCoversJoinRef, context); - context.computeAndSetTypeEnvironmentForOperator(union4); - leftStartsSplit.getOutputs().add(rightCoversJoinRef); + context.computeAndSetTypeEnvironmentForOperator(union4Ref.getValue()); // Update context - opRef.setValue(union4); + opRef.setValue(union4Ref.getValue()); + + context.addToDontApplySet(this, startsJoin); + context.addToDontApplySet(this, leftCoversJoin); + context.addToDontApplySet(this, rightCoversJoin); + context.addToDontApplySet(this, leftCoversJoin); + context.addToDontApplySet(this, rightEndsJoin); + + context.addToDontApplySet(this, union1); + context.addToDontApplySet(this, union2); + context.addToDontApplySet(this, union3); + context.addToDontApplySet(this, union4); return true; } @@ -274,7 +319,7 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { return null; } - // Check whether the function is a function we want to push. + // Check whether the function is a function we want to alter. AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr; if (!intervalJoinConditions.contains(funcExpr.getFunctionIdentifier())) { return null; @@ -286,33 +331,26 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { return null; } - private void connectOperators(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to, + private void connectOperators(Mutable<ILogicalOperator> child, Mutable<ILogicalOperator> parent, IOptimizationContext context) throws AlgebricksException { - if (to.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { - ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode()); + if (parent.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { + ILogicalOperator eo = getExchangeOperator(child.getValue().getExecutionMode()); Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo); - eo.getInputs().add(to); - from.getValue().getInputs().add(eoRef); - context.computeAndSetTypeEnvironmentForOperator(eo); - context.computeAndSetTypeEnvironmentForOperator(from.getValue()); - } else { - from.getValue().getInputs().add(to); - context.computeAndSetTypeEnvironmentForOperator(from.getValue()); - } - } - - private void updateConnections(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to, - IOptimizationContext context, int index) throws AlgebricksException { - if (from.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) { - ILogicalOperator eo = getExchangeOperator(from.getValue().getExecutionMode()); - Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo); - eo.getInputs().add(to); - from.getValue().getInputs().set(index, eoRef); - context.computeAndSetTypeEnvironmentForOperator(from.getValue()); + eo.getInputs().add(parent); + if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { + ReplicateOperator ro = (ReplicateOperator) parent.getValue(); + ro.getOutputs().add(eoRef); + } + child.getValue().getInputs().add(eoRef); context.computeAndSetTypeEnvironmentForOperator(eo); + context.computeAndSetTypeEnvironmentForOperator(child.getValue()); } else { - from.getValue().getInputs().set(index, to); - context.computeAndSetTypeEnvironmentForOperator(from.getValue()); + if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) { + ReplicateOperator ro = (ReplicateOperator) parent.getValue(); + ro.getOutputs().add(child); + } + child.getValue().getInputs().add(parent); + context.computeAndSetTypeEnvironmentForOperator(child.getValue()); } } @@ -323,25 +361,28 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { return eo; } - private ILogicalOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) { + private ReplicateOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) { List<LogicalVariable> joinKeyLogicalVars = new ArrayList<>(); joinKeyLogicalVars.add(key); //create the logical and physical operator - IntervalLocalRangeSplitterOperator splitOperator = new IntervalLocalRangeSplitterOperator(joinKeyLogicalVars); + boolean[] flags = new boolean[2]; + for (int i = 0; i < flags.length; ++i) { + flags[i] = true; + } + ReplicateOperator splitOperator = new ReplicateOperator(flags.length, flags); + // ReplicatePOperator splitPOperator = new ReplicatePOperator(); IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars, rangeMap); splitOperator.setPhysicalOperator(splitPOperator); splitOperator.setExecutionMode(mode); - - //create ExtensionOperator and put the commitOperator in it. - ExtensionOperator extensionOperator = new ExtensionOperator(splitOperator); - extensionOperator.setPhysicalOperator(splitPOperator); - extensionOperator.setExecutionMode(mode); - return extensionOperator; + return splitOperator; } private ReplicateOperator getReplicateOperator(int outputArity, ExecutionMode mode) { boolean[] flags = new boolean[outputArity]; + for (int i = 0; i < flags.length; ++i) { + flags[i] = true; + } ReplicateOperator ro = new ReplicateOperator(flags.length, flags); ReplicatePOperator rpo = new ReplicatePOperator(); ro.setPhysicalOperator(rpo); @@ -349,6 +390,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule { return ro; } + private ILogicalOperator getMaterializeOperator(ExecutionMode mode) { + MaterializeOperator mo = new MaterializeOperator(); + MaterializePOperator mpo = new MaterializePOperator(false); + mo.setPhysicalOperator(mpo); + mo.setExecutionMode(mode); + return mo; + } + private ILogicalOperator getNestedLoop(Mutable<ILogicalExpression> condition, IOptimizationContext context, ExecutionMode mode) { int memoryJoinSize = context.getPhysicalOptimizationConfig().getMaxFramesForJoin(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java index aad0cf3..799a6af 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator { + @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java index 818e1ec..083e4d3 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java @@ -35,9 +35,6 @@ import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; public class OneToOneExchangePOperator extends AbstractExchangePOperator { - public OneToOneExchangePOperator() { - } - @Override public PhysicalOperatorTag getOperatorTag() { return PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE; @@ -59,7 +56,7 @@ public class OneToOneExchangePOperator extends AbstractExchangePOperator { public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) { IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec); - return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.SAME_COUNT); + return new Pair<>(conn, TargetConstraint.SAME_COUNT); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0f533e8b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java index af40250..a1c6164 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java @@ -49,19 +49,19 @@ public class JobBuilder implements IHyracksJobBuilder { private final AlgebricksAbsolutePartitionConstraint clusterLocations; private final AlgebricksAbsolutePartitionConstraint countOneLocation; - private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>(); - private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>(); - private final Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>>(); + private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<>(); + private final Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<>(); + private final Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<>(); - private final Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>>(); - private final Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<IPushRuntimeFactory, ILogicalOperator>(); - private final Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<ILogicalOperator, IOperatorDescriptor>(); - private final Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<ILogicalOperator, AlgebricksPartitionConstraint>(); + private final Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<>(); + private final Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<>(); + private final Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<>(); + private final Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<>(); - private final Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<ILogicalOperator, Integer>(); - private final Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>>(); - private final Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<Integer, AlgebricksMetaOperatorDescriptor>(); - private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<IOperatorDescriptor, AlgebricksPartitionConstraint>(); + private final Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<>(); + private final Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<>(); + private final Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<>(); + private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<>(); private int aodCounter = 0; @@ -123,14 +123,14 @@ public class JobBuilder implements IHyracksJobBuilder { int destInputIndex) { ArrayList<ILogicalOperator> outputs = outEdges.get(src); if (outputs == null) { - outputs = new ArrayList<ILogicalOperator>(); + outputs = new ArrayList<>(); outEdges.put(src, outputs); } addAtPos(outputs, dest, srcOutputIndex); ArrayList<ILogicalOperator> inp = inEdges.get(dest); if (inp == null) { - inp = new ArrayList<ILogicalOperator>(); + inp = new ArrayList<>(); inEdges.put(dest, inp); } addAtPos(inp, src, destInputIndex); @@ -270,7 +270,7 @@ public class JobBuilder implements IHyracksJobBuilder { } private Map<IConnectorDescriptor, TargetConstraint> setupConnectors() throws AlgebricksException { - Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<IConnectorDescriptor, TargetConstraint>(); + Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<>(); for (ILogicalOperator exchg : connectors.keySet()) { ILogicalOperator inOp = inEdges.get(exchg).get(0); ILogicalOperator outOp = outEdges.get(exchg).get(0);