Jianfeng Jia has submitted this change and it was merged. Change subject: Intersect the 2ndary indexes before primary search ......................................................................
Intersect the 2ndary indexes before primary search The following commits from your working branch will be included: Change-Id: Ic16c67c529ca19d8b1a5439ddef22760945fd0d7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/577 Tested-by: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java A algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java A algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java M algebricks/algebricks-examples/piglet-example/pom.xml M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java M hyracks/hyracks-dataflow-std/pom.xml A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java A hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java A hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java A hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java A hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java M hyracks/hyracks-test-support/pom.xml 35 files changed, 1,435 insertions(+), 84 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java index 7e9da44..977107c 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java @@ -53,4 +53,5 @@ UPDATE, WRITE, WRITE_RESULT, + INTERSECT, } \ No newline at end of file diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index c581e82..82d0b0e 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -73,4 +73,5 @@ UNNEST, UPDATE, WRITE_RESULT, + INTERSECT, } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java new file mode 100644 index 0000000..e64be2b --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.logical; + +import java.util.List; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.properties.FilteredVariablePropagationPolicy; +import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; +import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; +import org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; +import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; + +public class IntersectOperator extends AbstractLogicalOperator { + + private final List<List<LogicalVariable>> inputVars; + private final List<LogicalVariable> outputVars; + + public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> inputVars) + throws AlgebricksException { + if (outputVars.size() != inputVars.get(0).size()) { + throw new AlgebricksException("The number of output variables is different with the input variable number"); + } + if (inputVars.stream().anyMatch(vlist -> vlist.size() != outputVars.size())) { + throw new AlgebricksException("The schemas of input variables are not consistent"); + } + this.outputVars = outputVars; + this.inputVars = inputVars; + } + + @Override + public LogicalOperatorTag getOperatorTag() { + return LogicalOperatorTag.INTERSECT; + } + + @Override + public void recomputeSchema() throws AlgebricksException { + schema = outputVars; + } + + @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) + throws AlgebricksException { + return false; + } + + @Override + public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException { + return visitor.visitIntersectOperator(this, arg); + } + + @Override + public boolean isMap() { + return false; + } + + @Override + public VariablePropagationPolicy getVariablePropagationPolicy() { + return new FilteredVariablePropagationPolicy(outputVars); + } + + @Override + public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { + IVariableTypeEnvironment typeEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue()); + + for (int i = 1; i < inputs.size(); i++) { + checkTypeConsistency(typeEnv, inputVars.get(0), ctx.getOutputTypeEnvironment(inputs.get(i).getValue()), + inputVars.get(i)); + } + + IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), + ctx.getMetadataProvider()); + for (int i = 0; i < outputVars.size(); i++) { + env.setVarType(outputVars.get(i), typeEnv.getVarType(inputVars.get(0).get(i))); + } + return typeEnv; + } + + public List<LogicalVariable> getOutputVars() { + return outputVars; + } + + public int getNumInput() { + return inputVars.size(); + } + + public List<LogicalVariable> getInputVariables(int inputIndex) { + return inputVars.get(inputIndex); + } + + private void checkTypeConsistency(IVariableTypeEnvironment expected, List<LogicalVariable> expectedVariables, + IVariableTypeEnvironment actual, List<LogicalVariable> actualVariables) throws AlgebricksException { + for (int i = 0; i < expectedVariables.size(); i++) { + Object expectedType = expected.getVarType(expectedVariables.get(i)); + Object actualType = actual.getVarType(actualVariables.get(i)); + if (!expectedType.equals(actualType)) { + AlgebricksConfig.ALGEBRICKS_LOGGER + .warning("Type of two variables are not equal." + expectedVariables.get(i) + " is of type: " + + expectedType + actualVariables.get(i) + " is of type: " + actualType); + } + } + } + +} diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java index b2c97c3..398b4d2 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java @@ -59,6 +59,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -475,6 +476,12 @@ } @Override + public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext ctx) throws AlgebricksException { + setEmptyFDsEqClasses(op, ctx); + return null; + } + + @Override public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext ctx) throws AlgebricksException { fdsEqClassesForAbstractUnnestOperator(op, ctx); return null; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index 7a4e7e1..eb6cd15 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -48,6 +48,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -331,6 +332,32 @@ } @Override + public Boolean visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) throws AlgebricksException { + if (op.getOperatorTag() != LogicalOperatorTag.INTERSECT){ + return Boolean.FALSE; + } + IntersectOperator intersetOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg); + List<LogicalVariable> variables = op.getOutputVars(); + List<LogicalVariable> variablesArg = intersetOpArg.getOutputVars(); + if (variables.size() != variablesArg.size()){ + return Boolean.FALSE; + } + if (!VariableUtilities.varListEqualUnordered(variables, variablesArg)){ + return Boolean.FALSE; + } + + if (op.getNumInput() != intersetOpArg.getNumInput()){ + return Boolean.FALSE; + } + for (int i = 0; i < op.getNumInput(); i++){ + if (!VariableUtilities.varListEqualUnordered(op.getInputVariables(i), intersetOpArg.getInputVariables(i))){ + return Boolean.FALSE; + } + } + return Boolean.TRUE; + } + + @Override public Boolean visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException { AbstractLogicalOperator aop = (AbstractLogicalOperator) arg; if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST) diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java index c46ffde..7b0b944 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java @@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -198,6 +199,13 @@ public Void visitUnionOperator(UnionAllOperator op, ILogicalOperator arg) throws AlgebricksException { mapChildren(op, arg); mapVariablesForUnion(op, arg); + return null; + } + + @Override + public Void visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) throws AlgebricksException { + mapChildren(op, arg); + mapVariablesForIntersect(op, arg); return null; } @@ -428,6 +436,22 @@ } } + private void mapVariablesForIntersect(IntersectOperator op, ILogicalOperator arg) { + IntersectOperator opArg = (IntersectOperator) arg; + if (op.getNumInput() != opArg.getNumInput()){ + return; + } + for (int i = 0; i < op.getNumInput(); i++){ + for (int j = 0; j < op.getInputVariables(i).size(); j++){ + if (!varEquivalent(op.getInputVariables(i).get(j), opArg.getInputVariables(i).get(j))){ + return; + } + } + + } + mapVariables(op.getOutputVars(), opArg.getOutputVars()); + } + private boolean varEquivalent(LogicalVariable left, LogicalVariable right) { if (variableMapping.get(right) == null) return false; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index b3f6639..9d83ae5 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -464,6 +465,33 @@ } @Override + public ILogicalOperator visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) + throws AlgebricksException { + List<List<LogicalVariable>> liveVarsInInputs = getLiveVarsInInputs(op); + List<LogicalVariable> outputCopy = new ArrayList<>(); + for (LogicalVariable var : op.getOutputVars()){ + outputCopy.add(deepCopyVariable(var)); + } + IntersectOperator opCopy = new IntersectOperator(outputCopy, liveVarsInInputs); + deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); + return opCopy; + } + + private List<List<LogicalVariable>> getLiveVarsInInputs(AbstractLogicalOperator op) throws AlgebricksException { + List<Mutable<ILogicalOperator>> copiedInputs = new ArrayList<>(); + for (Mutable<ILogicalOperator> childRef : op.getInputs()) { + copiedInputs.add(deepCopyOperatorReference(childRef, null)); + } + List<List<LogicalVariable>> liveVarsInInputs = new ArrayList<>(); + for (Mutable<ILogicalOperator> inputOpRef : copiedInputs) { + List<LogicalVariable> liveVars = new ArrayList<>(); + VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars); + liveVarsInInputs.add(liveVars); + } + return liveVarsInInputs; + } + + @Override public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg) throws AlgebricksException { UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()), diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java index 4e5b13e..9de0992 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java @@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; @@ -229,6 +230,11 @@ } @Override + public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext arg) throws AlgebricksException { + return null; + } + + @Override public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext arg) throws AlgebricksException { // TODO Auto-generated method stub return null; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index 2e402fc..8d436d1 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; @@ -206,6 +207,16 @@ } @Override + public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + List<LogicalVariable> outputVar = new ArrayList<>(op.getOutputVars()); + List<List<LogicalVariable>> inputVars = new ArrayList<>(op.getNumInput()); + for(int i = 0; i < op.getNumInput(); i++){ + inputVars.add(new ArrayList<>(op.getInputVariables(i))); + } + return new IntersectOperator(outputVar, inputVars); + } + + @Override public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException { return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()), op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter()); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java index 8df772b..d23ff94 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java @@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -204,6 +205,12 @@ } @Override + public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + producedVariables.addAll(op.getOutputVars()); + return null; + } + + @Override public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException { producedVariables.addAll(op.getVariables()); return null; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java index b488df1..5c6d3c3 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java @@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; @@ -230,6 +231,12 @@ } @Override + public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + VariableUtilities.getProducedVariables(op, schemaVariables); + return null; + } + + @Override public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException { if (op.propagatesInput()) { standardLayout(op); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java index 91ff073..d8e25f7 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java @@ -42,6 +42,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; @@ -303,6 +304,24 @@ } @Override + public Void visitIntersectOperator(IntersectOperator op, Pair<LogicalVariable, LogicalVariable> pair) + throws AlgebricksException { + for (int i = 0; i < op.getOutputVars().size(); i++) { + if (op.getOutputVars().get(i).equals(pair.first)){ + op.getOutputVars().set(i, pair.second); + } + } + for(int i = 0; i < op.getNumInput(); i++){ + for (int j = 0; j < op.getInputVariables(i).size(); j++){ + if (op.getInputVariables(i).get(j).equals(pair.first)){ + op.getInputVariables(i).set(j, pair.second); + } + } + } + return null; + } + + @Override public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { List<LogicalVariable> variables = op.getVariables(); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index cfc57c2..ef02feb 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -297,6 +298,18 @@ } @Override + public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { + for (int i = 0; i < op.getNumInput(); i++) { + for (LogicalVariable var : op.getInputVariables(i)) { + if (!usedVariables.contains(var)) { + usedVariables.add(var); + } + } + } + return null; + } + + @Override public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) { op.getExpressionRef().getValue().getUsedVariables(usedVariables); if (op.getAdditionalFilteringExpressions() != null) { diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java new file mode 100644 index 0000000..b6d0f1f --- /dev/null +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.intersect.IntersectOperatorDescriptor; + +public class IntersectPOperator extends AbstractPhysicalOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.INTERSECT; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop, + IPhysicalPropertiesVector reqdByParent) { + IntersectOperator intersectOp = (IntersectOperator) iop; + StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()]; + for (int i = 0; i < intersectOp.getNumInput(); i++) { + List<ILocalStructuralProperty> localProps = new ArrayList<>(); + List<OrderColumn> orderColumns = new ArrayList<>(); + for (LogicalVariable column : intersectOp.getInputVariables(i)) { + orderColumns.add(new OrderColumn(column, OrderOperator.IOrder.OrderKind.ASC)); + } + localProps.add(new LocalOrderProperty(orderColumns)); + IPartitioningProperty pp = null; + if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { + pp = new RandomPartitioningProperty(null); + } + pv[i] = new StructuralPropertiesVector(pp, localProps); + } + return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) + throws AlgebricksException { + IntersectOperator op = (IntersectOperator) iop; + IPartitioningProperty pp = op.getInputs().get(0).getValue().getDeliveredPhysicalProperties() + .getPartitioningProperty(); + + HashMap<LogicalVariable, LogicalVariable> varMaps = new HashMap<>(op.getOutputVars().size()); + for (int i = 0; i < op.getOutputVars().size(); i++) { + varMaps.put(op.getInputVariables(0).get(i), op.getOutputVars().get(i)); + } + pp.substituteColumnVars(varMaps); + + List<ILocalStructuralProperty> propsLocal = new ArrayList<>(); + List<OrderColumn> orderColumns = new ArrayList<>(); + for (LogicalVariable var : op.getOutputVars()) { + orderColumns.add(new OrderColumn(var, OrderOperator.IOrder.OrderKind.ASC)); + } + propsLocal.add(new LocalOrderProperty(orderColumns)); + deliveredProperties = new StructuralPropertiesVector(pp, propsLocal); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + // logical op should have checked all the mismatch issues. + IntersectOperator logicalOp = (IntersectOperator) op; + int nInput = logicalOp.getNumInput(); + int[][] compareFields = new int[nInput][]; + + IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories( + logicalOp.getInputVariables(0), context.getTypeEnvironment(op), context); + + INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); + INormalizedKeyComputerFactory nkcf = null; + + if (nkcfProvider != null) { + Object type = context.getTypeEnvironment(op).getVarType(logicalOp.getInputVariables(0).get(0)); + if (type != null) { + nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, true); + } + } + + for (int i = 0; i < logicalOp.getNumInput(); i++) { + compareFields[i] = JobGenHelper.variablesToFieldIndexes(logicalOp.getInputVariables(i), inputSchemas[i]); + } + + IOperatorDescriptorRegistry spec = builder.getJobSpec(); + RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, + context); + + IntersectOperatorDescriptor opDescriptor = null; + try { + opDescriptor = new IntersectOperatorDescriptor(spec, nInput, compareFields, nkcf, comparatorFactories, + recordDescriptor); + } catch (HyracksException e) { + throw new AlgebricksException(e); + } + + contributeOpDesc(builder, (AbstractLogicalOperator) op, opDescriptor); + for (int i = 0; i < op.getInputs().size(); i++) { + builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i); + } + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java index 4d51bf0..2c407b7 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java @@ -75,18 +75,6 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { - UnionAllOperator unionOp = (UnionAllOperator) op; - int n = unionOp.getVariableMappings().size(); - int[] leftColumns = new int[n]; - int[] rightColumns = new int[n]; - int i = 0; - for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : unionOp.getVariableMappings()) { - int posLeft = inputSchemas[0].findVariable(t.first); - leftColumns[i] = posLeft; - int posRight = inputSchemas[1].findVariable(t.second); - rightColumns[i] = posRight; - ++i; - } IOperatorDescriptorRegistry spec = builder.getJobSpec(); RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index e310ff9..0514fbd 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -42,6 +42,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -253,6 +254,36 @@ } @Override + public String visitIntersectOperator(IntersectOperator op, Integer indent) throws AlgebricksException { + StringBuilder builder = new StringBuilder(); + addIndent(builder, indent).append("intersect ("); + + builder.append('['); + for (int i = 0; i < op.getOutputVars().size(); i++) { + if (i > 0) { + builder.append(", "); + } + builder.append(op.getOutputVars().get(i)); + } + builder.append("] <- ["); + for (int i = 0; i < op.getNumInput(); i++) { + if (i > 0) { + builder.append(", "); + } + builder.append('['); + for (int j = 0; j < op.getInputVariables(i).size(); j++) { + if (j > 0) { + builder.append(", "); + } + builder.append(op.getInputVariables(i).get(j)); + } + builder.append(']'); + } + builder.append("])"); + return builder.toString(); + } + + @Override public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException { StringBuilder buffer = new StringBuilder(); addIndent(buffer, indent).append("unnest " + op.getVariable()); diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java index 5926a20..f3ba030 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java @@ -57,4 +57,8 @@ this.domain = domain; } + @Override + public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) { + } + } \ No newline at end of file diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java index 3142d10..89ac374 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java @@ -26,11 +26,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; public interface IPartitioningProperty extends IStructuralProperty { - public enum PartitioningType { + enum PartitioningType { UNPARTITIONED, RANDOM, BROADCAST, UNORDERED_PARTITIONED, ORDERED_PARTITIONED } - static final INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() { + INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() { @Override public boolean sameAs(INodeDomain domain) { return domain == this; @@ -42,7 +42,7 @@ } }; - public static final IPartitioningProperty UNPARTITIONED = new IPartitioningProperty() { + IPartitioningProperty UNPARTITIONED = new IPartitioningProperty() { @Override public PartitioningType getPartitioningType() { @@ -72,14 +72,20 @@ public void setNodeDomain(INodeDomain domain) { throw new IllegalStateException(); } + + @Override + public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> variableMap) { + } }; - public abstract PartitioningType getPartitioningType(); + PartitioningType getPartitioningType(); - public abstract void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses, + void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses, List<FunctionalDependency> fds); - public abstract INodeDomain getNodeDomain(); + INodeDomain getNodeDomain(); - public abstract void setNodeDomain(INodeDomain domain); + void setNodeDomain(INodeDomain domain); + + void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap); } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java index f28bc56..5808da1 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java @@ -81,4 +81,13 @@ this.domain = domain; } + @Override + public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) { + for (OrderColumn orderColumn : orderColumns){ + if (varMap.containsKey(orderColumn.getColumn())){ + orderColumn.setColumn(varMap.get(orderColumn.getColumn())); + } + } + } + } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java index 24fe8e7..917fdd8 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java @@ -62,4 +62,8 @@ this.domain = domain; } + @Override + public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) { + } + } \ No newline at end of file diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java index de3b102..17e0cb3 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java @@ -65,4 +65,13 @@ this.domain = domain; } + @Override + public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) { + for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()){ + if (columnSet.remove(var.getKey())){ + columnSet.add(var.getValue()); + } + } + } + } diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index b5099b1..0aa6676 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -226,6 +226,12 @@ return op.accept(visitor, null); } + public static ILogicalOperator deepCopyWithExcutionMode(ILogicalOperator op) throws AlgebricksException { + OperatorDeepCopyVisitor visitor = new OperatorDeepCopyVisitor(); + AbstractLogicalOperator newOp = (AbstractLogicalOperator) op.accept(visitor, null); + newOp.setExecutionMode(op.getExecutionMode()); + return newOp; + } /** * Compute type environment of a newly generated operator {@code op} and its input. * diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java index 53c8b69..6509e2a 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java @@ -32,6 +32,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; @@ -95,6 +96,8 @@ public R visitUnionOperator(UnionAllOperator op, T arg) throws AlgebricksException; + public R visitIntersectOperator(IntersectOperator op, T arg) throws AlgebricksException; + public R visitUnnestOperator(UnnestOperator op, T arg) throws AlgebricksException; public R visitOuterUnnestOperator(OuterUnnestOperator op, T arg) throws AlgebricksException; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 186ac6f..1a61f2e 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -76,18 +76,18 @@ int n = op.getInputs().size(); IOperatorSchema[] schemas = new IOperatorSchema[n]; int i = 0; - for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) { - List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2); + for (Mutable<ILogicalOperator> opChild : op.getInputs()) { + List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opChild); if (parents == null) { parents = new ArrayList<Mutable<ILogicalOperator>>(); - operatorVisitedToParents.put(opRef2, parents); + operatorVisitedToParents.put(opChild, parents); parents.add(opRef); - compileOpRef(opRef2, spec, builder, outerPlanSchema); - schemas[i++] = context.getSchema(opRef2.getValue()); + compileOpRef(opChild, spec, builder, outerPlanSchema); + schemas[i++] = context.getSchema(opChild.getValue()); } else { if (!parents.contains(opRef)) parents.add(opRef); - schemas[i++] = context.getSchema(opRef2.getValue()); + schemas[i++] = context.getSchema(opChild.getValue()); continue; } } diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml index ae2ec51..20bc586 100644 --- a/algebricks/algebricks-examples/piglet-example/pom.xml +++ b/algebricks/algebricks-examples/piglet-example/pom.xml @@ -48,6 +48,7 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> + <version>1.10</version> <executions> <execution> <id>add-source</id> diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index e99d126..73bba8f 100644 --- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -64,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; @@ -249,7 +250,10 @@ op.setPhysicalOperator(new UnionAllPOperator()); break; } - + case INTERSECT: { + op.setPhysicalOperator(new IntersectPOperator()); + break; + } case UNNEST: { op.setPhysicalOperator(new UnnestPOperator()); break; diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java index 3d3ed8d..8df1f38 100644 --- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java +++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java @@ -52,7 +52,8 @@ ByteBuffer buffer = ByteBuffer.allocate(bytes); if (bytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) { throw new HyracksDataException( - "Unable to allocate frame larger than:" + FrameConstants.MAX_NUM_MINFRAME + " bytes"); + "Unable to allocate frame larger than:" + FrameConstants.MAX_NUM_MINFRAME * minFrameSize + + " bytes"); } FrameHelper.serializeFrameSize(buffer, (byte) (bytes / minFrameSize)); return (ByteBuffer) buffer.clear(); @@ -74,8 +75,8 @@ buffer.position(pos); if (newSizeInBytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) { - throw new HyracksDataException("Unable to allocate frame of size bigger than MinFrameSize * " - + FrameConstants.MAX_NUM_MINFRAME); + throw new HyracksDataException("Unable to allocate frame of size bigger than: " + + FrameConstants.MAX_NUM_MINFRAME * minFrameSize + " bytes"); } FrameHelper.serializeFrameSize(buffer, (byte) (newSizeInBytes / minFrameSize)); return buffer; diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java index 21a7a71..e62e9e7 100644 --- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java +++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java @@ -49,18 +49,22 @@ leftOverSize = 0; } + /** + * Reset frame states and copy the left over data into the new frame + * + * @param frame + * @throws HyracksDataException + */ + public void resetWithLeftOverData(IFrame frame) throws HyracksDataException { + super.reset(frame, true); + copyLeftOverDataFromeBufferToFrame(); + } + @Override public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException { super.write(outWriter, clearFrame); if (clearFrame) { - if (leftOverSize > 0) { - if (!canHoldNewTuple(0, leftOverSize)) { - throw new HyracksDataException( - "The given frame can not be extended to insert the leftover data from the last record"); - } - System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize); - leftOverSize = 0; - } + copyLeftOverDataFromeBufferToFrame(); } } @@ -85,13 +89,13 @@ return true; } else { if (currentField > 0) { - copyLeftOverData(); + copyLeftOverDataFromFrameToBuffer(); } return false; } } - private void copyLeftOverData() { + private void copyLeftOverDataFromFrameToBuffer() { leftOverSize = lastFieldEndOffset + fieldCount * 4; if (cachedLeftOverFields == null || cachedLeftOverFields.length < leftOverSize) { cachedLeftOverFields = new byte[leftOverSize]; @@ -99,6 +103,17 @@ System.arraycopy(array, tupleDataEndOffset, cachedLeftOverFields, 0, leftOverSize); } + private void copyLeftOverDataFromeBufferToFrame() throws HyracksDataException { + if (leftOverSize > 0) { + if (!canHoldNewTuple(0, leftOverSize)) { + throw new HyracksDataException( + "The given frame can not be extended to insert the leftover data from the last record"); + } + System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize); + leftOverSize = 0; + } + } + public boolean appendField(IFrameTupleAccessor fta, int tIndex, int fIndex) throws HyracksDataException { int startOffset = fta.getTupleStartOffset(tIndex); int fStartOffset = fta.getFieldStartOffset(tIndex, fIndex); diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml index 949ea38..b01789c 100644 --- a/hyracks/hyracks-dataflow-std/pom.xml +++ b/hyracks/hyracks-dataflow-std/pom.xml @@ -17,52 +17,52 @@ ! under the License. !--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <artifactId>hyracks-dataflow-std</artifactId> - <name>hyracks-dataflow-std</name> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>hyracks-dataflow-std</artifactId> + <name>hyracks-dataflow-std</name> - <parent> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks</artifactId> - <version>0.2.17-SNAPSHOT</version> - </parent> + <parent> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks</artifactId> + <version>0.2.17-SNAPSHOT</version> + </parent> - <licenses> - <license> - <name>Apache License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - <comments>A business-friendly OSS license</comments> - </license> - </licenses> + <licenses> + <license> + <name>Apache License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + <comments>A business-friendly OSS license</comments> + </license> + </licenses> - - <dependencies> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-api</artifactId> - <version>0.2.17-SNAPSHOT</version> - <type>jar</type> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-dataflow-common</artifactId> - <version>0.2.17-SNAPSHOT</version> - <type>jar</type> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-control-nc</artifactId> - <version>0.2.17-SNAPSHOT</version> - <scope>test</scope> - </dependency> - </dependencies> + <dependencies> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-api</artifactId> + <version>0.2.17-SNAPSHOT</version> + <type>jar</type> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-dataflow-common</artifactId> + <version>0.2.17-SNAPSHOT</version> + <type>jar</type> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-control-nc</artifactId> + <version>0.2.17-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java new file mode 100644 index 0000000..98807eb --- /dev/null +++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.std.intersect; + +import java.nio.ByteBuffer; +import java.util.BitSet; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.ActivityId; +import org.apache.hyracks.api.dataflow.IActivity; +import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable; + +/** + * This intersection operator is to get the common elements from multiple way inputs. + * It will only produce the projected fields which are used for comparison. + */ +public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor { + + private final int[][] projectFields; + private final INormalizedKeyComputerFactory firstKeyNormalizerFactory; + private final IBinaryComparatorFactory[] comparatorFactory; + + /** + * @param spec + * @param nInputs Number of inputs + * @param compareAndProjectFields The project field list of each input. + * All the fields order should be the same with the comparatorFactories + * @param firstKeyNormalizerFactory Normalizer for the first comparison key. + * @param comparatorFactories A list of comparators for each field + * @param recordDescriptor + * @throws HyracksException + */ + public IntersectOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, int[][] compareAndProjectFields, + INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories, + RecordDescriptor recordDescriptor) throws HyracksException { + super(spec, nInputs, 1); + recordDescriptors[0] = recordDescriptor; + + validateParameters(compareAndProjectFields, comparatorFactories); + + this.projectFields = compareAndProjectFields; + this.firstKeyNormalizerFactory = firstKeyNormalizerFactory; + this.comparatorFactory = comparatorFactories; + } + + private void validateParameters(int[][] compareAndProjectFields, IBinaryComparatorFactory[] comparatorFactories) + throws HyracksException { + + int firstLength = compareAndProjectFields[0].length; + for (int[] fields : compareAndProjectFields) { + if (fields.length != firstLength) { + throw new HyracksException("The given input comparison fields is not equal"); + } + for (int fid : fields) { + if (fid < 0) { + throw new HyracksException("Invalid field index in given comparison fields array"); + } + } + } + + if (firstLength != comparatorFactories.length) { + throw new HyracksException("The size of given fields is not equal with the number of comparators"); + } + } + + @Override + public void contributeActivities(IActivityGraphBuilder builder) { + IActivity intersectActivity = new IntersectActivity(new ActivityId(getOperatorId(), 0)); + builder.addActivity(this, intersectActivity); + for (int i = 0; i < getInputArity(); i++) { + builder.addSourceEdge(i, intersectActivity, i); + } + builder.addTargetEdge(0, intersectActivity, 0); + } + + private class IntersectActivity extends AbstractActivityNode { + + public IntersectActivity(ActivityId activityId) { + super(activityId); + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { + RecordDescriptor[] inputRecordDesc = new RecordDescriptor[inputArity]; + for (int i = 0; i < inputRecordDesc.length; i++) { + inputRecordDesc[i] = recordDescProvider.getInputRecordDescriptor(getActivityId(), i); + } + return new IntersectOperatorNodePushable(ctx, inputArity, inputRecordDesc, projectFields, + firstKeyNormalizerFactory, comparatorFactory); + } + } + + public static class IntersectOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable { + + private enum ACTION {FAILED, CLOSE} + + private final int inputArity; + private final int[][] projectFields; + private final BitSet consumed; + private final int[] tupleIndexMarker; + private final FrameTupleAccessor[] refAccessor; + private final FrameTupleAppender appender; + + private final INormalizedKeyComputer firstKeyNormalizerComputer; + private final IBinaryComparator[] comparators; + + private boolean done = false; + + public IntersectOperatorNodePushable(IHyracksTaskContext ctx, int inputArity, + RecordDescriptor[] inputRecordDescriptors, int[][] projectFields, + INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactory) + throws HyracksDataException { + + this.inputArity = inputArity; + this.projectFields = projectFields; + this.firstKeyNormalizerComputer = + firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer(); + + comparators = new IBinaryComparator[projectFields[0].length]; + for (int i = 0; i < comparators.length; i++) { + comparators[i] = comparatorFactory[i].createBinaryComparator(); + } + + appender = new FrameTupleAppender(new VSizeFrame(ctx)); + + refAccessor = new FrameTupleAccessor[inputArity]; + for (int i = 0; i < inputArity; i++) { + refAccessor[i] = new FrameTupleAccessor(inputRecordDescriptors[i]); + } + + consumed = new BitSet(inputArity); + consumed.set(0, inputArity); + tupleIndexMarker = new int[inputArity]; + } + + @Override + public int getInputArity() { + return inputArity; + } + + @Override + public IFrameWriter getInputFrameWriter(final int index) { + return new IFrameWriter() { + @Override + public void open() throws HyracksDataException { + if (index == 0) { + writer.open(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + synchronized (IntersectOperatorNodePushable.this) { + if (done) { + return; + } + refAccessor[index].reset(buffer); + tupleIndexMarker[index] = 0; + consumed.clear(index); + if (index != 0) { + if (allInputArrived()) { + IntersectOperatorNodePushable.this.notifyAll(); + } + while (!consumed.get(index) && !done) { + waitOrHyracksException(); + } + } else { //(index == 0) + while (!consumed.get(0)) { + while (!allInputArrived() && !done) { + waitOrHyracksException(); + } + if (done) { + break; + } + intersectAllInputs(); + IntersectOperatorNodePushable.this.notifyAll(); + } + } + } + } + + private void waitOrHyracksException() throws HyracksDataException { + try { + IntersectOperatorNodePushable.this.wait(); + } catch (InterruptedException e) { + throw new HyracksDataException(e); + } + } + + private boolean allInputArrived() { + return consumed.cardinality() == 0; + } + + private void intersectAllInputs() throws HyracksDataException { + do { + int maxInput = findMaxInput(); + int match = 1; + boolean needToUpdateMax = false; + for (int i = 0; i < inputArity; i++) { + if (i == maxInput) { + continue; + } + while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) { + int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, + refAccessor[maxInput], tupleIndexMarker[maxInput]); + if (cmp == 0) { + match++; + break; + } else if (cmp < 0) { + tupleIndexMarker[i]++; + } else { + needToUpdateMax = true; + break; + } + } + + if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) { + consumed.set(i); + } + } + if (match == inputArity) { + FrameUtils.appendProjectionToWriter(writer, appender, refAccessor[maxInput], + tupleIndexMarker[maxInput], projectFields[maxInput]); + for (int i = 0; i < inputArity; i++) { + tupleIndexMarker[i]++; + if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) { + consumed.set(i); + } + } + } else if (needToUpdateMax) { + tupleIndexMarker[maxInput]++; + if (tupleIndexMarker[maxInput] >= refAccessor[maxInput].getTupleCount()) { + consumed.set(maxInput); + } + } + + } while (consumed.nextSetBit(0) < 0); + appender.write(writer, true); + } + + private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2, + FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException { + int firstNorm1 = getFirstNorm(input1, frameTupleAccessor1, tid1); + int firstNorm2 = getFirstNorm(input2, frameTupleAccessor2, tid2); + + if (firstNorm1 < firstNorm2) { + return -1; + } else if (firstNorm1 > firstNorm2) { + return 1; + } + + for (int i = 0; i < comparators.length; i++) { + int cmp = comparators[i].compare(frameTupleAccessor1.getBuffer().array(), + frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[input1][i]), + frameTupleAccessor1.getFieldLength(tid1, projectFields[input1][i]), + frameTupleAccessor2.getBuffer().array(), + frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, projectFields[input2][i]), + frameTupleAccessor2.getFieldLength(tid2, projectFields[input2][i])); + + if (cmp != 0) { + return cmp; + } + } + return 0; + } + + private int getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1) { + return firstKeyNormalizerComputer == null ? + 0 : + firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(), + frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[inputId1][0]), + frameTupleAccessor1.getFieldLength(tid1, projectFields[inputId1][0])); + } + + private int findMaxInput() throws HyracksDataException { + int max = 0; + for (int i = 1; i < inputArity; i++) { + int cmp = compare(max, refAccessor[max], tupleIndexMarker[max], i, refAccessor[i], + tupleIndexMarker[i]); + if (cmp < 0) { + max = i; + } + } + return max; + } + + @Override + public void fail() throws HyracksDataException { + clearStateWith(ACTION.FAILED); + } + + @Override + public void close() throws HyracksDataException { + clearStateWith(ACTION.CLOSE); + } + + private void clearStateWith(ACTION action) throws HyracksDataException { + synchronized (IntersectOperatorNodePushable.this) { + if (index == 0) { + doAction(action); + } + if (done) { + return; + } + consumed.set(index); + refAccessor[index] = null; + done = true; + IntersectOperatorNodePushable.this.notifyAll(); + } + } + + private void doAction(ACTION action) throws HyracksDataException { + switch (action) { + case CLOSE: + writer.close(); + break; + case FAILED: + writer.fail(); + break; + } + } + + }; + } + } +} diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java new file mode 100644 index 0000000..26c83ab --- /dev/null +++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.tests.unit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.OperatorDescriptorId; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; +import org.apache.hyracks.dataflow.std.intersect.IntersectOperatorDescriptor; +import org.apache.hyracks.tests.util.InputFrameGenerator; +import org.apache.hyracks.tests.util.MultiThreadTaskEmulator; +import org.apache.hyracks.tests.util.OutputFrameVerifier; +import org.apache.hyracks.test.support.TestUtils; +import org.junit.Before; +import org.junit.Test; + +public class IntersectOperatorDescriptorTest { + + IOperatorDescriptorRegistry mockRegistry = when( + mock(IOperatorDescriptorRegistry.class).createOperatorDescriptorId(any())) + .thenReturn(new OperatorDescriptorId(1)).getMock(); + MultiThreadTaskEmulator multiThreadTaskEmulator = new MultiThreadTaskEmulator(); + InputFrameGenerator frameGenerator = new InputFrameGenerator(256); + IHyracksTaskContext ctx = TestUtils.create(256); + + int nInputs; + int nProjectFields; + int[][] compareFields; + RecordDescriptor[] inputRecordDescriptor; + INormalizedKeyComputerFactory normalizedKeyFactory; + IBinaryComparatorFactory[] comparatorFactory; + RecordDescriptor outRecordDescriptor; + + protected void initializeParameters() { + compareFields = new int[nInputs][]; + + inputRecordDescriptor = new RecordDescriptor[nInputs]; + + normalizedKeyFactory = null; + comparatorFactory = new IBinaryComparatorFactory[] { + PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY), + PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) + }; + + for (int i = 0; i < nInputs; i++) { + compareFields[i] = new int[nProjectFields]; + for (int f = 0; f < nProjectFields; f++) { + compareFields[i][f] = f; + } + } + for (int i = 0; i < nInputs; i++) { + inputRecordDescriptor[i] = new RecordDescriptor(new ISerializerDeserializer[] { + IntegerSerializerDeserializer.INSTANCE, + IntegerSerializerDeserializer.INSTANCE, + IntegerSerializerDeserializer.INSTANCE + }); + } + + outRecordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] { + IntegerSerializerDeserializer.INSTANCE, + IntegerSerializerDeserializer.INSTANCE + }); + } + + @Before + public void setUpInput() { + nInputs = 3; + nProjectFields = 2; + initializeParameters(); + } + + @Test + public void testNormalOperatorInitialization() throws HyracksException { + + IntersectOperatorDescriptor operatorDescriptor = new IntersectOperatorDescriptor(mockRegistry, nInputs, + compareFields, normalizedKeyFactory, comparatorFactory, outRecordDescriptor); + + assertEquals(nInputs, operatorDescriptor.getInputArity()); + } + + @Test + public void testCommonIntersect() throws Exception { + List<Object[]> answer = new ArrayList<>(); + List<IFrame>[] inputFrames = new ArrayList[nInputs]; + prepareCommonDataFrame(inputFrames, answer); + executeAndVerifyResult(inputFrames, answer); + } + + @Test + public void testNullOutputIntersect() throws Exception { + List<Object[]> answer = new ArrayList<>(); + List<IFrame>[] inputFrames = new ArrayList[nInputs]; + prepareNullResultDataFrame(inputFrames, answer); + executeAndVerifyResult(inputFrames, answer); + } + + @Test + public void testOneInputIsVeryShortIntersect() throws Exception { + List<Object[]> answer = new ArrayList<>(); + List<IFrame>[] inputFrames = new ArrayList[nInputs]; + prepareOneInputIsVeryShortDataFrame(inputFrames, answer); + executeAndVerifyResult(inputFrames, answer); + } + + @Test + public void testAllSameInputIntersect() throws Exception { + List<Object[]> answer = new ArrayList<>(); + List<IFrame>[] inputFrames = new ArrayList[nInputs]; + prepareAllSameInputDataFrame(inputFrames, answer); + executeAndVerifyResult(inputFrames, answer); + } + + @Test + public void testOnlyOneInputIntersect() throws Exception { + nInputs = 1; + initializeParameters(); + List<Object[]> answer = new ArrayList<>(); + List<IFrame>[] inputFrames = new ArrayList[nInputs]; + prepareAllSameInputDataFrame(inputFrames, answer); + executeAndVerifyResult(inputFrames, answer); + } + + private void executeAndVerifyResult(List<IFrame>[] inputFrames, List<Object[]> answer) throws Exception { + IntersectOperatorDescriptor.IntersectOperatorNodePushable pushable = + new IntersectOperatorDescriptor.IntersectOperatorNodePushable(ctx, nInputs, inputRecordDescriptor, + compareFields, null, comparatorFactory); + assertEquals(nInputs, pushable.getInputArity()); + + IFrameWriter[] writers = new IFrameWriter[nInputs]; + for (int i = 0; i < nInputs; i++) { + writers[i] = pushable.getInputFrameWriter(i); + } + IFrameWriter resultVerifier = new OutputFrameVerifier(outRecordDescriptor, answer); + pushable.setOutputFrameWriter(0, resultVerifier, outRecordDescriptor); + multiThreadTaskEmulator.runInParallel(writers, inputFrames); + } + + protected void prepareCommonDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer) + throws HyracksDataException { + for (int i = 0; i < nInputs; i++) { + List<Object[]> inputObjects = new ArrayList<>(); + generateRecordStream(inputObjects, inputRecordDescriptor[i], i + 1, (i + 1) * 100, 1); + inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects); + } + generateRecordStream(answer, outRecordDescriptor, nInputs, 100, 1); + } + + protected void prepareNullResultDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer) + throws HyracksDataException { + for (int i = 0; i < nInputs; i++) { + List<Object[]> inputObjects = new ArrayList<>(); + generateRecordStream(inputObjects, inputRecordDescriptor[i], (i + 1) * 100, (i + 2) * 100, 1); + inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects); + } + } + + protected void prepareOneInputIsVeryShortDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer) + throws HyracksDataException { + for (int i = 0; i < nInputs; i++) { + List<Object[]> inputObjects = new ArrayList<>(); + generateRecordStream(inputObjects, inputRecordDescriptor[i], i, i * 100 + 1, 1); + inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects); + } + } + + protected void prepareAllSameInputDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer) + throws HyracksDataException { + for (int i = 0; i < nInputs; i++) { + List<Object[]> inputObjects = new ArrayList<>(); + generateRecordStream(inputObjects, inputRecordDescriptor[i], 0, 100, 1); + inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects); + } + generateRecordStream(answer, outRecordDescriptor, 0, 100, 1); + } + + private void generateRecordStream(List<Object[]> inputs, RecordDescriptor recordDesc, + int start, int end, int step) { + for (int i = start; i < end; i += step) { + Object[] obj = new Object[recordDesc.getFieldCount()]; + for (int f = 0; f < recordDesc.getFieldCount(); f++) { + obj[f] = i; + } + inputs.add(obj); + } + } + +} \ No newline at end of file diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java new file mode 100644 index 0000000..d37bfd9 --- /dev/null +++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.tests.util; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.nc.resources.memory.FrameManager; +import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldAppender; + +public class InputFrameGenerator { + + protected final FrameManager manager; + + public InputFrameGenerator(int initialFrameSize) { + manager = new FrameManager(initialFrameSize); + } + + public List<IFrame> generateDataFrame(RecordDescriptor recordDescriptor, List<Object[]> listOfObject) + throws HyracksDataException { + List<IFrame> listFrame = new ArrayList<>(); + VSizeFrame frame = new VSizeFrame(manager); + FrameFixedFieldAppender appender = new FrameFixedFieldAppender(recordDescriptor.getFieldCount()); + appender.reset(frame, true); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(manager.getInitialFrameSize()); + DataOutputStream ds = new DataOutputStream(baos); + for (Object[] objs : listOfObject) { + for (int i = 0; i < recordDescriptor.getFieldCount(); i++) { + baos.reset(); + recordDescriptor.getFields()[i].serialize(objs[i], ds); + if (!appender.appendField(baos.toByteArray(), 0, baos.size())) { + listFrame.add(frame); + frame = new VSizeFrame(manager); + appender.resetWithLeftOverData(frame); + if (!appender.appendField(baos.toByteArray(), 0, baos.size())) { + throw new HyracksDataException("Should never happen!"); + } + } + } + } + listFrame.add(frame); + return listFrame; + } + +} diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java new file mode 100644 index 0000000..022d3a2 --- /dev/null +++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.tests.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameWriter; + +public class MultiThreadTaskEmulator { + + private ExecutorService executor; + + public MultiThreadTaskEmulator() { + this.executor = Executors.newCachedThreadPool((r) -> { + Thread t = new Thread(r); + t.setDaemon(true); + return t; + }); + } + + public void runInParallel(final IFrameWriter[] writers, final List<IFrame>[] inputFrames) throws Exception { + final Semaphore sem = new Semaphore(writers.length - 1); + List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>()); + for (int i = 1; i < writers.length; i++) { + sem.acquire(); + final IFrameWriter writer = writers[i]; + final List<IFrame> inputFrame = inputFrames[i]; + executor.execute(() -> { + executeOneWriter(writer, inputFrame, exceptions); + sem.release(); + }); + } + + final IFrameWriter writer = writers[0]; + final List<IFrame> inputFrame = inputFrames[0]; + executeOneWriter(writer, inputFrame, exceptions); + sem.acquire(writers.length - 1); + + for (int i = 0; i < exceptions.size(); i++) { + exceptions.get(i).printStackTrace(); + if (i == exceptions.size() - 1) { + throw exceptions.get(i); + } + } + } + + private void executeOneWriter(IFrameWriter writer, List<IFrame> inputFrame, List<Exception> exceptions) { + try { + try { + writer.open(); + for (IFrame frame : inputFrame) { + writer.nextFrame(frame.getBuffer()); + } + } catch (Exception ex) { + writer.fail(); + throw ex; + } finally { + writer.close(); + } + } catch (Exception e) { + exceptions.add(e); + } + } +} diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java new file mode 100644 index 0000000..77b6913 --- /dev/null +++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.tests.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; + +public class OutputFrameVerifier implements IFrameWriter { + + private final RecordDescriptor inputRecordDescriptor; + private final List<Object[]> answerList; + private final FrameTupleAccessor frameAccessor; + private int offset; + private boolean failed; + + public OutputFrameVerifier(RecordDescriptor inputRecordDescriptor, List<Object[]> answerList) { + this.inputRecordDescriptor = inputRecordDescriptor; + this.frameAccessor = new FrameTupleAccessor(inputRecordDescriptor); + this.answerList = answerList; + } + + @Override + public void open() throws HyracksDataException { + this.offset = 0; + this.failed = false; + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + frameAccessor.reset(buffer); + for (int tid = 0; tid < frameAccessor.getTupleCount(); tid++) { + Object[] objects = new Object[inputRecordDescriptor.getFieldCount()]; + for (int fid = 0; fid < inputRecordDescriptor.getFieldCount(); fid++) { + ByteArrayInputStream bais = new ByteArrayInputStream(frameAccessor.getBuffer().array(), + frameAccessor.getAbsoluteFieldStartOffset(tid, fid), + frameAccessor.getFieldLength(tid, fid)); + DataInputStream dis = new DataInputStream(bais); + objects[fid] = inputRecordDescriptor.getFields()[fid].deserialize(dis); + } + if (offset >= answerList.size()) { + throw new HyracksDataException( + "The number of given results is more than expected size:" + answerList.size()); + } + Object[] expected = answerList.get(offset); + for (int i = 0; i < expected.length; i++) { + if (!expected[i].equals(objects[i])) { + throw new HyracksDataException( + "The result object: " + objects[i] + " is different from the expected one:" + expected[i]); + } + } + offset++; + } + } + + @Override + public void fail() throws HyracksDataException { + this.failed = true; + } + + @Override + public void close() throws HyracksDataException { + if (offset < answerList.size()) { + throw new HyracksDataException( + "The number of given results:" + offset + " is less than expected size:" + answerList.size()); + } + } + + public boolean isFailed() { + return failed; + } +} diff --git a/hyracks/hyracks-test-support/pom.xml b/hyracks/hyracks-test-support/pom.xml index f0ec043..404885f 100644 --- a/hyracks/hyracks-test-support/pom.xml +++ b/hyracks/hyracks-test-support/pom.xml @@ -37,7 +37,6 @@ </license> </licenses> - <dependencies> <dependency> <groupId>junit</groupId> -- To view, visit https://asterix-gerrit.ics.uci.edu/577 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic16c67c529ca19d8b1a5439ddef22760945fd0d7 Gerrit-PatchSet: 8 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Jianfeng Jia <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Jianfeng Jia <[email protected]> Gerrit-Reviewer: Taewoo Kim <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
