From Janhavi Tripurwar <[email protected]>: Janhavi Tripurwar has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035?usp=email )
Change subject: [NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators ...................................................................... [NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators Change-Id: I2a0fc093d356be7e2379a66c6843ebd6b6cdbd4c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Janhavi Tripurwar <[email protected]> Tested-by: Jenkins <[email protected]> --- M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java 3 files changed, 69 insertions(+), 74 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Anon. E. 
Moose #1000171: Janhavi Tripurwar: Looks good to me, but someone else must approve Jenkins: Verified; Verified diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java index 3162c1c..840a33e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -32,8 +33,10 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; @@ -47,7 +50,14 @@ import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator { @@ -127,6 +137,56 @@ return orderProp; } + protected static class SortSetupData { + protected final int[] sortFields; + protected final IBinaryComparatorFactory[] comps; + protected final INormalizedKeyComputerFactory nkcf; + protected final RecordDescriptor recDescriptor; + protected final int maxNumberOfFrames; + + protected SortSetupData(int[] sortFields, IBinaryComparatorFactory[] comps, INormalizedKeyComputerFactory nkcf, + RecordDescriptor recDescriptor, int maxNumberOfFrames) { + this.sortFields = sortFields; + this.comps = comps; + this.nkcf = nkcf; + this.recDescriptor = recDescriptor; + this.maxNumberOfFrames = maxNumberOfFrames; + } + } + + protected static SortSetupData setupSortOperator(JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, OrderColumn[] sortColumns, LocalMemoryRequirements memReq) + throws AlgebricksException { + + 
RecordDescriptor recDescriptor = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + int n = sortColumns.length; + int[] sortFields = new int[n]; + IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n]; + + INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); + INormalizedKeyComputerFactory nkcf = null; + IVariableTypeEnvironment env = context.getTypeEnvironment(op); + + for (int i = 0; i < n; i++) { + OrderColumn oc = sortColumns[i]; + LogicalVariable var = oc.getColumn(); + sortFields[i] = opSchema.findVariable(var); + Object type = env.getVarType(var); + OrderOperator.IOrder.OrderKind order = oc.getOrder(); + + if (i == 0 && nkcfProvider != null && type != null) { + nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderOperator.IOrder.OrderKind.ASC); + } + + IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); + comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderOperator.IOrder.OrderKind.ASC); + } + + int maxNumberOfFrames = memReq.getMemoryBudgetInFrames(); + return new SortSetupData(sortFields, comps, nkcf, recDescriptor, maxNumberOfFrames); + } + @Override public String toString() { if (orderProp == null) { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java index 403ca57..fede506 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java @@ -21,21 +21,11 @@ import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; -import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.sort.MicroSortRuntimeFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; public class MicroStableSortPOperator extends AbstractStableSortPOperator { @@ -56,32 +46,11 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { - RecordDescriptor recDescriptor = - JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - int n = sortColumns.length; - int[] sortFields = new int[n]; - IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n]; - int i = 0; - 
INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); - INormalizedKeyComputerFactory nkcf = null; - IVariableTypeEnvironment env = context.getTypeEnvironment(op); - for (OrderColumn oc : sortColumns) { - LogicalVariable var = oc.getColumn(); - sortFields[i] = opSchema.findVariable(var); - Object type = env.getVarType(var); - OrderKind order = oc.getOrder(); - if (i == 0 && nkcfProvider != null && type != null) { - nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC); - } + SortSetupData sortSetupData = setupSortOperator(context, op, opSchema, sortColumns, localMemoryRequirements); - IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); - comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC); - i++; - } - - int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames(); - IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames); - builder.contributeMicroOperator(op, runtime, recDescriptor); + IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortSetupData.sortFields, sortSetupData.nkcf, + sortSetupData.comps, null, sortSetupData.maxNumberOfFrames); + builder.contributeMicroOperator(op, runtime, sortSetupData.recDescriptor); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java index 93c5c3b..729ec3f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java @@ -21,21 +21,11 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; -import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor; import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; @@ -72,42 +62,18 @@ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { IOperatorDescriptorRegistry spec = builder.getJobSpec(); - 
RecordDescriptor recDescriptor = - JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - int n = sortColumns.length; - int[] sortFields = new int[n]; - IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n]; + SortSetupData sortSetupData = setupSortOperator(context, op, opSchema, sortColumns, localMemoryRequirements); - INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); - INormalizedKeyComputerFactory nkcf = null; - - IVariableTypeEnvironment env = context.getTypeEnvironment(op); - int i = 0; - // TODO(ali): should refactor common code with micro sort op - for (OrderColumn oc : sortColumns) { - LogicalVariable var = oc.getColumn(); - sortFields[i] = opSchema.findVariable(var); - Object type = env.getVarType(var); - OrderKind order = oc.getOrder(); - if (i == 0 && nkcfProvider != null && type != null) { - nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC); - } - IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); - comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC); - i++; - } - - int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames(); AbstractSorterOperatorDescriptor sortOpDesc; // topK == -1 means that a topK value is not provided. if (topK == -1) { - sortOpDesc = - new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames, sortFields, nkcf, comps, recDescriptor); + sortOpDesc = new ExternalSortOperatorDescriptor(spec, sortSetupData.maxNumberOfFrames, + sortSetupData.sortFields, sortSetupData.nkcf, sortSetupData.comps, sortSetupData.recDescriptor); } else { // Since topK value is provided, topK optimization is possible. // We call topKSorter instead of calling ExternalSortOperator. 
- sortOpDesc = new TopKSorterOperatorDescriptor(spec, maxNumberOfFrames, topK, sortFields, nkcf, comps, - recDescriptor); + sortOpDesc = new TopKSorterOperatorDescriptor(spec, sortSetupData.maxNumberOfFrames, topK, + sortSetupData.sortFields, sortSetupData.nkcf, sortSetupData.comps, sortSetupData.recDescriptor); } sortOpDesc.setSourceLocation(op.getSourceLocation()); contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I2a0fc093d356be7e2379a66c6843ebd6b6cdbd4c Gerrit-Change-Number: 20035 Gerrit-PatchSet: 5 Gerrit-Owner: Janhavi Tripurwar <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Janhavi Tripurwar <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-CC: Michael Blow <[email protected]>
