>From <[email protected]>:
[email protected] has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19100 )
Change subject: [ASTERIXDB-3531][COMP] Push projections into unnest operator
......................................................................
[ASTERIXDB-3531][COMP] Push projections into unnest operator
Change-Id: I83842976792f54017eff60fcc597bdd4ec6dc8c9
---
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
8 files changed, 128 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/00/19100/1
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
index 64e41f5..8de2959 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractScanOperator.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-public abstract class AbstractScanOperator extends AbstractLogicalOperator {
+public abstract class AbstractScanOperator extends AbstractProjectingOperator {
protected List<LogicalVariable> variables;
public AbstractScanOperator(List<LogicalVariable> variables) {
@@ -47,6 +47,14 @@
@Override
public void recomputeSchema() {
schema = new ArrayList<>();
+
+ if (isProjectPushed()) {
+ for (LogicalVariable v : getProjectVariables()) {
+ schema.add(v);
+ }
+ return;
+ }
+
schema.addAll(inputs.get(0).getValue().getSchema());
schema.addAll(variables);
}
@@ -58,6 +66,14 @@
@Override
public void propagateVariables(IOperatorSchema target,
IOperatorSchema... sources)
throws AlgebricksException {
+
+ if (isProjectPushed()) {
+ for (LogicalVariable v : getProjectVariables()) {
+ target.addVariable(v);
+ }
+ return;
+ }
+
if (sources.length > 0) {
target.addAllVariables(sources[0]);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
index 5c485d5..6a1ccc8 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractUnnestNonMapOperator.java
@@ -90,6 +90,13 @@
@Override
public void propagateVariables(IOperatorSchema target,
IOperatorSchema... sources) {
+ if (isProjectPushed()) {
+ for (LogicalVariable v : getProjectVariables()) {
+ target.addVariable(v);
+ }
+ return;
+ }
+
if (sources.length > 0) {
target.addAllVariables(sources[0]);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index c0f71fe..c5310b7 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
@@ -37,10 +35,6 @@
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class DataSourceScanOperator extends AbstractDataSourceOperator {
- private final List<LogicalVariable> projectVars;
-
- private boolean projectPushed = false;
-
private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
private List<LogicalVariable> minFilterVars;
private List<LogicalVariable> maxFilterVars;
@@ -61,7 +55,6 @@
Mutable<ILogicalExpression> selectCondition, long outputLimit,
IProjectionFiltrationInfo projectionFiltrationInfo) {
super(variables, dataSource);
- projectVars = new ArrayList<>();
this.selectCondition = selectCondition;
this.outputLimit = outputLimit;
setProjectionFiltrationInfo(projectionFiltrationInfo);
@@ -93,19 +86,6 @@
return false;
}
- public void addProjectVariables(Collection<LogicalVariable> vars) {
- projectVars.addAll(vars);
- projectPushed = true;
- }
-
- public List<LogicalVariable> getProjectVariables() {
- return projectVars;
- }
-
- public boolean isProjectPushed() {
- return projectPushed;
- }
-
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
return new VariablePropagationPolicy() {
@@ -115,7 +95,7 @@
if (sources.length > 0) {
target.addAllVariables(sources[0]);
}
- List<LogicalVariable> outputVariables = projectPushed ?
projectVars : variables;
+ List<LogicalVariable> outputVariables = isProjectPushed() ?
getProjectVariables() : variables;
for (LogicalVariable v : outputVariables) {
target.addVariable(v);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
index 45360d4..589a4a7 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -64,7 +65,6 @@
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
AbstractUnnestNonMapOperator unnest = (AbstractUnnestNonMapOperator)
op;
- int outCol = opSchema.findVariable(unnest.getVariable());
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
IExpressionRuntimeProvider expressionRuntimeProvider =
context.getExpressionRuntimeProvider();
boolean exit = false;
@@ -83,14 +83,42 @@
UnnestingFunctionCallExpression agg =
(UnnestingFunctionCallExpression) unnestExpr;
IUnnestingEvaluatorFactory unnestingFactory =
expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
context.getTypeEnvironment(op.getInputs().get(0).getValue()),
inputSchemas, context);
- int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
IUnnestingPositionWriterFactory positionWriterFactory =
unnest.hasPositionalVariable() ?
context.getUnnestingPositionWriterFactory() : null;
IMissingWriterFactory missingWriterFactory = leftOuter
? JobGenHelper.getMissingWriterFactory(context,
((LeftOuterUnnestOperator) op).getMissingValue())
: null;
- UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol,
unnestingFactory, projectionList,
- positionWriterFactory, leftOuter, missingWriterFactory);
+
+ int outCol;
+ int positionalCol;
+ int[] projectionList;
+
+ if (unnest.isProjectPushed()) {
+
+ outCol = -1;
+ positionalCol = -1;
+ projectionList = new int[unnest.getProjectVariables().size()];
+ int c = 0;
+ for (LogicalVariable projectVar : unnest.getProjectVariables()) {
+ if (projectVar.equals(unnest.getVariable())) {
+ outCol = inputSchemas[0].getSize();
+ projectionList[c++] = inputSchemas[0].getSize();
+ } else if (unnest.hasPositionalVariable() &&
projectVar.equals(unnest.getPositionalVariable())) {
+ positionalCol = inputSchemas[0].getSize() + 1;
+ projectionList[c++] = inputSchemas[0].getSize() + 1;
+ } else {
+ projectionList[c++] =
inputSchemas[0].findVariable(projectVar);
+ }
+ }
+
+ } else {
+ outCol = opSchema.findVariable(unnest.getVariable());
+ positionalCol = unnest.hasPositionalVariable() ?
opSchema.findVariable(unnest.getPositionalVariable()) : -1;
+ projectionList = JobGenHelper.projectAllVariables(opSchema);
+ }
+
+ UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol,
positionalCol, unnestingFactory,
+ projectionList, positionWriterFactory, leftOuter,
missingWriterFactory);
unnestRuntime.setSourceLocation(unnest.getSourceLocation());
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 2044495..f72e48b 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -357,6 +357,9 @@
buffer.append(" at " + op.getPositionalVariable());
}
buffer.append(" <- " +
op.getExpressionRef().getValue().accept(exprVisitor, indent));
+ if (op.isProjectPushed()) {
+ buffer.append("with pushed projection: " +
op.getProjectVariables());
+ }
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
index 2d6c425..e7a3c31 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EmbedProjectRule.java
@@ -46,7 +46,7 @@
Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
AbstractLogicalOperator op2 = (AbstractLogicalOperator)
opRef2.getValue();
- if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+ if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN &&
op2.getOperatorTag() != LogicalOperatorTag.UNNEST) {
return false;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
index fedc420..46d187c 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectIntoDataSourceScanRule.java
@@ -53,7 +53,7 @@
return false;
DataSourceScanOperator scanOp = (DataSourceScanOperator) inputOp;
ProjectOperator projectOp = (ProjectOperator) project;
- scanOp.addProjectVariables(projectOp.getVariables());
+ scanOp.pushProjectionVariables(projectOp.getVariables());
if (op.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
op.getInputs().set(0, project.getInputs().get(0));
} else {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 1b42bb6..0039284 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -44,29 +44,36 @@
private static final long serialVersionUID = 1L;
private final int outCol;
+ private final int positionalCol;
private final IUnnestingEvaluatorFactory unnestingFactory;
private final boolean unnestColIsProjected;
private final IUnnestingPositionWriterFactory positionWriterFactory;
private final boolean leftOuter;
private final IMissingWriterFactory missingWriterFactory;
private int outColPos;
+ private int positionalColIndex;
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory
unnestingFactory, int[] projectionList,
boolean leftOuter, IMissingWriterFactory missingWriterFactory) {
- this(outCol, unnestingFactory, projectionList, null, leftOuter,
missingWriterFactory);
+ this(outCol, -1, unnestingFactory, projectionList, null, leftOuter,
missingWriterFactory);
}
- public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory
unnestingFactory, int[] projectionList,
- IUnnestingPositionWriterFactory positionWriterFactory, boolean
leftOuter,
+ public UnnestRuntimeFactory(int outCol, int positionalCol,
IUnnestingEvaluatorFactory unnestingFactory,
+ int[] projectionList, IUnnestingPositionWriterFactory
positionWriterFactory, boolean leftOuter,
IMissingWriterFactory missingWriterFactory) {
super(projectionList);
this.outCol = outCol;
+ this.positionalCol = positionalCol;
this.unnestingFactory = unnestingFactory;
outColPos = -1;
+ positionalColIndex = -1;
for (int f = 0; f < projectionList.length; f++) {
if (projectionList[f] == outCol) {
outColPos = f;
}
+ if (projectionList[f] == positionalCol) {
+ positionalColIndex = f;
+ }
}
unnestColIsProjected = outColPos >= 0;
this.positionWriterFactory = positionWriterFactory;
@@ -134,36 +141,55 @@
private void writeOutput(int t, int positionIndex, boolean missing)
throws HyracksDataException, IOException {
- if (!unnestColIsProjected && positionWriter == null) {
- appendProjectionToFrame(t, projectionList);
- appendToFrameFromTupleBuilder(tupleBuilder);
- return;
+ // if (!unnestColIsProjected && positionWriter
== null) {
+ // appendProjectionToFrame(t,
projectionList);
+ //
appendToFrameFromTupleBuilder(tupleBuilder);
+ // return;
+ // }
+ tupleBuilder.reset();
+
+ for (int f = 0; f < projectionList.length; f++) {
+ if (f == outColPos) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(),
0, missingBytes.size());
+ } else {
+ tupleBuilder.addField(p.getByteArray(),
p.getStartOffset(), p.getLength());
+ }
+ } else if (f == positionalColIndex) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(),
0, missingBytes.size());
+ } else {
+ positionWriter.write(tupleBuilder.getDataOutput(),
positionIndex);
+ tupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ tupleBuilder.addField(tAccess, t, projectionList[f]);
+ }
}
- tupleBuilder.reset();
- for (int f = 0; f < outColPos; f++) {
- tupleBuilder.addField(tAccess, t, f);
- }
- if (unnestColIsProjected) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(), 0,
missingBytes.size());
- } else {
- tupleBuilder.addField(p.getByteArray(),
p.getStartOffset(), p.getLength());
- }
- }
- for (int f = unnestColIsProjected ? outColPos + 1 : outColPos;
f < (positionWriter != null
- ? projectionList.length - 1 : projectionList.length);
f++) {
- tupleBuilder.addField(tAccess, t, f);
- }
- if (positionWriter != null) {
- // Write the positional variable
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(), 0,
missingBytes.size());
- } else {
- positionWriter.write(tupleBuilder.getDataOutput(),
positionIndex);
- tupleBuilder.addFieldEndOffset();
- }
- }
+ // for (int f = 0; f < outColPos; f++) {
+ // tupleBuilder.addField(tAccess, t, f);
+ // }
+ // if (unnestColIsProjected) {
+ // if (missing) {
+ //
tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+ // } else {
+ //
tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ // }
+ // }
+ // for (int f = unnestColIsProjected ?
outColPos + 1 : outColPos; f < (positionWriter != null
+ // ? projectionList.length - 1 :
projectionList.length); f++) {
+ // tupleBuilder.addField(tAccess, t, f);
+ // }
+ // if (positionWriter != null) {
+ // // Write the positional variable
+ // if (missing) {
+ //
tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+ // } else {
+ //
positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+ // tupleBuilder.addFieldEndOffset();
+ // }
+ // }
appendToFrameFromTupleBuilder(tupleBuilder);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19100
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I83842976792f54017eff60fcc597bdd4ec6dc8c9
Gerrit-Change-Number: 19100
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange