From Janhavi Tripurwar <[email protected]>:

Janhavi Tripurwar has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035?usp=email )

Change subject: [NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators
......................................................................

[NO ISSUE][RT]: Refactor: Remove code duplication in stable sort operators

Change-Id: I2a0fc093d356be7e2379a66c6843ebd6b6cdbd4c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Janhavi Tripurwar <[email protected]>
Tested-by: Jenkins <[email protected]>
---
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
3 files changed, 69 insertions(+), 74 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Anon. E. Moose #1000171:
  Janhavi Tripurwar: Looks good to me, but someone else must approve
  Jenkins: Verified; Verified




diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 3162c1c..840a33e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -24,6 +24,7 @@
 import java.util.List;

 import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -32,8 +33,10 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
@@ -47,7 +50,14 @@
 import 
org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import 
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;

 public abstract class AbstractStableSortPOperator extends 
AbstractPhysicalOperator {
@@ -127,6 +137,56 @@
         return orderProp;
     }

+    protected static class SortSetupData {
+        protected final int[] sortFields;
+        protected final IBinaryComparatorFactory[] comps;
+        protected final INormalizedKeyComputerFactory nkcf;
+        protected final RecordDescriptor recDescriptor;
+        protected final int maxNumberOfFrames;
+
+        protected SortSetupData(int[] sortFields, IBinaryComparatorFactory[] 
comps, INormalizedKeyComputerFactory nkcf,
+                RecordDescriptor recDescriptor, int maxNumberOfFrames) {
+            this.sortFields = sortFields;
+            this.comps = comps;
+            this.nkcf = nkcf;
+            this.recDescriptor = recDescriptor;
+            this.maxNumberOfFrames = maxNumberOfFrames;
+        }
+    }
+
+    protected static SortSetupData setupSortOperator(JobGenContext context, 
ILogicalOperator op,
+            IOperatorSchema opSchema, OrderColumn[] sortColumns, 
LocalMemoryRequirements memReq)
+            throws AlgebricksException {
+
+        RecordDescriptor recDescriptor =
+                
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
+        int n = sortColumns.length;
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = 
context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
+        for (int i = 0; i < n; i++) {
+            OrderColumn oc = sortColumns[i];
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderOperator.IOrder.OrderKind order = oc.getOrder();
+
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, 
order == OrderOperator.IOrder.OrderKind.ASC);
+            }
+
+            IBinaryComparatorFactoryProvider bcfp = 
context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == 
OrderOperator.IOrder.OrderKind.ASC);
+        }
+
+        int maxNumberOfFrames = memReq.getMemoryBudgetInFrames();
+        return new SortSetupData(sortFields, comps, nkcf, recDescriptor, 
maxNumberOfFrames);
+    }
+
     @Override
     public String toString() {
         if (orderProp == null) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index 403ca57..fede506 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -21,21 +21,11 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import 
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.sort.MicroSortRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;

 public class MicroStableSortPOperator extends AbstractStableSortPOperator {

@@ -56,32 +46,11 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        RecordDescriptor recDescriptor =
-                
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
-        int n = sortColumns.length;
-        int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
-        int i = 0;
-        INormalizedKeyComputerFactoryProvider nkcfProvider = 
context.getNormalizedKeyComputerFactoryProvider();
-        INormalizedKeyComputerFactory nkcf = null;
-        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        for (OrderColumn oc : sortColumns) {
-            LogicalVariable var = oc.getColumn();
-            sortFields[i] = opSchema.findVariable(var);
-            Object type = env.getVarType(var);
-            OrderKind order = oc.getOrder();
-            if (i == 0 && nkcfProvider != null && type != null) {
-                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, 
order == OrderKind.ASC);
-            }
+        SortSetupData sortSetupData = setupSortOperator(context, op, opSchema, 
sortColumns, localMemoryRequirements);

-            IBinaryComparatorFactoryProvider bcfp = 
context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == 
OrderKind.ASC);
-            i++;
-        }
-
-        int maxNumberOfFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
-        IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, 
nkcf, comps, null, maxNumberOfFrames);
-        builder.contributeMicroOperator(op, runtime, recDescriptor);
+        IPushRuntimeFactory runtime = new 
MicroSortRuntimeFactory(sortSetupData.sortFields, sortSetupData.nkcf,
+                sortSetupData.comps, null, sortSetupData.maxNumberOfFrames);
+        builder.contributeMicroOperator(op, runtime, 
sortSetupData.recDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 93c5c3b..729ec3f 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -21,21 +21,11 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import 
org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
@@ -72,42 +62,18 @@
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recDescriptor =
-                
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
-        int n = sortColumns.length;
-        int[] sortFields = new int[n];
-        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+        SortSetupData sortSetupData = setupSortOperator(context, op, opSchema, 
sortColumns, localMemoryRequirements);

-        INormalizedKeyComputerFactoryProvider nkcfProvider = 
context.getNormalizedKeyComputerFactoryProvider();
-        INormalizedKeyComputerFactory nkcf = null;
-
-        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        int i = 0;
-        // TODO(ali): should refactor common code with micro sort op
-        for (OrderColumn oc : sortColumns) {
-            LogicalVariable var = oc.getColumn();
-            sortFields[i] = opSchema.findVariable(var);
-            Object type = env.getVarType(var);
-            OrderKind order = oc.getOrder();
-            if (i == 0 && nkcfProvider != null && type != null) {
-                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, 
order == OrderKind.ASC);
-            }
-            IBinaryComparatorFactoryProvider bcfp = 
context.getBinaryComparatorFactoryProvider();
-            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == 
OrderKind.ASC);
-            i++;
-        }
-
-        int maxNumberOfFrames = 
localMemoryRequirements.getMemoryBudgetInFrames();
         AbstractSorterOperatorDescriptor sortOpDesc;
         // topK == -1 means that a topK value is not provided.
         if (topK == -1) {
-            sortOpDesc =
-                    new ExternalSortOperatorDescriptor(spec, 
maxNumberOfFrames, sortFields, nkcf, comps, recDescriptor);
+            sortOpDesc = new ExternalSortOperatorDescriptor(spec, 
sortSetupData.maxNumberOfFrames,
+                    sortSetupData.sortFields, sortSetupData.nkcf, 
sortSetupData.comps, sortSetupData.recDescriptor);
         } else {
             // Since topK value is provided, topK optimization is possible.
             // We call topKSorter instead of calling ExternalSortOperator.
-            sortOpDesc = new TopKSorterOperatorDescriptor(spec, 
maxNumberOfFrames, topK, sortFields, nkcf, comps,
-                    recDescriptor);
+            sortOpDesc = new TopKSorterOperatorDescriptor(spec, 
sortSetupData.maxNumberOfFrames, topK,
+                    sortSetupData.sortFields, sortSetupData.nkcf, 
sortSetupData.comps, sortSetupData.recDescriptor);
         }
         sortOpDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20035?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I2a0fc093d356be7e2379a66c6843ebd6b6cdbd4c
Gerrit-Change-Number: 20035
Gerrit-PatchSet: 5
Gerrit-Owner: Janhavi Tripurwar <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Janhavi Tripurwar <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-CC: Michael Blow <[email protected]>

Reply via email to