http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index d6cd363..9ca536b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -82,7 +82,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { super(ctx, partition, status, locks, leftRd, rightRd); this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT; - this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition); + this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, null); this.leftKey = leftKeys[0]; this.rightKey = rightKeys[0];
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index 0dd358c..8c4c43d 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -37,12 +37,13 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractStateObject; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.RangeId; +import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState; public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor { private static final long serialVersionUID = 1L; @@ -65,14 +66,14 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes private final int probeKey; private final int buildKey; private final IIntervalMergeJoinCheckerFactory imjcf; - private final IRangeMap rangeMap; + private final RangeId rangeId; private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName()); public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount, long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf, - IRangeMap rangeMap) { + RangeId rangeId) { super(spec, 2, 1); this.memsize = memsize; this.buildKey = leftKeys[0]; @@ -86,7 +87,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes this.probeKeys = rightKeys; recordDescriptors[0] = recordDescriptor; this.imjcf = imjcf; - this.rangeMap = rangeMap; + this.rangeId = rangeId; } @Override @@ -137,8 +138,6 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0); final int k = IntervalPartitionUtil.determineK(buildTupleCount, buildMaxDuration, probeTupleCount, probeMaxDuration, avgTuplesPerFrame); - final long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeMap, partition); - final long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeMap, partition); return new AbstractUnaryInputSinkOperatorNodePushable() { private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState( @@ -160,6 +159,10 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes + ") with 3."); } } + + RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId); + long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), partition); + long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition); ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, k, partitionStart, partitionEnd).createPartitioner(); ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, k, partitionStart, @@ -168,7 +171,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes state.partition = partition; state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k); state.memoryForJoin = memsize; - IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition); + IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, rangeState.getRangeMap()); state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions, BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java index 4c80ba8..415feae 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java @@ -28,8 +28,8 @@ import java.util.Map.Entry; import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.data.std.primitive.LongPointable; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; public class IntervalPartitionUtil { public static final double C_CPU = 0.5; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java index 328697d..dfd36a2 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java @@ -33,7 +33,7 @@ import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -public interface IOptimizationContext extends ITypingContext, IVariableContext { +public interface IOptimizationContext extends ITypingContext, IVariableContext, IRangeContext { @Override public IMetadataProvider<?, ?> getMetadataProvider(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java new file mode 100644 index 0000000..9a70b7a --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.base; + +import org.apache.hyracks.dataflow.std.base.RangeId; + +public interface IRangeContext { + + public RangeId newRangeId(); + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java index 5a3bc98..58c7a80 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java @@ -28,16 +28,23 @@ import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagation import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.dataflow.std.base.RangeId; public class RangeForwardOperator extends AbstractLogicalOperator { + private RangeId rangeId; private IRangeMap rangeMap; - public RangeForwardOperator(IRangeMap rangeMap) { + public RangeForwardOperator(RangeId rangeId, IRangeMap rangeMap) { + this.rangeId = rangeId; this.rangeMap = rangeMap; } + public RangeId getRangeId() { + return rangeId; + } + public IRangeMap getRangeMap() { return rangeMap; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index 8cb739a..7a5a89a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -410,7 +410,7 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, ILogicalOperator arg) throws AlgebricksException { // TODO fix deep copy of range map - RangeForwardOperator opCopy = new RangeForwardOperator(op.getRangeMap()); + RangeForwardOperator opCopy = new RangeForwardOperator(op.getRangeId(), op.getRangeMap()); deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); return opCopy; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index 6337187..fea0431 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -390,7 +390,7 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical @Override public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException { - return new RangeForwardOperator(op.getRangeMap()); + return new RangeForwardOperator(op.getRangeId(), op.getRangeMap()); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java index 34c707b..91dba24 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java @@ -100,7 +100,7 @@ public class HashPartitionExchangePOperator extends AbstractExchangePOperator { } ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories); IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf); - return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null); + return new Pair<>(conn, null); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java index 51f54f6..c24f3c8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java @@ -44,9 +44,10 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory; import org.apache.hyracks.dataflow.std.join.MergeJoinOperatorDescriptor; @@ -56,23 +57,28 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { protected final List<LogicalVariable> keysLeftBranch; protected final List<LogicalVariable> keysRightBranch; private final IMergeJoinCheckerFactory mjcf; - private IRangeMap rangeMap; + private final RangeId leftRangeId; + private final RangeId rightRangeId; + private final IRangeMap rangeMapHint; private static final Logger LOGGER = Logger.getLogger(MergeJoinPOperator.class.getName()); public MergeJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, List<LogicalVariable> sideLeft, - List<LogicalVariable> sideRight, int memSizeInFrames, IMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) { + List<LogicalVariable> sideRight, int memSizeInFrames, IMergeJoinCheckerFactory mjcf, RangeId leftRangeId, + RangeId rightRangeId, IRangeMap rangeMapHint) { super(kind, partitioningType); this.memSizeInFrames = memSizeInFrames; this.keysLeftBranch = sideLeft; this.keysRightBranch = sideRight; this.mjcf = mjcf; - this.rangeMap = rangeMap; + this.leftRangeId = leftRangeId; + this.rightRangeId = rightRangeId; + this.rangeMapHint = rangeMapHint; LOGGER.fine("MergeJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType=" + partitioningType + ", List<LogicalVariable>=" + keysLeftBranch + ", List<LogicalVariable>=" + keysRightBranch + ", int memSizeInFrames=" + memSizeInFrames + ", IMergeJoinCheckerFactory mjcf=" - + mjcf + ", IRangeMap rangeMap=" + rangeMap + "."); + + mjcf + ", RangeId leftRangeId=" + leftRangeId + ", RangeId rightRangeId=" + rightRangeId + "."); } public List<LogicalVariable> getKeysLeftBranch() { @@ -87,8 +93,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { return mjcf; } - public IRangeMap getRangeMap() { - return rangeMap; + public RangeId getRangeId() { + return leftRangeId; } @Override @@ -107,7 +113,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { for (LogicalVariable v : keysLeftBranch) { order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC)); } - IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, rangeMap, RangePartitioningType.PROJECT); + IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT, + rangeMapHint); List<ILocalStructuralProperty> propsLocal = new ArrayList<>(); propsLocal.add(new LocalOrderProperty(order)); deliveredProperties = new StructuralPropertiesVector(pp, propsLocal); @@ -137,8 +144,10 @@ public class MergeJoinPOperator extends AbstractJoinPOperator { ispRight.add(new LocalOrderProperty(orderRight)); if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { - ppLeft = new OrderedPartitionedProperty(orderLeft, null, rangeMap, mjcf.getLeftPartitioningType()); - ppRight = new OrderedPartitionedProperty(orderRight, null, rangeMap, mjcf.getRightPartitioningType()); + ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(), + rangeMapHint); + ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(), + rangeMapHint); } pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java index 2324c5e..3ace793 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java @@ -29,18 +29,28 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor; public class RangeForwardPOperator extends AbstractPhysicalOperator { + private RangeId rangeId; private IRangeMap rangeMap; - public RangeForwardPOperator(IRangeMap rangeMap) { + public RangeForwardPOperator(RangeId rangeId, IRangeMap rangeMap) { + // Use when a range hint is provided. + this.rangeId = rangeId; this.rangeMap = rangeMap; } + public RangeForwardPOperator(RangeId rangeId) { + this(rangeId, null); + } + public IRangeMap getRangeMap() { return rangeMap; } @@ -72,7 +82,10 @@ public class RangeForwardPOperator extends AbstractPhysicalOperator { IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { IOperatorDescriptorRegistry spec = builder.getJobSpec(); - RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeMap); + RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, + context); + RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeId, rangeMap, + recordDescriptor); contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); @@ -82,4 +95,10 @@ public class RangeForwardPOperator extends AbstractPhysicalOperator { public boolean expensiveThanMaterialization() { return false; } + + @Override + public String toString() { + return getOperatorTag().toString() + " " + rangeId; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java index 8299f61..c58bb0c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java @@ -48,25 +48,25 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitioningConnectorDescriptor; public class RangePartitionExchangePOperator extends AbstractExchangePOperator { private List<OrderColumn> partitioningFields; private INodeDomain domain; - private IRangeMap rangeMap; + private RangeId rangeId; private RangePartitioningType rangeType; - public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap, + public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, RangeId rangeId, RangePartitioningType rangeType) { this.partitioningFields = partitioningFields; this.domain = domain; - this.rangeMap = rangeMap; + this.rangeId = rangeId; this.rangeType = rangeType; } @@ -83,8 +83,8 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator { return rangeType; } - public IRangeMap getRangeMap() { - return rangeMap; + public RangeId getRangeId() { + return rangeId; } public INodeDomain getDomain() { @@ -94,7 +94,7 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator { @Override public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain, - rangeMap, rangeType); + rangeId, rangeType, null); AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties(); List<ILocalStructuralProperty> locals = new ArrayList<>(); @@ -141,15 +141,15 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator { i++; } ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps, - rangeMap, rangeType); - IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, sortFields, binaryComps, + rangeType); + IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, rangeId, sortFields, binaryComps, nkcf); return new Pair<>(conn, null); } @Override public String toString() { - return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType; + return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType + " " + rangeId; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java index 9667427..31cd37d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java @@ -51,25 +51,25 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.dataflow.std.base.RangeId; import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitionMergingConnectorDescriptor; public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator { private List<OrderColumn> partitioningFields; private INodeDomain domain; - private IRangeMap rangeMap; + private RangeId rangeId; private RangePartitioningType rangeType; public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, - IRangeMap rangeMap, RangePartitioningType rangeType) { + RangeId rangeId, RangePartitioningType rangeType) { this.partitioningFields = partitioningFields; this.domain = domain; - this.rangeMap = rangeMap; + this.rangeId = rangeId; this.rangeType = rangeType; } @@ -86,8 +86,8 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera return rangeType; } - public IRangeMap getRangeMap() { - return rangeMap; + public RangeId getRangeId() { + return rangeId; } public INodeDomain getDomain() { @@ -96,7 +96,7 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera @Override public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeMap, rangeType); + IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeId, rangeType, null); AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties(); List<ILocalStructuralProperty> locals = new ArrayList<>(); @@ -152,15 +152,15 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera i++; } ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps, - rangeMap, rangeType); - IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, sortFields, + rangeType); + IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, rangeId, sortFields, binaryComps, nkcf); return new Pair<>(conn, null); } @Override public String toString() { - return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType; + return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType + " " + rangeId; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java index c1dcd97..040e663 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java @@ -25,29 +25,27 @@ import java.util.Map; import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.dataflow.std.base.RangeId; public class OrderedPartitionedProperty implements IPartitioningProperty { private List<OrderColumn> orderColumns; private INodeDomain domain; - private IRangeMap rangeMap; + private RangeId rangeId; private RangePartitioningType rangeType; - public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap, - RangePartitioningType rangeType) { + public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId, + RangePartitioningType rangeType, IRangeMap rangeMapHint) { this.domain = domain; this.orderColumns = orderColumns; - this.rangeMap = rangeMap; + this.rangeId = rangeId; this.rangeType = rangeType; } - public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap) { - this.domain = domain; - this.orderColumns = orderColumns; - this.rangeMap = rangeMap; - this.rangeType = RangePartitioningType.PROJECT; + public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId) { + this(orderColumns, domain, rangeId, RangePartitioningType.PROJECT, null); } public List<OrderColumn> getOrderColumns() { @@ -81,7 +79,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty { List<FunctionalDependency> fds) { List<OrderColumn> columns = PropertiesUtil.replaceOrderColumnsByEqClasses(orderColumns, equivalenceClasses); columns = PropertiesUtil.applyFDsToOrderColumns(columns, fds); - return new OrderedPartitionedProperty(columns, domain, rangeMap); + return new OrderedPartitionedProperty(columns, domain, rangeId); } @Override @@ -91,8 +89,8 @@ public class OrderedPartitionedProperty implements IPartitioningProperty { } } - public IRangeMap getRangeMap() { - return rangeMap; + public RangeId getRangeId() { + return rangeId; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java index d3e21f7..6400dd1 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java @@ -161,11 +161,11 @@ public class PropertiesUtil { if (mayExpandProperties) { return (isPrefixOf(od.getOrderColumns().iterator(), or.getOrderColumns().iterator()) && or.getRangePartitioningType().equals(od.getRangePartitioningType()) - && or.getRangeMap().equals(od.getRangeMap())); + && or.getRangeId().equals(od.getRangeId())); } else { return (or.getOrderColumns().equals(od.getOrderColumns()) && or.getRangePartitioningType().equals(od.getRangePartitioningType()) - && or.getRangeMap().equals(od.getRangeMap())); + && or.getRangeId().equals(od.getRangeId())); } } default: { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java index 86f61ad..12b9c66 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java @@ -42,17 +42,19 @@ import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDom import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.dataflow.std.base.RangeId; @SuppressWarnings({ "unchecked", "rawtypes" }) public class AlgebricksOptimizationContext implements IOptimizationContext { private int varCounter; + private int rangeIdCounter; private final IExpressionEvalSizeComputer expressionEvalSizeComputer; private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory; private final PhysicalOptimizationConfig physicalOptimizationConfig; private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() { - Map<LogicalVariable, Integer> varSizeMap = new HashMap<LogicalVariable, Integer>(); + Map<LogicalVariable, Integer> varSizeMap = new HashMap<>(); @Override public void setVariableEvalSize(LogicalVariable var, int size) { @@ -65,19 +67,19 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { } }; - private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<ILogicalOperator, IVariableTypeEnvironment>(); + private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>(); - private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>(); - private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>(); - private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>(); + private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>(); + private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>(); + private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>(); private IMetadataProvider metadataProvider; - private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>(); + private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>(); - protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<ILogicalOperator, List<FunctionalDependency>>(); - protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>(); + protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>(); + protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>(); - protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>(); + protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>(); private final IExpressionTypeComputer expressionTypeComputer; private final IMissableTypeComputer nullableTypeComputer; private final INodeDomain defaultNodeDomain; @@ -98,6 +100,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) { this.varCounter = varCounter; + this.rangeIdCounter = -1; this.expressionEvalSizeComputer = expressionEvalSizeComputer; this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory; this.expressionTypeComputer = expressionTypeComputer; @@ -125,6 +128,13 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { } @Override + public RangeId newRangeId() { + rangeIdCounter++; + RangeId id = new RangeId(rangeIdCounter); + return id; + } + + @Override public IMetadataProvider getMetadataProvider() { return metadataProvider; } @@ -148,7 +158,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) { HashSet<ILogicalOperator> operators = dontApply.get(rule); if (operators == null) { - HashSet<ILogicalOperator> os = new HashSet<ILogicalOperator>(); + HashSet<ILogicalOperator> os = new HashSet<>(); os.add(op); dontApply.put(rule, os); } else { @@ -164,7 +174,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) { HashSet<ILogicalOperator> ops = alreadyCompared.get(op1); if (ops == null) { - HashSet<ILogicalOperator> newEntry = new HashSet<ILogicalOperator>(); + HashSet<ILogicalOperator> newEntry = new HashSet<>(); newEntry.add(op2); alreadyCompared.put(op1, newEntry); return false; @@ -204,9 +214,9 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) { FunctionalDependency fd = varToPrimaryKey.get(recordVar); if (fd == null) { - return null; + return new ArrayList<>(); } - return new ArrayList<LogicalVariable>(fd.getHead()); + return new ArrayList<>(fd.getHead()); } @Override @@ -299,7 +309,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) { for (Map.Entry<LogicalVariable, FunctionalDependency> me : varToPrimaryKey.entrySet()) { FunctionalDependency fd = me.getValue(); - List<LogicalVariable> hd = new ArrayList<LogicalVariable>(); + List<LogicalVariable> hd = new ArrayList<>(); for (LogicalVariable v : fd.getHead()) { LogicalVariable v2 = mappedVars.get(v); if (v2 == null) { @@ -308,7 +318,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { hd.add(v2); } } - List<LogicalVariable> tl = new ArrayList<LogicalVariable>(); + List<LogicalVariable> tl = new ArrayList<>(); for (LogicalVariable v : fd.getTail()) { LogicalVariable v2 = mappedVars.get(v); if (v2 == null) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java index 44e04b5..f6b1796 100644 --- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java @@ -21,7 +21,7 @@ package org.apache.hyracks.algebricks.data; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; public interface IBinaryComparatorFactoryProvider { public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 31d1099..4ec5e27 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator; @@ -60,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDi import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator; @@ -89,8 +91,9 @@ import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.api.dataflow.value.IRangeMap; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; +import org.apache.hyracks.dataflow.std.base.RangeId; public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { @@ -155,8 +158,8 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { } // Gets the index of a child to start top-down data property enforcement. - // If there is a partitioning-compatible child with the operator in opRef, start from this child; - // otherwise, start from child zero. + // If there is a partitioning-compatible child with the operator in opRef, + // start from this child; otherwise, start from child zero. private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan, IOptimizationContext context) throws AlgebricksException { IPhysicalPropertiesVector[] reqdProperties = null; @@ -273,8 +276,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { changed = true; addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context); - AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex) - .getValue(); + AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue(); if (newChild != child) { delivered = newChild.getDeliveredPhysicalProperties(); @@ -333,7 +335,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild, IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context) - throws AlgebricksException { + throws AlgebricksException { IPhysicalPropertiesVector newDelivered = newChild.getDeliveredPhysicalProperties(); Map<LogicalVariable, EquivalenceClass> newChildEqClasses = context.getEquivalenceClassMap(newChild); @@ -373,14 +375,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { preSortedDistinct.setDistinctByColumns(d.getDistinctByVarList()); break; } + default: } } private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStructuralProperty> reqd, List<ILocalStructuralProperty> dlvd) { - List<OrderColumn> returnedProperties = new ArrayList<OrderColumn>(); - List<LogicalVariable> rqdCols = new ArrayList<LogicalVariable>(); - List<LogicalVariable> dlvdCols = new ArrayList<LogicalVariable>(); + List<OrderColumn> returnedProperties = new ArrayList<>(); + List<LogicalVariable> rqdCols = new ArrayList<>(); + List<LogicalVariable> dlvdCols = new ArrayList<>(); for (ILocalStructuralProperty r : reqd) { r.getVariables(rqdCols); } @@ -389,7 +392,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { } int prefix = dlvdCols.size() - 1; - for (; prefix >= 0;) { + while (prefix >= 0) { if (!rqdCols.contains(dlvdCols.get(prefix))) { prefix--; } else { @@ -403,7 +406,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { returnedProperties.add(new OrderColumn(orderColumns.get(j).getColumn(), orderColumns.get(j).getOrder())); } // maintain other order columns after the required order columns - if (returnedProperties.size() != 0) { + if (!returnedProperties.isEmpty()) { for (int j = prefix + 1; j < dlvdCols.size(); j++) { OrderColumn oc = orderColumns.get(j); returnedProperties.add(new OrderColumn(oc.getColumn(), oc.getOrder())); @@ -464,9 +467,9 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { return; } - Mutable<ILogicalOperator> topOp = new MutableObject<ILogicalOperator>(); + Mutable<ILogicalOperator> topOp = new MutableObject<>(); topOp.setValue(op.getInputs().get(i).getValue()); - LinkedList<LocalOrderProperty> oList = new LinkedList<LocalOrderProperty>(); + LinkedList<LocalOrderProperty> oList = new LinkedList<>(); for (ILocalStructuralProperty prop : localProperties) { switch (prop.getPropertyType()) { @@ -478,7 +481,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { LocalGroupingProperty g = (LocalGroupingProperty) prop; Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null) ? g.getPreferredOrderEnforcer() : g.getColumnSet(); - List<OrderColumn> orderColumns = new ArrayList<OrderColumn>(); + List<OrderColumn> orderColumns = new ArrayList<>(); for (LogicalVariable v : vars) { OrderColumn oc = new OrderColumn(v, OrderKind.ASC); orderColumns.add(oc); @@ -504,12 +507,12 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList, Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context) - throws AlgebricksException { - List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>(); + throws AlgebricksException { + List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<>(); for (LocalOrderProperty orderProperty : oList) { for (OrderColumn oc : orderProperty.getOrderColumns()) { IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER; - Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(ordType, + Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<>(ordType, new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oc.getColumn()))); oe.add(pair); } @@ -537,13 +540,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { switch (pp.getPartitioningType()) { case UNPARTITIONED: { List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild); - if (ordCols == null || ordCols.size() == 0) { + if (ordCols.isEmpty()) { pop = new RandomMergeExchangePOperator(); } else { if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) { + RangeId rangeId = context.newRangeId(); IRangeMap rangeMap = (IRangeMap) op.getAnnotations() .get(OperatorAnnotations.USE_RANGE_CONNECTOR); - pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap, + addRangeForwardOperator(op.getInputs().get(i), rangeId, rangeMap, context); + pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeId, RangePartitioningType.PROJECT); } else { OrderColumn[] sortColumns = new OrderColumn[ordCols.size()]; @@ -554,8 +559,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { break; } case UNORDERED_PARTITIONED: { - List<LogicalVariable> varList = new ArrayList<LogicalVariable>( - ((UnorderedPartitionedProperty) pp).getColumnSet()); + List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet()); List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties(); List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties(); boolean propWasSet = false; @@ -580,6 +584,12 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { OrderedPartitionedProperty opp = (OrderedPartitionedProperty) pp; List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties(); List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties(); + + // Add RangeForwardOperator. + IRangeMap rangeMap = (IRangeMap) op.getAnnotations() + .get(OperatorAnnotations.USE_RANGE_CONNECTOR); + addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), rangeMap, context); + boolean propWasSet = false; pop = null; if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) { @@ -589,13 +599,13 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) { List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals); - pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeMap(), + pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeId(), opp.getRangePartitioningType()); propWasSet = true; } } if (!propWasSet) { - pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeMap(), + pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeId(), opp.getRangePartitioningType()); } break; @@ -630,6 +640,21 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { } } + private void addRangeForwardOperator(Mutable<ILogicalOperator> op, RangeId rangeId, IRangeMap rangeMap, + IOptimizationContext context) throws AlgebricksException { + RangeForwardOperator rfo = new RangeForwardOperator(rangeId, rangeMap); + RangeForwardPOperator rfpo = new RangeForwardPOperator(rangeId, rangeMap); + rfo.setPhysicalOperator(rfpo); + setNewOp(op, rfo, context); + rfo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED); + OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(rfo, context); + context.computeAndSetTypeEnvironmentForOperator(rfo); + if (AlgebricksConfig.DEBUG) { + AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added range forward " + rfo.getPhysicalOperator() + ".\n"); + printOp((AbstractLogicalOperator) op.getValue()); + } + } + private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) { for (ILocalStructuralProperty lsp : cldLocals) { if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) { @@ -647,17 +672,17 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { } private List<OrderColumn> computeOrderColumns(IPhysicalPropertiesVector pv) { - List<OrderColumn> ordCols = new ArrayList<OrderColumn>(); + List<OrderColumn> ordCols = new ArrayList<>(); List<ILocalStructuralProperty> localProps = pv.getLocalProperties(); - if (localProps == null || localProps.size() == 0) { - return null; + if (localProps == null || localProps.isEmpty()) { + return new ArrayList<>(); } else { for (ILocalStructuralProperty p : localProps) { if (p.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) { LocalOrderProperty lop = (LocalOrderProperty) p; ordCols.addAll(lop.getOrderColumns()); } else { - return null; + return new ArrayList<>(); } } return ordCols; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java index e7cf912..101ec42 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java @@ -69,13 +69,13 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { // Map of variables that could be replaced by their producing expression. // Populated during the top-down sweep of the plan. - protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<LogicalVariable, ILogicalExpression>(); + protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>(); // Visitor for replacing variable reference expressions with their originating expression. protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs); // Set of FunctionIdentifiers that we should not inline. - protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>(); + protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<>(); protected boolean hasRun = false; @@ -127,9 +127,9 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { public static boolean functionIsConstantAtRuntime(AbstractLogicalOperator op, AbstractFunctionCallExpression funcExpr, IOptimizationContext context) throws AlgebricksException { //make sure that there are no variables in the expression - Set<LogicalVariable> usedVariables = new HashSet<LogicalVariable>(); + Set<LogicalVariable> usedVariables = new HashSet<>(); funcExpr.getUsedVariables(usedVariables); - if (usedVariables.size() > 0) { + if (!usedVariables.isEmpty()) { return false; } return true; @@ -176,7 +176,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { } // Variables produced by a nested subplan cannot be inlined // in operators above the subplan. - Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>(); + Set<LogicalVariable> producedVars = new HashSet<>(); VariableUtilities.getProducedVariables(root.getValue(), producedVars); varAssignRhs.keySet().removeAll(producedVars); } @@ -186,7 +186,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { // References to variables generated in the right branch of a left-outer-join cannot be inlined // in operators above the left-outer-join. if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) { - Set<LogicalVariable> rightLiveVars = new HashSet<LogicalVariable>(); + Set<LogicalVariable> rightLiveVars = new HashSet<>(); VariableUtilities.getLiveVariables(op.getInputs().get(1).getValue(), rightLiveVars); varAssignRhs.keySet().removeAll(rightLiveVars); } @@ -208,8 +208,8 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform { private final Map<LogicalVariable, ILogicalExpression> varAssignRhs; - private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>(); - private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>(); + private final Set<LogicalVariable> liveVars = new HashSet<>(); + private final List<LogicalVariable> rhsUsedVars = new ArrayList<>(); private ILogicalOperator op; private IOptimizationContext context; // If set, only replace this variable reference. @@ -236,54 +236,60 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule { public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException { ILogicalExpression e = exprRef.getValue(); switch (((AbstractLogicalExpression) e).getExpressionTag()) { - case VARIABLE: { - LogicalVariable var = ((VariableReferenceExpression) e).getVariableReference(); - // Restrict replacement to targetVar if it has been set. - if (targetVar != null && var != targetVar) { - return false; - } + case VARIABLE: + return transformVariable(exprRef); + case FUNCTION_CALL: + return transformFunction(e); + default: + return false; - // Make sure has not been excluded from inlining. - if (context.shouldNotBeInlined(var)) { - return false; - } + } + } - ILogicalExpression rhs = varAssignRhs.get(var); - if (rhs == null) { - // Variable was not produced by an assign. - return false; - } + private boolean transformFunction(ILogicalExpression e) throws AlgebricksException { + AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e; + boolean modified = false; + for (Mutable<ILogicalExpression> arg : fce.getArguments()) { + if (transform(arg)) { + modified = true; + } + } + return modified; + } - // Make sure used variables from rhs are live. - if (liveVars.isEmpty()) { - VariableUtilities.getLiveVariables(op, liveVars); - } - rhsUsedVars.clear(); - rhs.getUsedVariables(rhsUsedVars); - for (LogicalVariable rhsUsedVar : rhsUsedVars) { - if (!liveVars.contains(rhsUsedVar)) { - return false; - } - } + private boolean transformVariable(Mutable<ILogicalExpression> exprRef) throws AlgebricksException { + LogicalVariable var = ((VariableReferenceExpression) exprRef.getValue()).getVariableReference(); + // Restrict replacement to targetVar if it has been set. + if (targetVar != null && var != targetVar) { + return false; + } - // Replace variable reference with a clone of the rhs expr. - exprRef.setValue(rhs.cloneExpression()); - return true; - } - case FUNCTION_CALL: { - AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e; - boolean modified = false; - for (Mutable<ILogicalExpression> arg : fce.getArguments()) { - if (transform(arg)) { - modified = true; - } - } - return modified; - } - default: { + // Make sure has not been excluded from inlining. + if (context.shouldNotBeInlined(var)) { + return false; + } + + ILogicalExpression rhs = varAssignRhs.get(var); + if (rhs == null) { + // Variable was not produced by an assign. + return false; + } + + // Make sure used variables from rhs are live. + if (liveVars.isEmpty()) { + VariableUtilities.getLiveVariables(op, liveVars); + } + rhsUsedVars.clear(); + rhs.getUsedVariables(rhsUsedVars); + for (LogicalVariable rhsUsedVar : rhsUsedVars) { + if (!liveVars.contains(rhsUsedVar)) { return false; } } + + // Replace variable reference with a clone of the rhs expr. + exprRef.setValue(rhs.cloneExpression()); + return true; } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java index 7eb9ac8..a54947a 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java @@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; @@ -184,6 +185,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor } @Override + public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException { + return visit(op); + } + + @Override public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException { return visit(op); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java new file mode 100644 index 0000000..cd14434 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.dataflow.value; + +public interface IRangeMap { + + public int getSplitCount(); + + public byte[] getByteArray(int columnIndex, int splitIndex); + + public int getStartOffset(int columnIndex, int splitIndex); + + public int getLength(int columnIndex, int splitIndex); + + public int getTag(int columnIndex, int splitIndex); + + // Min value functions + public byte[] getMinByteArray(int columnIndex); + + public int getMinStartOffset(int columnIndex); + + public int getMinLength(int columnIndex); + + public int getMinTag(int columnIndex); + + // Max value functions + public byte[] getMaxByteArray(int columnIndex); + + public int getMaxStartOffset(int columnIndex); + + public int getMaxLength(int columnIndex); + + public int getMaxTag(int columnIndex); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java new file mode 100644 index 0000000..abf1495 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.dataflow.value; + +public interface IRangePartitionType { + public enum RangePartitioningType { + /** + * Partitioning is determined by finding the range partition where the first data point lies. + */ + PROJECT, + /** + * Partitioning is determined by finding the range partition where the last data point lies. + */ + PROJECT_END, + /** + * Partitioning is determined by finding all the range partitions where the data has a point. + */ + SPLIT, + /** + * Partitioning is determined by finding all the range partitions where the data has a point + * or comes after the data point. + */ + REPLICATE + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java index 5406366..9b3e607 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java @@ -21,5 +21,5 @@ package org.apache.hyracks.api.dataflow.value; import java.io.Serializable; public interface ITupleRangePartitionComputerFactory extends Serializable { - public ITupleRangePartitionComputer createPartitioner(); + public ITupleRangePartitionComputer createPartitioner(IRangeMap rangeMap); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java index bd7be3f..162d3b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java @@ -21,28 +21,27 @@ package org.apache.hyracks.dataflow.common.data.partition.range; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IRangeMap; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer; import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory; +import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.storage.IGrowableIntArray; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType; public class FieldRangePartitionComputerFactory implements ITupleRangePartitionComputerFactory { private static final long serialVersionUID = 1L; private final int[] rangeFields; - private IRangeMap rangeMap; private IBinaryRangeComparatorFactory[] comparatorFactories; private RangePartitioningType rangeType; public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryRangeComparatorFactory[] comparatorFactories, - IRangeMap rangeMap, RangePartitioningType rangeType) { + RangePartitioningType rangeType) { this.rangeFields = rangeFields; this.comparatorFactories = comparatorFactories; - this.rangeMap = rangeMap; this.rangeType = rangeType; } - public ITupleRangePartitionComputer createPartitioner() { + public ITupleRangePartitionComputer createPartitioner(IRangeMap rangeMap) { final IBinaryComparator[] minComparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { minComparators[i] = comparatorFactories[i].createMinBinaryComparator(); @@ -78,7 +77,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC throws HyracksDataException { switch (rangeType) { case PROJECT: { - int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators)); + int minPartition = getPartitionMap( + binarySearchRangePartition(accessor, tIndex, minComparators)); addPartition(minPartition, map); break; } @@ -89,7 +89,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC break; } case REPLICATE: { - int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators)); + int minPartition = getPartitionMap( + binarySearchRangePartition(accessor, tIndex, minComparators)); int maxPartition = getPartitionMap(rangeMap.getSplitCount() + 1); for (int pid = minPartition; pid < maxPartition; ++pid) { addPartition(pid, map); @@ -97,7 +98,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC break; } case SPLIT: { - int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators)); + int minPartition = getPartitionMap( + binarySearchRangePartition(accessor, tIndex, minComparators)); int maxPartition = getPartitionMap( binarySearchRangePartition(accessor, tIndex, maxComparators)); for (int pid = minPartition; pid <= maxPartition; ++pid) {