>From <[email protected]>:

[email protected] has uploaded this change for review. (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19100)


Change subject: [ASTERIXDB-3531][COMP] Push projections into unnest operator
......................................................................

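[ASTERIXDB-3531][COMP] Push projections into unnest operator

Allow a projection to be pushed into an unnest operator, the same way
projections are already pushed into data-source scans:
- Move the pushed-projection state out of DataSourceScanOperator into
  AbstractProjectingOperator, which AbstractScanOperator now extends;
  addProjectVariables() is renamed to pushProjectionVariables().
- EmbedProjectRule no longer bails out when the operator below the
  project is an UNNEST (previously only ASSIGN was accepted).
- When a projection is pushed, AbstractUnnestPOperator builds the
  runtime projection list from the project variables, and
  UnnestRuntimeFactory writes the unnest and positional columns at
  their projected positions.
- The logical plan printer shows the pushed projection on unnest
  operators.

Change-Id: I83842976792f54017eff60fcc597bdd4ec6dc8c9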
---
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
8 files changed, 128 insertions(+), 59 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/00/19100/1

diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
index 64e41f5..8de2959 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;

-public abstract class AbstractScanOperator extends AbstractLogicalOperator {
+public abstract class AbstractScanOperator extends AbstractProjectingOperator {
     protected List<LogicalVariable> variables;

     public AbstractScanOperator(List<LogicalVariable> variables) {
@@ -47,6 +47,14 @@
     @Override
     public void recomputeSchema() {
         schema = new ArrayList<>();
+
+        if (isProjectPushed()) {
+            for (LogicalVariable v : getProjectVariables()) {
+                schema.add(v);
+            }
+            return;
+        }
+
         schema.addAll(inputs.get(0).getValue().getSchema());
         schema.addAll(variables);
     }
@@ -58,6 +66,14 @@
             @Override
             public void propagateVariables(IOperatorSchema target, 
IOperatorSchema... sources)
                     throws AlgebricksException {
+
+                if (isProjectPushed()) {
+                    for (LogicalVariable v : getProjectVariables()) {
+                        target.addVariable(v);
+                    }
+                    return;
+                }
+
                 if (sources.length > 0) {
                     target.addAllVariables(sources[0]);
                 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
index 5c485d5..6a1ccc8 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
@@ -90,6 +90,13 @@

             @Override
             public void propagateVariables(IOperatorSchema target, 
IOperatorSchema... sources) {
+                if (isProjectPushed()) {
+                    for (LogicalVariable v : getProjectVariables()) {
+                        target.addVariable(v);
+                    }
+                    return;
+                }
+
                 if (sources.length > 0) {
                     target.addAllVariables(sources[0]);
                 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index c0f71fe..c5310b7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;

-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;

 import org.apache.commons.lang3.mutable.Mutable;
@@ -37,10 +35,6 @@
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 public class DataSourceScanOperator extends AbstractDataSourceOperator {
-    private final List<LogicalVariable> projectVars;
-
-    private boolean projectPushed = false;
-
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
     private List<LogicalVariable> minFilterVars;
     private List<LogicalVariable> maxFilterVars;
@@ -61,7 +55,6 @@
             Mutable<ILogicalExpression> selectCondition, long outputLimit,
             IProjectionFiltrationInfo projectionFiltrationInfo) {
         super(variables, dataSource);
-        projectVars = new ArrayList<>();
         this.selectCondition = selectCondition;
         this.outputLimit = outputLimit;
         setProjectionFiltrationInfo(projectionFiltrationInfo);
@@ -93,19 +86,6 @@
         return false;
     }

-    public void addProjectVariables(Collection<LogicalVariable> vars) {
-        projectVars.addAll(vars);
-        projectPushed = true;
-    }
-
-    public List<LogicalVariable> getProjectVariables() {
-        return projectVars;
-    }
-
-    public boolean isProjectPushed() {
-        return projectPushed;
-    }
-
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
         return new VariablePropagationPolicy() {
@@ -115,7 +95,7 @@
                 if (sources.length > 0) {
                     target.addAllVariables(sources[0]);
                 }
-                List<LogicalVariable> outputVariables = projectPushed ? 
projectVars : variables;
+                List<LogicalVariable> outputVariables = isProjectPushed() ? 
getProjectVariables() : variables;
                 for (LogicalVariable v : outputVariables) {
                     target.addVariable(v);
                 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
index 45360d4..589a4a7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -64,7 +65,6 @@
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         AbstractUnnestNonMapOperator unnest = (AbstractUnnestNonMapOperator) 
op;
-        int outCol = opSchema.findVariable(unnest.getVariable());
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         IExpressionRuntimeProvider expressionRuntimeProvider = 
context.getExpressionRuntimeProvider();
         boolean exit = false;
@@ -83,14 +83,42 @@
         UnnestingFunctionCallExpression agg = 
(UnnestingFunctionCallExpression) unnestExpr;
         IUnnestingEvaluatorFactory unnestingFactory = 
expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), 
inputSchemas, context);
-        int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
         IUnnestingPositionWriterFactory positionWriterFactory =
                 unnest.hasPositionalVariable() ? 
context.getUnnestingPositionWriterFactory() : null;
         IMissingWriterFactory missingWriterFactory = leftOuter
                 ? JobGenHelper.getMissingWriterFactory(context, 
((LeftOuterUnnestOperator) op).getMissingValue())
                 : null;
-        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, 
unnestingFactory, projectionList,
-                positionWriterFactory, leftOuter, missingWriterFactory);
+
+        int outCol;
+        int positionalCol;
+        int[] projectionList;
+
+        if (unnest.isProjectPushed()) {
+
+            outCol = -1;
+            positionalCol = -1;
+            projectionList = new int[unnest.getProjectVariables().size()];
+            int c = 0;
+            for (LogicalVariable projectVar : unnest.getProjectVariables()) {
+                if (projectVar.equals(unnest.getVariable())) {
+                    outCol = inputSchemas[0].getSize();
+                    projectionList[c++] = inputSchemas[0].getSize();
+                } else if (unnest.hasPositionalVariable() && 
projectVar.equals(unnest.getPositionalVariable())) {
+                    positionalCol = inputSchemas[0].getSize() + 1;
+                    projectionList[c++] = inputSchemas[0].getSize() + 1;
+                } else {
+                    projectionList[c++] = 
inputSchemas[0].findVariable(projectVar);
+                }
+            }
+
+        } else {
+            outCol = opSchema.findVariable(unnest.getVariable());
+            positionalCol = unnest.hasPositionalVariable() ? 
opSchema.findVariable(unnest.getPositionalVariable()) : -1;
+            projectionList = JobGenHelper.projectAllVariables(opSchema);
+        }
+
+        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, 
positionalCol, unnestingFactory,
+                projectionList, positionWriterFactory, leftOuter, 
missingWriterFactory);
         unnestRuntime.setSourceLocation(unnest.getSourceLocation());
         RecordDescriptor recDesc = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 2044495..f72e48b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -357,6 +357,9 @@
             buffer.append(" at " + op.getPositionalVariable());
         }
         buffer.append(" <- " + 
op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        if (op.isProjectPushed()) {
+            buffer.append("with pushed projection: " + 
op.getProjectVariables());
+        }
         return null;
     }

diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
index 2d6c425..e7a3c31 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
@@ -46,7 +46,7 @@
         Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
opRef2.getValue();

-        if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+        if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN && 
op2.getOperatorTag() != LogicalOperatorTag.UNNEST) {
             return false;
         }

diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
index fedc420..46d187c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
@@ -53,7 +53,7 @@
             return false;
         DataSourceScanOperator scanOp = (DataSourceScanOperator) inputOp;
         ProjectOperator projectOp = (ProjectOperator) project;
-        scanOp.addProjectVariables(projectOp.getVariables());
+        scanOp.pushProjectionVariables(projectOp.getVariables());
         if (op.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
             op.getInputs().set(0, project.getInputs().get(0));
         } else {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 1b42bb6..0039284 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -44,29 +44,36 @@
     private static final long serialVersionUID = 1L;

     private final int outCol;
+    private final int positionalCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
     private final boolean unnestColIsProjected;
     private final IUnnestingPositionWriterFactory positionWriterFactory;
     private final boolean leftOuter;
     private final IMissingWriterFactory missingWriterFactory;
     private int outColPos;
+    private int positionalColIndex;

     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory 
unnestingFactory, int[] projectionList,
             boolean leftOuter, IMissingWriterFactory missingWriterFactory) {
-        this(outCol, unnestingFactory, projectionList, null, leftOuter, 
missingWriterFactory);
+        this(outCol, -1, unnestingFactory, projectionList, null, leftOuter, 
missingWriterFactory);
     }

-    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory 
unnestingFactory, int[] projectionList,
-            IUnnestingPositionWriterFactory positionWriterFactory, boolean 
leftOuter,
+    public UnnestRuntimeFactory(int outCol, int positionalCol, 
IUnnestingEvaluatorFactory unnestingFactory,
+            int[] projectionList, IUnnestingPositionWriterFactory 
positionWriterFactory, boolean leftOuter,
             IMissingWriterFactory missingWriterFactory) {
         super(projectionList);
         this.outCol = outCol;
+        this.positionalCol = positionalCol;
         this.unnestingFactory = unnestingFactory;
         outColPos = -1;
+        positionalColIndex = -1;
         for (int f = 0; f < projectionList.length; f++) {
             if (projectionList[f] == outCol) {
                 outColPos = f;
             }
+            if (projectionList[f] == positionalCol) {
+                positionalColIndex = f;
+            }
         }
         unnestColIsProjected = outColPos >= 0;
         this.positionWriterFactory = positionWriterFactory;
@@ -134,36 +141,55 @@

             private void writeOutput(int t, int positionIndex, boolean missing)
                     throws HyracksDataException, IOException {
-                if (!unnestColIsProjected && positionWriter == null) {
-                    appendProjectionToFrame(t, projectionList);
-                    appendToFrameFromTupleBuilder(tupleBuilder);
-                    return;
+                //                if (!unnestColIsProjected && positionWriter 
== null) {
+                //                    appendProjectionToFrame(t, 
projectionList);
+                //                    
appendToFrameFromTupleBuilder(tupleBuilder);
+                //                    return;
+                //                }
+                tupleBuilder.reset();
+
+                for (int f = 0; f < projectionList.length; f++) {
+                    if (f == outColPos) {
+                        if (missing) {
+                            tupleBuilder.addField(missingBytes.getByteArray(), 
0, missingBytes.size());
+                        } else {
+                            tupleBuilder.addField(p.getByteArray(), 
p.getStartOffset(), p.getLength());
+                        }
+                    } else if (f == positionalColIndex) {
+                        if (missing) {
+                            tupleBuilder.addField(missingBytes.getByteArray(), 
0, missingBytes.size());
+                        } else {
+                            positionWriter.write(tupleBuilder.getDataOutput(), 
positionIndex);
+                            tupleBuilder.addFieldEndOffset();
+                        }
+                    } else {
+                        tupleBuilder.addField(tAccess, t, projectionList[f]);
+                    }
                 }

-                tupleBuilder.reset();
-                for (int f = 0; f < outColPos; f++) {
-                    tupleBuilder.addField(tAccess, t, f);
-                }
-                if (unnestColIsProjected) {
-                    if (missing) {
-                        tupleBuilder.addField(missingBytes.getByteArray(), 0, 
missingBytes.size());
-                    } else {
-                        tupleBuilder.addField(p.getByteArray(), 
p.getStartOffset(), p.getLength());
-                    }
-                }
-                for (int f = unnestColIsProjected ? outColPos + 1 : outColPos; 
f < (positionWriter != null
-                        ? projectionList.length - 1 : projectionList.length); 
f++) {
-                    tupleBuilder.addField(tAccess, t, f);
-                }
-                if (positionWriter != null) {
-                    // Write the positional variable
-                    if (missing) {
-                        tupleBuilder.addField(missingBytes.getByteArray(), 0, 
missingBytes.size());
-                    } else {
-                        positionWriter.write(tupleBuilder.getDataOutput(), 
positionIndex);
-                        tupleBuilder.addFieldEndOffset();
-                    }
-                }
+                //                for (int f = 0; f < outColPos; f++) {
+                //                    tupleBuilder.addField(tAccess, t, f);
+                //                }
+                //                if (unnestColIsProjected) {
+                //                    if (missing) {
+                //                        
tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+                //                    } else {
+                //                        
tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                //                    }
+                //                }
+                //                for (int f = unnestColIsProjected ? 
outColPos + 1 : outColPos; f < (positionWriter != null
+                //                        ? projectionList.length - 1 : 
projectionList.length); f++) {
+                //                    tupleBuilder.addField(tAccess, t, f);
+                //                }
+                //                if (positionWriter != null) {
+                //                    // Write the positional variable
+                //                    if (missing) {
+                //                        
tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+                //                    } else {
+                //                        
positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+                //                        tupleBuilder.addFieldEndOffset();
+                //                    }
+                //                }
                 appendToFrameFromTupleBuilder(tupleBuilder);
             }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19100
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I83842976792f54017eff60fcc597bdd4ec6dc8c9
Gerrit-Change-Number: 19100
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange

Reply via email to