Yingyi Bu has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/614
Change subject: Move to non-copy-based evaluator interfaces for scalar functions, aggregate functions, running aggregate functions and unnest functions. ...................................................................... Move to non-copy-based evaluator interfaces for scalar functions, aggregate functions, running aggregate functions and unnest functions. Change-Id: I92a630550f3d45a7a5f00cfbc93e7b049b06330d --- M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java M algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java M algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java M algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java M algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java D algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java M algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java M algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java M algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java M algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java M hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java M hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java 40 files changed, 146 insertions(+), 597 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/14/614/1 diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java index 0996b9c..7c36b93 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java @@ -22,18 +22,18 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory; +import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; public interface ILogicalExpressionJobGen { - public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, + public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException; - public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, + public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, IVariableTypeEnvironment 
env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException; @@ -41,11 +41,11 @@ AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException; - public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, + public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException; - public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, + public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException; diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java index 50e4f3d..6cd48a6 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java @@ -22,27 +22,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator; import 
org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction; -import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunction; -import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory; import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunction; -import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator; import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; public class LogicalExpressionJobGenToExpressionRuntimeProviderAdapter implements IExpressionRuntimeProvider { private final ILogicalExpressionJobGen lejg; @@ -54,16 +38,14 @@ @Override public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { - ICopyEvaluatorFactory cef = lejg.createEvaluatorFactory(expr, env, inputSchemas, context); - return new 
ScalarEvaluatorFactoryAdapter(cef); + return lejg.createEvaluatorFactory(expr, env, inputSchemas, context); } @Override public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) - throws AlgebricksException { - ICopyAggregateFunctionFactory caff = lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context); - return new AggregateFunctionFactoryAdapter(caff); + throws AlgebricksException { + return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context); } @Override @@ -76,143 +58,14 @@ @Override public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) - throws AlgebricksException { - ICopyRunningAggregateFunctionFactory craff = lejg.createRunningAggregateFunctionFactory(expr, env, - inputSchemas, context); - return new RunningAggregateFunctionFactoryAdapter(craff); + throws AlgebricksException { + return lejg.createRunningAggregateFunctionFactory(expr, env, inputSchemas, context); } @Override public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) - throws AlgebricksException { - ICopyUnnestingFunctionFactory cuff = lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context); - return new UnnestingFunctionFactoryAdapter(cuff); - } - - public static final class ScalarEvaluatorFactoryAdapter implements IScalarEvaluatorFactory { - private static final long serialVersionUID = 1L; - - private final ICopyEvaluatorFactory cef; - - public ScalarEvaluatorFactoryAdapter(ICopyEvaluatorFactory cef) { - this.cef = cef; - } - - @Override - public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { - final 
ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); - final ICopyEvaluator ce = cef.createEvaluator(abvs); - return new IScalarEvaluator() { - @Override - public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { - abvs.reset(); - ce.evaluate(tuple); - result.set(abvs); - } - }; - } - } - - public static final class AggregateFunctionFactoryAdapter implements IAggregateEvaluatorFactory { - private static final long serialVersionUID = 1L; - - private final ICopyAggregateFunctionFactory caff; - - public AggregateFunctionFactoryAdapter(ICopyAggregateFunctionFactory caff) { - this.caff = caff; - } - - @Override - public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { - final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); - final ICopyAggregateFunction caf = caff.createAggregateFunction(abvs); - return new IAggregateEvaluator() { - @Override - public void step(IFrameTupleReference tuple) throws AlgebricksException { - caf.step(tuple); - } - - @Override - public void init() throws AlgebricksException { - abvs.reset(); - caf.init(); - } - - @Override - public void finishPartial(IPointable result) throws AlgebricksException { - caf.finishPartial(); - result.set(abvs); - } - - @Override - public void finish(IPointable result) throws AlgebricksException { - caf.finish(); - result.set(abvs); - } - - }; - } - } - - public static final class RunningAggregateFunctionFactoryAdapter implements IRunningAggregateEvaluatorFactory { - private static final long serialVersionUID = 1L; - - private final ICopyRunningAggregateFunctionFactory craff; - - public RunningAggregateFunctionFactoryAdapter(ICopyRunningAggregateFunctionFactory craff) { - this.craff = craff; - } - - @Override - public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException { - final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); - final 
ICopyRunningAggregateFunction craf = craff.createRunningAggregateFunction(abvs); - return new IRunningAggregateEvaluator() { - @Override - public void step(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { - abvs.reset(); - craf.step(tuple); - result.set(abvs); - } - - @Override - public void init() throws AlgebricksException { - craf.init(); - } - }; - } - } - - public static final class UnnestingFunctionFactoryAdapter implements IUnnestingEvaluatorFactory { - private static final long serialVersionUID = 1L; - - private final ICopyUnnestingFunctionFactory cuff; - - public UnnestingFunctionFactoryAdapter(ICopyUnnestingFunctionFactory cuff) { - this.cuff = cuff; - } - - @Override - public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { - final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); - final ICopyUnnestingFunction cuf = cuff.createUnnestingFunction(abvs); - return new IUnnestingEvaluator() { - @Override - public boolean step(IPointable result) throws AlgebricksException { - abvs.reset(); - if (cuf.step()) { - result.set(abvs); - return true; - } - return false; - } - - @Override - public void init(IFrameTupleReference tuple) throws AlgebricksException { - cuf.init(tuple); - } - }; - } + throws AlgebricksException { + return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context); } } \ No newline at end of file diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java index 7ff15d7..4b4a3b5 100644 --- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java +++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java @@ 
-212,7 +212,7 @@ int innerIndex) throws HyracksDataException { if (condEvaluator == null) { try { - this.condEvaluator = condFactory.createScalarEvaluator(ctx); + this.condEvaluator = condFactory.createEvaluator(ctx); } catch (AlgebricksException ae) { throw new HyracksDataException(ae); } diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java index 1c3f9b8..e36e037 100644 --- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java +++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java @@ -41,11 +41,11 @@ import org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException; import org.apache.hyracks.algebricks.examples.piglet.runtime.functions.PigletFunctionRegistry; import org.apache.hyracks.algebricks.examples.piglet.types.Type; -import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory; -import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory; +import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory; 
import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -56,7 +56,7 @@ private final UTF8StringSerializerDeserializer utf8SerDer = new UTF8StringSerializerDeserializer(); @Override - public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, + public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { switch (expr.getExpressionTag()) { case CONSTANT: { @@ -92,12 +92,12 @@ ScalarFunctionCallExpression sfce = (ScalarFunctionCallExpression) expr; List<Mutable<ILogicalExpression>> argExprs = sfce.getArguments(); - ICopyEvaluatorFactory argEvalFactories[] = new ICopyEvaluatorFactory[argExprs.size()]; + IScalarEvaluatorFactory argEvalFactories[] = new IScalarEvaluatorFactory[argExprs.size()]; for (int i = 0; i < argEvalFactories.length; ++i) { Mutable<ILogicalExpression> er = argExprs.get(i); argEvalFactories[i] = createEvaluatorFactory(er.getValue(), env, inputSchemas, context); } - ICopyEvaluatorFactory funcEvalFactory; + IScalarEvaluatorFactory funcEvalFactory; try { funcEvalFactory = PigletFunctionRegistry.createFunctionEvaluatorFactory( sfce.getFunctionIdentifier(), argEvalFactories); @@ -117,7 +117,7 @@ } @Override - public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, + public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { throw new UnsupportedOperationException(); @@ -131,14 +131,14 @@ } @Override - public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, + public IRunningAggregateEvaluatorFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { throw new UnsupportedOperationException(); } @Override - public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, + public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { throw new UnsupportedOperationException(); diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java index c6624eb..6af6177 100644 --- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java +++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java @@ -19,8 +19,8 @@ package org.apache.hyracks.algebricks.examples.piglet.runtime.functions; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; public interface IPigletFunctionEvaluatorFactoryBuilder { - public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] arguments); + public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] arguments); } \ No newline at end of file diff --git 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java index 534fc33..2e95cd4 100644 --- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java +++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java @@ -18,53 +18,45 @@ */ package org.apache.hyracks.algebricks.examples.piglet.runtime.functions; -import java.io.DataOutput; -import java.io.IOException; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import org.apache.hyracks.data.std.api.IDataOutputProvider; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.IntegerPointable; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; -public class IntegerEqFunctionEvaluatorFactory implements ICopyEvaluatorFactory { +public class IntegerEqFunctionEvaluatorFactory implements IScalarEvaluatorFactory { private static final long serialVersionUID = 1L; - private final 
ICopyEvaluatorFactory arg1Factory; + private final IScalarEvaluatorFactory arg1Factory; - private final ICopyEvaluatorFactory arg2Factory; + private final IScalarEvaluatorFactory arg2Factory; - public IntegerEqFunctionEvaluatorFactory(ICopyEvaluatorFactory arg1Factory, ICopyEvaluatorFactory arg2Factory) { + public IntegerEqFunctionEvaluatorFactory(IScalarEvaluatorFactory arg1Factory, IScalarEvaluatorFactory arg2Factory) { this.arg1Factory = arg1Factory; this.arg2Factory = arg2Factory; } @Override - public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException { - return new ICopyEvaluator() { - private DataOutput dataout = output.getDataOutput(); - private ArrayBackedValueStorage out1 = new ArrayBackedValueStorage(); - private ArrayBackedValueStorage out2 = new ArrayBackedValueStorage(); - private ICopyEvaluator eval1 = arg1Factory.createEvaluator(out1); - private ICopyEvaluator eval2 = arg2Factory.createEvaluator(out2); + public IScalarEvaluator createEvaluator(final IHyracksTaskContext context) throws AlgebricksException { + return new IScalarEvaluator() { + private IPointable out1 = new VoidPointable(); + private IPointable out2 = new VoidPointable(); + private IScalarEvaluator eval1 = arg1Factory.createEvaluator(context); + private IScalarEvaluator eval2 = arg2Factory.createEvaluator(context); + private byte[] resultData = new byte[1]; @Override - public void evaluate(IFrameTupleReference tuple) throws AlgebricksException { - out1.reset(); - eval1.evaluate(tuple); - out2.reset(); - eval2.evaluate(tuple); + public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { + eval1.evaluate(tuple, out1); + eval2.evaluate(tuple, out2); int v1 = IntegerPointable.getInteger(out1.getByteArray(), 0); int v2 = IntegerPointable.getInteger(out2.getByteArray(), 0); boolean r = v1 == v2; - try { - dataout.writeBoolean(r); - } catch (IOException ioe) { - throw new AlgebricksException(ioe); - } + 
resultData[0] = r ? (byte) 1 : (byte) 0; + result.set(resultData, 0, 1); } }; } diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java index 1ebe9cf..340e7f5 100644 --- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java +++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java @@ -25,7 +25,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; public class PigletFunctionRegistry { private static final Map<FunctionIdentifier, IPigletFunctionEvaluatorFactoryBuilder> builderMap; @@ -35,7 +35,7 @@ temp.put(AlgebricksBuiltinFunctions.EQ, new IPigletFunctionEvaluatorFactoryBuilder() { @Override - public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] arguments) { + public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] arguments) { return new IntegerEqFunctionEvaluatorFactory(arguments[0], arguments[1]); } }); @@ -43,7 +43,7 @@ builderMap = Collections.unmodifiableMap(temp); } - public static ICopyEvaluatorFactory createFunctionEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] args) + public static IScalarEvaluatorFactory 
createFunctionEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] args) throws PigletException { IPigletFunctionEvaluatorFactoryBuilder builder = builderMap.get(fid); if (builder == null) { diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java index 9b2919e..48b4eec 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java @@ -23,6 +23,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; @@ -32,7 +33,8 @@ private static final long serialVersionUID = 1L; @Override - public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException { + public IRunningAggregateEvaluator createRunningAggregateEvaluator(IHyracksTaskContext context) + throws AlgebricksException { final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage(); return new IRunningAggregateEvaluator() { diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java index 
41e2fee..08aea9a 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java @@ -23,11 +23,12 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; public interface IAggregateEvaluator { + /** should be called each time a new aggregate value is computed */ public void init() throws AlgebricksException; public void step(IFrameTupleReference tuple) throws AlgebricksException; - public void finishPartial(IPointable result) throws AlgebricksException; - public void finish(IPointable result) throws AlgebricksException; -} \ No newline at end of file + + public void finishPartial(IPointable result) throws AlgebricksException; +} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java index 8bdbed7..fef023a 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java @@ -24,5 +24,5 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; public interface IAggregateEvaluatorFactory extends Serializable { - public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException; -} \ No newline at end of file + public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext context) throws AlgebricksException; +} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java deleted file mode 100644 index 2222e0b..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.base; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; - -public interface ICopyAggregateFunction { - /** should be called each time a new aggregate value is computed */ - public void init() throws AlgebricksException; - - public void step(IFrameTupleReference tuple) throws AlgebricksException; - - public void finish() throws AlgebricksException; - - public void finishPartial() throws AlgebricksException; -} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java deleted file mode 100644 index cbb6732..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.base; - -import java.io.Serializable; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.data.std.api.IDataOutputProvider; - -public interface ICopyAggregateFunctionFactory extends Serializable { - public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException; -} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java deleted file mode 100644 index 03480e8..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.base; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; - -public interface ICopyEvaluator { - public void evaluate(IFrameTupleReference tuple) throws AlgebricksException; -} \ No newline at end of file diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java deleted file mode 100644 index a81f351..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.base; - -import java.io.Serializable; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.data.std.api.IDataOutputProvider; - -public interface ICopyEvaluatorFactory extends Serializable { - public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException; -} \ No newline at end of file diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java deleted file mode 100644 index 19cab14..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.base; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; - -public interface ICopyRunningAggregateFunction { - public void init() throws AlgebricksException; - - public void step(IFrameTupleReference tuple) throws AlgebricksException; -} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java deleted file mode 100644 index 1fe3595..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.algebricks.runtime.base; - -import java.io.Serializable; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.data.std.api.IDataOutputProvider; - -public interface ICopyRunningAggregateFunctionFactory extends Serializable { - public ICopyRunningAggregateFunction createRunningAggregateFunction(IDataOutputProvider provider) - throws AlgebricksException; -} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java index 0959811..3fc8255 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java @@ -21,7 +21,8 @@ import java.io.Serializable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.context.IHyracksTaskContext; public interface ICopySerializableAggregateFunctionFactory extends Serializable { - public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException; + public ICopySerializableAggregateFunction createAggregateFunction(IHyracksTaskContext context) throws AlgebricksException; } diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java deleted file mode 100644 index f4e3aea..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.algebricks.runtime.base; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; - -public interface ICopyUnnestingFunction { - public void init(IFrameTupleReference tuple) throws AlgebricksException; - - public boolean step() throws AlgebricksException; - -} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java deleted file mode 100644 index 1a09fcf..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.algebricks.runtime.base; - -import java.io.Serializable; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.data.std.api.IDataOutputProvider; - -public interface ICopyUnnestingFunctionFactory extends Serializable { - public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider provider) throws AlgebricksException; -} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java index 0fe86a8..3d2bb53 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java @@ -21,7 +21,9 @@ import java.io.Serializable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.context.IHyracksTaskContext; public interface IRunningAggregateEvaluatorFactory extends Serializable { - public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws AlgebricksException; -} \ No newline at end of file + public IRunningAggregateEvaluator 
createRunningAggregateEvaluator(IHyracksTaskContext context) + throws AlgebricksException; +} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java index f14860b..d7df00b 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java @@ -24,5 +24,5 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; public interface IScalarEvaluatorFactory extends Serializable { - public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException; + public IScalarEvaluator createEvaluator(IHyracksTaskContext context) throws AlgebricksException; } \ No newline at end of file diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java index f29e65e..67aede4 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java @@ -26,4 +26,5 @@ public void init(IFrameTupleReference tuple) throws AlgebricksException; public boolean step(IPointable result) throws AlgebricksException; -} \ No newline at end of file + +} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java index eef98b5..3ee9271 100644 --- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java @@ -24,5 +24,5 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; public interface IUnnestingEvaluatorFactory extends Serializable { - public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException; -} \ No newline at end of file + public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext context) throws AlgebricksException; +} diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java index 5bb206c..9aecf6a 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java @@ -18,16 +18,14 @@ */ package org.apache.hyracks.algebricks.runtime.evaluators; -import java.io.DataOutput; -import java.io.IOException; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import org.apache.hyracks.data.std.api.IDataOutputProvider; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -public class ColumnAccessEvalFactory implements 
ICopyEvaluatorFactory { +public class ColumnAccessEvalFactory implements IScalarEvaluatorFactory { private static final long serialVersionUID = 1L; @@ -43,21 +41,15 @@ } @Override - public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException { - return new ICopyEvaluator() { - - private DataOutput out = output.getDataOutput(); + public IScalarEvaluator createEvaluator(final IHyracksTaskContext context) throws AlgebricksException { + return new IScalarEvaluator() { @Override - public void evaluate(IFrameTupleReference tuple) throws AlgebricksException { + public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { byte[] buffer = tuple.getFieldData(fieldIndex); int start = tuple.getFieldStart(fieldIndex); int length = tuple.getFieldLength(fieldIndex); - try { - out.write(buffer, start, length); - } catch (IOException ioe) { - throw new AlgebricksException(ioe); - } + result.set(buffer, start, length); } }; } diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java index b0eebd9..48ea434 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java @@ -18,16 +18,14 @@ */ package org.apache.hyracks.algebricks.runtime.evaluators; -import java.io.DataOutput; -import java.io.IOException; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; -import org.apache.hyracks.data.std.api.IDataOutputProvider; +import 
org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -public class ConstantEvalFactory implements ICopyEvaluatorFactory { +public class ConstantEvalFactory implements IScalarEvaluatorFactory { private static final long serialVersionUID = 1L; private byte[] value; @@ -42,18 +40,12 @@ } @Override - public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException { - return new ICopyEvaluator() { - - private DataOutput out = output.getDataOutput(); + public IScalarEvaluator createEvaluator(final IHyracksTaskContext context) throws AlgebricksException { + return new IScalarEvaluator() { @Override - public void evaluate(IFrameTupleReference tuple) throws AlgebricksException { - try { - out.write(value, 0, value.length); - } catch (IOException ioe) { - throw new AlgebricksException(ioe); - } + public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { + result.set(value, 0, value.length); } }; } diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java deleted file mode 100644 index 05229fc..0000000 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.algebricks.runtime.evaluators; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; - -public class ConstantEvaluatorFactory implements IScalarEvaluatorFactory { - private static final long serialVersionUID = 1L; - - private byte[] value; - - public ConstantEvaluatorFactory(byte[] value) { - this.value = value; - } - - @Override - public String toString() { - return "Constant"; - } - - @Override - public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { - return new IScalarEvaluator() { - @Override - public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { - result.set(value, 0, value.length); - } - }; - } - -} \ No newline at end of file diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java index 
7c80b87..1253393 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java @@ -35,7 +35,7 @@ } @Override - public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { + public IScalarEvaluator createEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { return new IScalarEvaluator() { @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java index 82e5f50..67b099d 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java @@ -44,7 +44,7 @@ @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults) - throws HyracksDataException { + throws HyracksDataException { final int[] keys = keyFields; /** @@ -70,7 +70,7 @@ try { int begin = tb.getSize(); if (aggs[i] == null) { - aggs[i] = aggFactories[i].createAggregateFunction(); + aggs[i] = aggFactories[i].createAggregateFunction(ctx); } aggs[i].init(output); tb.addFieldEndOffset(); diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java index 1877d64..0a3347d 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java @@ -106,7 +106,7 @@ int n = evalFactories.length; for (int i = 0; i < n; i++) { try { - eval[i] = evalFactories[i].createScalarEvaluator(ctx); + eval[i] = evalFactories[i].createEvaluator(ctx); } catch (AlgebricksException ae) { throw new HyracksDataException(ae); } @@ -139,8 +139,8 @@ tRef.reset(tAccess, t); produceTuple(tupleBuilder, tAccess, t, tRef); if (flushFramesRapidly) { - // Whenever all the tuples in the incoming frame have been consumed, the assign operator - // will push its frame to the next operator; i.e., it won't wait until the frame gets full. + // Whenever all the tuples in the incoming frame have been consumed, the assign operator + // will push its frame to the next operator; i.e., it won't wait until the frame gets full. 
appendToFrameFromTupleBuilder(tupleBuilder, true); } else { appendToFrameFromTupleBuilder(tupleBuilder); diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java index b7f11d8..018cfd2 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java @@ -24,8 +24,8 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator; -import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; @@ -35,7 +35,8 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -48,12 +49,13 @@ private static final long serialVersionUID = 1L; public static int NO_DEFAULT_BRANCH = 
-1; - private final ICopyEvaluatorFactory[] evalFactories; + private final IScalarEvaluatorFactory[] evalFactories; private final IBinaryBooleanInspector boolInspector; private final int defaultBranchIndex; - public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories, - IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) { + public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, + IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex, + RecordDescriptor rDesc) { super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length); for (int i = 0; i < evalFactories.length; i++) { recordDescriptors[i] = rDesc; @@ -71,8 +73,8 @@ private final IFrameWriter[] writers = new IFrameWriter[outputArity]; private final boolean[] isOpen = new boolean[outputArity]; private final IFrame[] writeBuffers = new IFrame[outputArity]; - private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity]; - private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage(); + private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity]; + private final IPointable evalPointable = new VoidPointable(); private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0); private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc); @@ -149,12 +151,11 @@ boolean found = false; for (int j = 0; j < evals.length; j++) { try { - evalBuf.reset(); - evals[j].evaluate(frameTuple); + evals[j].evaluate(frameTuple, evalPointable); } catch (AlgebricksException e) { throw new HyracksDataException(e); } - found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1); + found = boolInspector.getBooleanValue(evalPointable.getByteArray(), 0, 1); if (found) { copyAndAppendTuple(j); break; @@ -199,7 +200,7 @@ // 
Create evaluators for partitioning. try { for (int i = 0; i < evalFactories.length; i++) { - evals[i] = evalFactories[i].createEvaluator(evalBuf); + evals[i] = evalFactories[i].createEvaluator(ctx); } } catch (AlgebricksException e) { throw new HyracksDataException(e); diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java index 5a26f36..bb6cc73 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java @@ -100,7 +100,7 @@ int n = runningAggregates.length; for (int i = 0; i < n; i++) { try { - raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(); + raggs[i] = runningAggregates[i].createRunningAggregateEvaluator(ctx); } catch (AlgebricksException ae) { throw new HyracksDataException(ae); } diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java index 11f47ac..2bccd11 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java @@ -77,9 +77,9 @@ if (evalMaxObjects == null) { initAccessAppendRef(ctx); try { - evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx); + evalMaxObjects = maxObjectsEvalFactory.createEvaluator(ctx); if (offsetEvalFactory != null) { - evalOffset = 
offsetEvalFactory.createScalarEvaluator(ctx); + evalOffset = offsetEvalFactory.createEvaluator(ctx); } } catch (AlgebricksException ae) { throw new HyracksDataException(ae); diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java index 5eb4604..af42f22 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java @@ -91,7 +91,7 @@ if (eval == null) { initAccessAppendFieldRef(ctx); try { - eval = cond.createScalarEvaluator(ctx); + eval = cond.createEvaluator(ctx); } catch (AlgebricksException ae) { throw new HyracksDataException(ae); } diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java index 2c04003..9c398ff 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java @@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator; import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter; -import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory; import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -73,7 +73,7 @@ this.positionWriter = positionWriter; this.posOffsetEvalFactory = posOffsetEvalFactory; if (this.posOffsetEvalFactory == null) { - this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]); + this.posOffsetEvalFactory = new ConstantEvalFactory(new byte[5]); } } @@ -88,17 +88,17 @@ return new AbstractOneInputOneOutputOneFramePushRuntime() { private IPointable p = VoidPointable.FACTORY.createPointable(); - private IUnnestingEvaluator agg; + private IUnnestingEvaluator unnest; private ArrayTupleBuilder tupleBuilder; - private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx); + private IScalarEvaluator offsetEval = posOffsetEvalFactory.createEvaluator(ctx); @Override public void open() throws HyracksDataException { writer.open(); initAccessAppendRef(ctx); try { - agg = unnestingFactory.createUnnestingEvaluator(ctx); + unnest = unnestingFactory.createUnnestingEvaluator(ctx); } catch (AlgebricksException ae) { throw new HyracksDataException(ae); } @@ -118,7 +118,7 @@ } int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset()); try { - agg.init(tRef); + unnest.init(tRef); // assume that when unnesting the tuple, each step() call for each element // in the tuple will increase the positionIndex, and the positionIndex will // be reset when a new tuple is to be processed. 
@@ -126,7 +126,7 @@ boolean goon = true; do { tupleBuilder.reset(); - if (!agg.step(p)) { + if (!unnest.step(p)) { goon = false; } else { diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java index fd979fc..cca2f6f 100644 --- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java +++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java @@ -29,7 +29,6 @@ import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; public class IntegerAddEvalFactory implements IScalarEvaluatorFactory { @@ -44,15 +43,14 @@ } @Override - public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { + public IScalarEvaluator createEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { return new IScalarEvaluator() { private IPointable p = VoidPointable.FACTORY.createPointable(); private ArrayBackedValueStorage argOut = new ArrayBackedValueStorage(); - private IScalarEvaluator evalLeft = evalLeftFactory.createScalarEvaluator(ctx); - private IScalarEvaluator evalRight = evalRightFactory.createScalarEvaluator(ctx); + private IScalarEvaluator evalLeft = evalLeftFactory.createEvaluator(ctx); + private IScalarEvaluator evalRight = evalRightFactory.createEvaluator(ctx); - @SuppressWarnings("static-access") @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { evalLeft.evaluate(tuple, p); diff --git 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java index dc8f832..a976be6 100644 --- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java +++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java @@ -43,7 +43,7 @@ } @Override - public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { + public IScalarEvaluator createEvaluator(IHyracksTaskContext ctx) throws AlgebricksException { return new IScalarEvaluator() { private ArrayBackedValueStorage buf = new ArrayBackedValueStorage(); diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java index ea415c8..6384161 100644 --- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java +++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java @@ -27,7 +27,6 @@ import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; public class IntegerEqualsEvalFactory implements IScalarEvaluatorFactory { @@ -41,11 +40,11 @@ } @Override - public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { + public IScalarEvaluator createEvaluator(final IHyracksTaskContext ctx) 
throws AlgebricksException { return new IScalarEvaluator() { private IPointable p = VoidPointable.FACTORY.createPointable(); - private IScalarEvaluator eval1 = evalFact1.createScalarEvaluator(ctx); - private IScalarEvaluator eval2 = evalFact2.createScalarEvaluator(ctx); + private IScalarEvaluator eval1 = evalFact1.createEvaluator(ctx); + private IScalarEvaluator eval2 = evalFact2.createEvaluator(ctx); private byte[] rBytes = new byte[1]; @Override diff --git a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java index aebc406..45152f5 100644 --- a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java +++ b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java @@ -27,7 +27,6 @@ import org.apache.hyracks.data.std.primitive.IntegerPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; public class IntegerGreaterThanEvalFactory implements IScalarEvaluatorFactory { @@ -41,11 +40,11 @@ } @Override - public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { + public IScalarEvaluator createEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { return new IScalarEvaluator() { private IPointable p = VoidPointable.FACTORY.createPointable(); - private IScalarEvaluator eval1 = evalFact1.createScalarEvaluator(ctx); - private IScalarEvaluator eval2 = evalFact2.createScalarEvaluator(ctx); + private IScalarEvaluator eval1 = evalFact1.createEvaluator(ctx); + private IScalarEvaluator eval2 = 
evalFact2.createEvaluator(ctx); private byte[] rBytes = new byte[1]; @Override diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java index e311fa6..7e834db 100644 --- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java +++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java @@ -94,7 +94,8 @@ * Returns the character at the given byte offset. The caller is responsible for making sure that * the provided offset is within bounds and points to the beginning of a valid UTF8 character. * - * @param offset - Byte offset + * @param offset + * - Byte offset * @return Character at the given offset. */ public char charAt(int offset) { @@ -157,6 +158,7 @@ UTF8StringUtil.toString(buffer, bytes, start); } + @Override public String toString() { return new String(this.bytes, this.getCharStartOffset(), this.getUTF8Length(), Charset.forName("UTF-8")); } @@ -166,8 +168,8 @@ */ public int ignoreCaseCompareTo(UTF8StringPointable other) { - return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(), - other.getByteArray(), other.getStartOffset()); + return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), this.getStartOffset(), other.getByteArray(), + other.getStartOffset()); } public int find(UTF8StringPointable pattern, boolean ignoreCase) { @@ -228,8 +230,9 @@ public static boolean startsWith(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) { int utflen1 = src.getUTF8Length(); int utflen2 = pattern.getUTF8Length(); - if (utflen2 > utflen1) + if (utflen2 > utflen1) { return false; + } int s1Start = src.getMetaDataLength(); int s2Start = pattern.getMetaDataLength(); @@ -257,8 +260,9 @@ public static boolean 
endsWith(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) { int len1 = src.getUTF8Length(); int len2 = pattern.getUTF8Length(); - if (len2 > len1) + if (len2 > len1) { return false; + } int s1Start = src.getMetaDataLength(); int s2Start = pattern.getMetaDataLength(); @@ -351,10 +355,7 @@ * @param out * @throws IOException */ - public static void substrBefore( - UTF8StringPointable src, - UTF8StringPointable match, - UTF8StringBuilder builder, + public static void substrBefore(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder builder, GrowableArray out) throws IOException { int byteOffset = find(src, match, false); @@ -367,7 +368,7 @@ final int srcMetaLen = src.getMetaDataLength(); builder.reset(out, byteOffset); - for (int idx = 0; idx < byteOffset; ) { + for (int idx = 0; idx < byteOffset;) { builder.appendChar(src.charAt(srcMetaLen + idx)); idx += src.charSize(srcMetaLen + idx); } @@ -387,10 +388,7 @@ * @param builder * @param out */ - public static void substrAfter( - UTF8StringPointable src, - UTF8StringPointable match, - UTF8StringBuilder builder, + public static void substrAfter(UTF8StringPointable src, UTF8StringPointable match, UTF8StringBuilder builder, GrowableArray out) throws IOException { int byteOffset = find(src, match, false); diff --git a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java index 5a716b4..207ab8d 100644 --- a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java +++ b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java @@ -24,17 +24,14 @@ /** * Encodes positive integers in a variable-bytes format. - * * Each byte stores seven bits of the number. The first bit of each byte notifies if it is the last byte. 
* Specifically, if the first bit is set, then we need to shift the current value by seven and * continue to read the next byte until we meet a byte whose first bit is unset. - * * e.g. if the number is < 128, it will be stored using one byte and the byte value keeps as original. * To store the number 255 (0xff) , it will be encoded as [0x81,0x7f]. To decode that value, it reads the 0x81 * to know that the current value is (0x81 & 0x7f)= 0x01, and the first bit tells that there are more bytes to * be read. When it meets 0x7f, whose first flag is unset, it knows that it is the final byte to decode. * Finally it will return ( 0x01 << 7) + 0x7f === 255. - * */ public class VarLenIntEncoderDecoder { // sometimes the dec number is easier to get the sense of how big it is. @@ -75,11 +72,13 @@ public static int decode(byte[] srcBytes, int startPos) { int sum = 0; - while ((srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) { + while (startPos < srcBytes.length && (srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) { sum = (sum + (srcBytes[startPos] & DECODE_MASK)) << 7; startPos++; } - sum += srcBytes[startPos++]; + if (startPos < srcBytes.length) { + sum += srcBytes[startPos]; + } return sum; } -- To view, visit https://asterix-gerrit.ics.uci.edu/614 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I92a630550f3d45a7a5f00cfbc93e7b049b06330d Gerrit-PatchSet: 1 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <buyin...@gmail.com>