Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/614

Change subject: Move to non-copy-based evaluator interfaces for scalar 
functions, aggregate functions, running aggregate functions and unnest 
functions.
......................................................................

Move to non-copy-based evaluator interfaces for scalar functions, aggregate 
functions,
running aggregate functions and unnest functions.

Change-Id: I92a630550f3d45a7a5f00cfbc93e7b049b06330d
---
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
M 
algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
M 
algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
M 
algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
M 
algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
M 
algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
D 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
M 
algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M 
algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
M 
algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
M 
algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
M 
algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
M 
hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
M 
hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
40 files changed, 146 insertions(+), 597 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/14/614/1

diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
index 0996b9c..7c36b93 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ILogicalExpressionJobGen.java
@@ -22,18 +22,18 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 
 public interface ILogicalExpressionJobGen {
 
-    public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
+    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
             IOperatorSchema[] inputSchemas, JobGenContext context) throws 
AlgebricksException;
 
-    public ICopyAggregateFunctionFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+    public IAggregateEvaluatorFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
             throws AlgebricksException;
 
@@ -41,11 +41,11 @@
             AggregateFunctionCallExpression expr, IVariableTypeEnvironment 
env, IOperatorSchema[] inputSchemas,
             JobGenContext context) throws AlgebricksException;
 
-    public ICopyRunningAggregateFunctionFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+    public IRunningAggregateEvaluatorFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
             throws AlgebricksException;
 
-    public ICopyUnnestingFunctionFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+    public IUnnestingEvaluatorFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
             throws AlgebricksException;
 
diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
index 50e4f3d..6cd48a6 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
@@ -22,27 +22,11 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunction;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
 import 
org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
 import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class LogicalExpressionJobGenToExpressionRuntimeProviderAdapter 
implements IExpressionRuntimeProvider {
     private final ILogicalExpressionJobGen lejg;
@@ -54,16 +38,14 @@
     @Override
     public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
             IOperatorSchema[] inputSchemas, JobGenContext context) throws 
AlgebricksException {
-        ICopyEvaluatorFactory cef = lejg.createEvaluatorFactory(expr, env, 
inputSchemas, context);
-        return new ScalarEvaluatorFactoryAdapter(cef);
+        return lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
     }
 
     @Override
     public IAggregateEvaluatorFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
-            throws AlgebricksException {
-        ICopyAggregateFunctionFactory caff = 
lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context);
-        return new AggregateFunctionFactoryAdapter(caff);
+                    throws AlgebricksException {
+        return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, 
context);
     }
 
     @Override
@@ -76,143 +58,14 @@
     @Override
     public IRunningAggregateEvaluatorFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
-            throws AlgebricksException {
-        ICopyRunningAggregateFunctionFactory craff = 
lejg.createRunningAggregateFunctionFactory(expr, env,
-                inputSchemas, context);
-        return new RunningAggregateFunctionFactoryAdapter(craff);
+                    throws AlgebricksException {
+        return lejg.createRunningAggregateFunctionFactory(expr, env, 
inputSchemas, context);
     }
 
     @Override
     public IUnnestingEvaluatorFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
-            throws AlgebricksException {
-        ICopyUnnestingFunctionFactory cuff = 
lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context);
-        return new UnnestingFunctionFactoryAdapter(cuff);
-    }
-
-    public static final class ScalarEvaluatorFactoryAdapter implements 
IScalarEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyEvaluatorFactory cef;
-
-        public ScalarEvaluatorFactoryAdapter(ICopyEvaluatorFactory cef) {
-            this.cef = cef;
-        }
-
-        @Override
-        public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) 
throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyEvaluator ce = cef.createEvaluator(abvs);
-            return new IScalarEvaluator() {
-                @Override
-                public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
-                    abvs.reset();
-                    ce.evaluate(tuple);
-                    result.set(abvs);
-                }
-            };
-        }
-    }
-
-    public static final class AggregateFunctionFactoryAdapter implements 
IAggregateEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyAggregateFunctionFactory caff;
-
-        public AggregateFunctionFactoryAdapter(ICopyAggregateFunctionFactory 
caff) {
-            this.caff = caff;
-        }
-
-        @Override
-        public IAggregateEvaluator 
createAggregateEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyAggregateFunction caf = 
caff.createAggregateFunction(abvs);
-            return new IAggregateEvaluator() {
-                @Override
-                public void step(IFrameTupleReference tuple) throws 
AlgebricksException {
-                    caf.step(tuple);
-                }
-
-                @Override
-                public void init() throws AlgebricksException {
-                    abvs.reset();
-                    caf.init();
-                }
-
-                @Override
-                public void finishPartial(IPointable result) throws 
AlgebricksException {
-                    caf.finishPartial();
-                    result.set(abvs);
-                }
-
-                @Override
-                public void finish(IPointable result) throws 
AlgebricksException {
-                    caf.finish();
-                    result.set(abvs);
-                }
-
-            };
-        }
-    }
-
-    public static final class RunningAggregateFunctionFactoryAdapter 
implements IRunningAggregateEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyRunningAggregateFunctionFactory craff;
-
-        public 
RunningAggregateFunctionFactoryAdapter(ICopyRunningAggregateFunctionFactory 
craff) {
-            this.craff = craff;
-        }
-
-        @Override
-        public IRunningAggregateEvaluator createRunningAggregateEvaluator() 
throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyRunningAggregateFunction craf = 
craff.createRunningAggregateFunction(abvs);
-            return new IRunningAggregateEvaluator() {
-                @Override
-                public void step(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
-                    abvs.reset();
-                    craf.step(tuple);
-                    result.set(abvs);
-                }
-
-                @Override
-                public void init() throws AlgebricksException {
-                    craf.init();
-                }
-            };
-        }
-    }
-
-    public static final class UnnestingFunctionFactoryAdapter implements 
IUnnestingEvaluatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final ICopyUnnestingFunctionFactory cuff;
-
-        public UnnestingFunctionFactoryAdapter(ICopyUnnestingFunctionFactory 
cuff) {
-            this.cuff = cuff;
-        }
-
-        @Override
-        public IUnnestingEvaluator 
createUnnestingEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
-            final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
-            final ICopyUnnestingFunction cuf = 
cuff.createUnnestingFunction(abvs);
-            return new IUnnestingEvaluator() {
-                @Override
-                public boolean step(IPointable result) throws 
AlgebricksException {
-                    abvs.reset();
-                    if (cuf.step()) {
-                        result.set(abvs);
-                        return true;
-                    }
-                    return false;
-                }
-
-                @Override
-                public void init(IFrameTupleReference tuple) throws 
AlgebricksException {
-                    cuf.init(tuple);
-                }
-            };
-        }
+                    throws AlgebricksException {
+        return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, 
context);
     }
 }
\ No newline at end of file
diff --git 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 7ff15d7..4b4a3b5 100644
--- 
a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ 
b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -212,7 +212,7 @@
                 int innerIndex) throws HyracksDataException {
             if (condEvaluator == null) {
                 try {
-                    this.condEvaluator = 
condFactory.createScalarEvaluator(ctx);
+                    this.condEvaluator = condFactory.createEvaluator(ctx);
                 } catch (AlgebricksException ae) {
                     throw new HyracksDataException(ae);
                 }
diff --git 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
index 1c3f9b8..e36e037 100644
--- 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
+++ 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/PigletExpressionJobGen.java
@@ -41,11 +41,11 @@
 import 
org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException;
 import 
org.apache.hyracks.algebricks.examples.piglet.runtime.functions.PigletFunctionRegistry;
 import org.apache.hyracks.algebricks.examples.piglet.types.Type;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
 import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -56,7 +56,7 @@
     private final UTF8StringSerializerDeserializer utf8SerDer = new 
UTF8StringSerializerDeserializer();
 
     @Override
-    public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
+    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
             IOperatorSchema[] inputSchemas, JobGenContext context) throws 
AlgebricksException {
         switch (expr.getExpressionTag()) {
             case CONSTANT: {
@@ -92,12 +92,12 @@
                 ScalarFunctionCallExpression sfce = 
(ScalarFunctionCallExpression) expr;
 
                 List<Mutable<ILogicalExpression>> argExprs = 
sfce.getArguments();
-                ICopyEvaluatorFactory argEvalFactories[] = new 
ICopyEvaluatorFactory[argExprs.size()];
+                IScalarEvaluatorFactory argEvalFactories[] = new 
IScalarEvaluatorFactory[argExprs.size()];
                 for (int i = 0; i < argEvalFactories.length; ++i) {
                     Mutable<ILogicalExpression> er = argExprs.get(i);
                     argEvalFactories[i] = 
createEvaluatorFactory(er.getValue(), env, inputSchemas, context);
                 }
-                ICopyEvaluatorFactory funcEvalFactory;
+                IScalarEvaluatorFactory funcEvalFactory;
                 try {
                     funcEvalFactory = 
PigletFunctionRegistry.createFunctionEvaluatorFactory(
                             sfce.getFunctionIdentifier(), argEvalFactories);
@@ -117,7 +117,7 @@
     }
 
     @Override
-    public ICopyAggregateFunctionFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+    public IAggregateEvaluatorFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
@@ -131,14 +131,14 @@
     }
 
     @Override
-    public ICopyRunningAggregateFunctionFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+    public IRunningAggregateEvaluatorFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public ICopyUnnestingFunctionFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+    public IUnnestingEvaluatorFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
             IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
diff --git 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
index c6624eb..6af6177 100644
--- 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
+++ 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IPigletFunctionEvaluatorFactoryBuilder.java
@@ -19,8 +19,8 @@
 package org.apache.hyracks.algebricks.examples.piglet.runtime.functions;
 
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 
 public interface IPigletFunctionEvaluatorFactoryBuilder {
-    public ICopyEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier fid, 
ICopyEvaluatorFactory[] arguments);
+    public IScalarEvaluatorFactory buildEvaluatorFactory(FunctionIdentifier 
fid, IScalarEvaluatorFactory[] arguments);
 }
\ No newline at end of file
diff --git 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
index 534fc33..2e95cd4 100644
--- 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
+++ 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/IntegerEqFunctionEvaluatorFactory.java
@@ -18,53 +18,45 @@
  */
 package org.apache.hyracks.algebricks.examples.piglet.runtime.functions;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
-public class IntegerEqFunctionEvaluatorFactory implements 
ICopyEvaluatorFactory {
+public class IntegerEqFunctionEvaluatorFactory implements 
IScalarEvaluatorFactory {
     private static final long serialVersionUID = 1L;
 
-    private final ICopyEvaluatorFactory arg1Factory;
+    private final IScalarEvaluatorFactory arg1Factory;
 
-    private final ICopyEvaluatorFactory arg2Factory;
+    private final IScalarEvaluatorFactory arg2Factory;
 
-    public IntegerEqFunctionEvaluatorFactory(ICopyEvaluatorFactory 
arg1Factory, ICopyEvaluatorFactory arg2Factory) {
+    public IntegerEqFunctionEvaluatorFactory(IScalarEvaluatorFactory 
arg1Factory, IScalarEvaluatorFactory arg2Factory) {
         this.arg1Factory = arg1Factory;
         this.arg2Factory = arg2Factory;
     }
 
     @Override
-    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) 
throws AlgebricksException {
-        return new ICopyEvaluator() {
-            private DataOutput dataout = output.getDataOutput();
-            private ArrayBackedValueStorage out1 = new 
ArrayBackedValueStorage();
-            private ArrayBackedValueStorage out2 = new 
ArrayBackedValueStorage();
-            private ICopyEvaluator eval1 = arg1Factory.createEvaluator(out1);
-            private ICopyEvaluator eval2 = arg2Factory.createEvaluator(out2);
+    public IScalarEvaluator createEvaluator(final IHyracksTaskContext context) 
throws AlgebricksException {
+        return new IScalarEvaluator() {
+            private IPointable out1 = new VoidPointable();
+            private IPointable out2 = new VoidPointable();
+            private IScalarEvaluator eval1 = 
arg1Factory.createEvaluator(context);
+            private IScalarEvaluator eval2 = 
arg2Factory.createEvaluator(context);
+            private byte[] resultData = new byte[1];
 
             @Override
-            public void evaluate(IFrameTupleReference tuple) throws 
AlgebricksException {
-                out1.reset();
-                eval1.evaluate(tuple);
-                out2.reset();
-                eval2.evaluate(tuple);
+            public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
+                eval1.evaluate(tuple, out1);
+                eval2.evaluate(tuple, out2);
                 int v1 = IntegerPointable.getInteger(out1.getByteArray(), 0);
                 int v2 = IntegerPointable.getInteger(out2.getByteArray(), 0);
                 boolean r = v1 == v2;
-                try {
-                    dataout.writeBoolean(r);
-                } catch (IOException ioe) {
-                    throw new AlgebricksException(ioe);
-                }
+                resultData[0] = r ? (byte) 1 : (byte) 0;
+                result.set(resultData, 0, 1);
             }
         };
     }
diff --git 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
index 1ebe9cf..340e7f5 100644
--- 
a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
+++ 
b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/runtime/functions/PigletFunctionRegistry.java
@@ -25,7 +25,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.examples.piglet.exceptions.PigletException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 
 public class PigletFunctionRegistry {
     private static final Map<FunctionIdentifier, 
IPigletFunctionEvaluatorFactoryBuilder> builderMap;
@@ -35,7 +35,7 @@
 
         temp.put(AlgebricksBuiltinFunctions.EQ, new 
IPigletFunctionEvaluatorFactoryBuilder() {
             @Override
-            public ICopyEvaluatorFactory 
buildEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] 
arguments) {
+            public IScalarEvaluatorFactory 
buildEvaluatorFactory(FunctionIdentifier fid, IScalarEvaluatorFactory[] 
arguments) {
                 return new IntegerEqFunctionEvaluatorFactory(arguments[0], 
arguments[1]);
             }
         });
@@ -43,7 +43,7 @@
         builderMap = Collections.unmodifiableMap(temp);
     }
 
-    public static ICopyEvaluatorFactory 
createFunctionEvaluatorFactory(FunctionIdentifier fid, ICopyEvaluatorFactory[] 
args)
+    public static IScalarEvaluatorFactory 
createFunctionEvaluatorFactory(FunctionIdentifier fid, 
IScalarEvaluatorFactory[] args)
             throws PigletException {
         IPigletFunctionEvaluatorFactoryBuilder builder = builderMap.get(fid);
         if (builder == null) {
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
index 9b2919e..48b4eec 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/aggregators/TupleCountRunningAggregateFunctionFactory.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluator;
 import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -32,7 +33,8 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws 
AlgebricksException {
+    public IRunningAggregateEvaluator 
createRunningAggregateEvaluator(IHyracksTaskContext context)
+            throws AlgebricksException {
         final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
         return new IRunningAggregateEvaluator() {
 
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
index 41e2fee..08aea9a 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluator.java
@@ -23,11 +23,12 @@
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public interface IAggregateEvaluator {
+    /** should be called each time a new aggregate value is computed */
     public void init() throws AlgebricksException;
 
     public void step(IFrameTupleReference tuple) throws AlgebricksException;
 
-    public void finishPartial(IPointable result) throws AlgebricksException;
-
     public void finish(IPointable result) throws AlgebricksException;
-}
\ No newline at end of file
+
+    public void finishPartial(IPointable result) throws AlgebricksException;
+}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
index 8bdbed7..fef023a 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IAggregateEvaluatorFactory.java
@@ -24,5 +24,5 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IAggregateEvaluatorFactory extends Serializable {
-    public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext 
ctx) throws AlgebricksException;
-}
\ No newline at end of file
+    public IAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext 
context) throws AlgebricksException;
+}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
deleted file mode 100644
index 2222e0b..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyAggregateFunction {
-    /** should be called each time a new aggregate value is computed */
-    public void init() throws AlgebricksException;
-
-    public void step(IFrameTupleReference tuple) throws AlgebricksException;
-
-    public void finish() throws AlgebricksException;
-
-    public void finishPartial() throws AlgebricksException;
-}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
deleted file mode 100644
index cbb6732..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyAggregateFunctionFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyAggregateFunctionFactory extends Serializable {
-    public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider 
provider) throws AlgebricksException;
-}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
deleted file mode 100644
index 03480e8..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyEvaluator {
-    public void evaluate(IFrameTupleReference tuple) throws 
AlgebricksException;
-}
\ No newline at end of file
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
deleted file mode 100644
index a81f351..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyEvaluatorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyEvaluatorFactory extends Serializable {
-    public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws 
AlgebricksException;
-}
\ No newline at end of file
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
deleted file mode 100644
index 19cab14..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyRunningAggregateFunction {
-    public void init() throws AlgebricksException;
-
-    public void step(IFrameTupleReference tuple) throws AlgebricksException;
-}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
deleted file mode 100644
index 1fe3595..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyRunningAggregateFunctionFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyRunningAggregateFunctionFactory extends Serializable {
-    public ICopyRunningAggregateFunction 
createRunningAggregateFunction(IDataOutputProvider provider)
-            throws AlgebricksException;
-}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
index 0959811..3fc8255 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopySerializableAggregateFunctionFactory.java
@@ -21,7 +21,8 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface ICopySerializableAggregateFunctionFactory extends 
Serializable {
-    public ICopySerializableAggregateFunction createAggregateFunction() throws 
AlgebricksException;
+    public ICopySerializableAggregateFunction 
createAggregateFunction(IHyracksTaskContext context) throws AlgebricksException;
 }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
deleted file mode 100644
index f4e3aea..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunction.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public interface ICopyUnnestingFunction {
-    public void init(IFrameTupleReference tuple) throws AlgebricksException;
-
-    public boolean step() throws AlgebricksException;
-
-}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
deleted file mode 100644
index 1a09fcf..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ICopyUnnestingFunctionFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.base;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
-
-public interface ICopyUnnestingFunctionFactory extends Serializable {
-    public ICopyUnnestingFunction createUnnestingFunction(IDataOutputProvider 
provider) throws AlgebricksException;
-}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
index 0fe86a8..3d2bb53 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IRunningAggregateEvaluatorFactory.java
@@ -21,7 +21,9 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IRunningAggregateEvaluatorFactory extends Serializable {
-    public IRunningAggregateEvaluator createRunningAggregateEvaluator() throws 
AlgebricksException;
-}
\ No newline at end of file
+    public IRunningAggregateEvaluator 
createRunningAggregateEvaluator(IHyracksTaskContext context)
+            throws AlgebricksException;
+}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
index f14860b..d7df00b 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IScalarEvaluatorFactory.java
@@ -24,5 +24,5 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IScalarEvaluatorFactory extends Serializable {
-    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) 
throws AlgebricksException;
+    public IScalarEvaluator createEvaluator(IHyracksTaskContext context) 
throws AlgebricksException;
 }
\ No newline at end of file
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
index f29e65e..67aede4 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluator.java
@@ -26,4 +26,5 @@
     public void init(IFrameTupleReference tuple) throws AlgebricksException;
 
     public boolean step(IPointable result) throws AlgebricksException;
-}
\ No newline at end of file
+
+}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
index eef98b5..3ee9271 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IUnnestingEvaluatorFactory.java
@@ -24,5 +24,5 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IUnnestingEvaluatorFactory extends Serializable {
-    public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext 
ctx) throws AlgebricksException;
-}
\ No newline at end of file
+    public IUnnestingEvaluator createUnnestingEvaluator(IHyracksTaskContext 
context) throws AlgebricksException;
+}
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
index 5bb206c..9aecf6a 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ColumnAccessEvalFactory.java
@@ -18,16 +18,14 @@
  */
 package org.apache.hyracks.algebricks.runtime.evaluators;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class ColumnAccessEvalFactory implements ICopyEvaluatorFactory {
+public class ColumnAccessEvalFactory implements IScalarEvaluatorFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -43,21 +41,15 @@
     }
 
     @Override
-    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) 
throws AlgebricksException {
-        return new ICopyEvaluator() {
-
-            private DataOutput out = output.getDataOutput();
+    public IScalarEvaluator createEvaluator(final IHyracksTaskContext context) 
throws AlgebricksException {
+        return new IScalarEvaluator() {
 
             @Override
-            public void evaluate(IFrameTupleReference tuple) throws 
AlgebricksException {
+            public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
                 byte[] buffer = tuple.getFieldData(fieldIndex);
                 int start = tuple.getFieldStart(fieldIndex);
                 int length = tuple.getFieldLength(fieldIndex);
-                try {
-                    out.write(buffer, start, length);
-                } catch (IOException ioe) {
-                    throw new AlgebricksException(ioe);
-                }
+                result.set(buffer, start, length);
             }
         };
     }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
index b0eebd9..48ea434 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvalFactory.java
@@ -18,16 +18,14 @@
  */
 package org.apache.hyracks.algebricks.runtime.evaluators;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class ConstantEvalFactory implements ICopyEvaluatorFactory {
+public class ConstantEvalFactory implements IScalarEvaluatorFactory {
     private static final long serialVersionUID = 1L;
 
     private byte[] value;
@@ -42,18 +40,12 @@
     }
 
     @Override
-    public ICopyEvaluator createEvaluator(final IDataOutputProvider output) 
throws AlgebricksException {
-        return new ICopyEvaluator() {
-
-            private DataOutput out = output.getDataOutput();
+    public IScalarEvaluator createEvaluator(final IHyracksTaskContext context) 
throws AlgebricksException {
+        return new IScalarEvaluator() {
 
             @Override
-            public void evaluate(IFrameTupleReference tuple) throws 
AlgebricksException {
-                try {
-                    out.write(value, 0, value.length);
-                } catch (IOException ioe) {
-                    throw new AlgebricksException(ioe);
-                }
+            public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
+                result.set(value, 0, value.length);
             }
         };
     }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
deleted file mode 100644
index 05229fc..0000000
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/ConstantEvaluatorFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.evaluators;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class ConstantEvaluatorFactory implements IScalarEvaluatorFactory {
-    private static final long serialVersionUID = 1L;
-
-    private byte[] value;
-
-    public ConstantEvaluatorFactory(byte[] value) {
-        this.value = value;
-    }
-
-    @Override
-    public String toString() {
-        return "Constant";
-    }
-
-    @Override
-    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) 
throws AlgebricksException {
-        return new IScalarEvaluator() {
-            @Override
-            public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
-                result.set(value, 0, value.length);
-            }
-        };
-    }
-
-}
\ No newline at end of file
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
index 7c80b87..1253393 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/TupleFieldEvaluatorFactory.java
@@ -35,7 +35,7 @@
     }
 
     @Override
-    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) 
throws AlgebricksException {
+    public IScalarEvaluator createEvaluator(IHyracksTaskContext ctx) throws 
AlgebricksException {
         return new IScalarEvaluator() {
             @Override
             public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 82e5f50..67b099d 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -44,7 +44,7 @@
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] 
keyFieldsInPartialResults)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         final int[] keys = keyFields;
 
         /**
@@ -70,7 +70,7 @@
                     try {
                         int begin = tb.getSize();
                         if (aggs[i] == null) {
-                            aggs[i] = 
aggFactories[i].createAggregateFunction();
+                            aggs[i] = 
aggFactories[i].createAggregateFunction(ctx);
                         }
                         aggs[i].init(output);
                         tb.addFieldEndOffset();
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 1877d64..0a3347d 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -106,7 +106,7 @@
                     int n = evalFactories.length;
                     for (int i = 0; i < n; i++) {
                         try {
-                            eval[i] = 
evalFactories[i].createScalarEvaluator(ctx);
+                            eval[i] = evalFactories[i].createEvaluator(ctx);
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }
@@ -139,8 +139,8 @@
                 tRef.reset(tAccess, t);
                 produceTuple(tupleBuilder, tAccess, t, tRef);
                 if (flushFramesRapidly) {
-                    // Whenever all the tuples in the incoming frame have been 
consumed, the assign operator 
-                    // will push its frame to the next operator; i.e., it 
won't wait until the frame gets full. 
+                    // Whenever all the tuples in the incoming frame have been 
consumed, the assign operator
+                    // will push its frame to the next operator; i.e., it 
won't wait until the frame gets full.
                     appendToFrameFromTupleBuilder(tupleBuilder, true);
                 } else {
                     appendToFrameFromTupleBuilder(tupleBuilder);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index b7f11d8..018cfd2 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -24,8 +24,8 @@
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,7 +35,8 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -48,12 +49,13 @@
     private static final long serialVersionUID = 1L;
     public static int NO_DEFAULT_BRANCH = -1;
 
-    private final ICopyEvaluatorFactory[] evalFactories;
+    private final IScalarEvaluatorFactory[] evalFactories;
     private final IBinaryBooleanInspector boolInspector;
     private final int defaultBranchIndex;
 
-    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry 
spec, ICopyEvaluatorFactory[] evalFactories,
-            IBinaryBooleanInspector boolInspector, int defaultBranchIndex, 
RecordDescriptor rDesc) {
+    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry 
spec,
+            IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector 
boolInspector, int defaultBranchIndex,
+            RecordDescriptor rDesc) {
         super(spec, 1, (defaultBranchIndex == evalFactories.length) ? 
evalFactories.length + 1 : evalFactories.length);
         for (int i = 0; i < evalFactories.length; i++) {
             recordDescriptors[i] = rDesc;
@@ -71,8 +73,8 @@
             private final IFrameWriter[] writers = new 
IFrameWriter[outputArity];
             private final boolean[] isOpen = new boolean[outputArity];
             private final IFrame[] writeBuffers = new IFrame[outputArity];
-            private final ICopyEvaluator[] evals = new 
ICopyEvaluator[outputArity];
-            private final ArrayBackedValueStorage evalBuf = new 
ArrayBackedValueStorage();
+            private final IScalarEvaluator[] evals = new 
IScalarEvaluator[outputArity];
+            private final IPointable evalPointable = new VoidPointable();
             private final RecordDescriptor inOutRecDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(),
                     0);
             private final FrameTupleAccessor accessor = new 
FrameTupleAccessor(inOutRecDesc);
@@ -149,12 +151,11 @@
                     boolean found = false;
                     for (int j = 0; j < evals.length; j++) {
                         try {
-                            evalBuf.reset();
-                            evals[j].evaluate(frameTuple);
+                            evals[j].evaluate(frameTuple, evalPointable);
                         } catch (AlgebricksException e) {
                             throw new HyracksDataException(e);
                         }
-                        found = 
boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
+                        found = 
boolInspector.getBooleanValue(evalPointable.getByteArray(), 0, 1);
                         if (found) {
                             copyAndAppendTuple(j);
                             break;
@@ -199,7 +200,7 @@
                 // Create evaluators for partitioning.
                 try {
                     for (int i = 0; i < evalFactories.length; i++) {
-                        evals[i] = evalFactories[i].createEvaluator(evalBuf);
+                        evals[i] = evalFactories[i].createEvaluator(ctx);
                     }
                 } catch (AlgebricksException e) {
                     throw new HyracksDataException(e);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 5a26f36..bb6cc73 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -100,7 +100,7 @@
                     int n = runningAggregates.length;
                     for (int i = 0; i < n; i++) {
                         try {
-                            raggs[i] = 
runningAggregates[i].createRunningAggregateEvaluator();
+                            raggs[i] = 
runningAggregates[i].createRunningAggregateEvaluator(ctx);
                         } catch (AlgebricksException ae) {
                             throw new HyracksDataException(ae);
                         }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 11f47ac..2bccd11 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -77,9 +77,9 @@
                 if (evalMaxObjects == null) {
                     initAccessAppendRef(ctx);
                     try {
-                        evalMaxObjects = 
maxObjectsEvalFactory.createScalarEvaluator(ctx);
+                        evalMaxObjects = 
maxObjectsEvalFactory.createEvaluator(ctx);
                         if (offsetEvalFactory != null) {
-                            evalOffset = 
offsetEvalFactory.createScalarEvaluator(ctx);
+                            evalOffset = 
offsetEvalFactory.createEvaluator(ctx);
                         }
                     } catch (AlgebricksException ae) {
                         throw new HyracksDataException(ae);
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 5eb4604..af42f22 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -91,7 +91,7 @@
                 if (eval == null) {
                     initAccessAppendFieldRef(ctx);
                     try {
-                        eval = cond.createScalarEvaluator(ctx);
+                        eval = cond.createEvaluator(ctx);
                     } catch (AlgebricksException ae) {
                         throw new HyracksDataException(ae);
                     }
diff --git 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 2c04003..9c398ff 100644
--- 
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
-import 
org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ConstantEvalFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -73,7 +73,7 @@
         this.positionWriter = positionWriter;
         this.posOffsetEvalFactory = posOffsetEvalFactory;
         if (this.posOffsetEvalFactory == null) {
-            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new 
byte[5]);
+            this.posOffsetEvalFactory = new ConstantEvalFactory(new byte[5]);
         }
     }
 
@@ -88,17 +88,17 @@
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IUnnestingEvaluator agg;
+            private IUnnestingEvaluator unnest;
             private ArrayTupleBuilder tupleBuilder;
 
-            private IScalarEvaluator offsetEval = 
posOffsetEvalFactory.createScalarEvaluator(ctx);
+            private IScalarEvaluator offsetEval = 
posOffsetEvalFactory.createEvaluator(ctx);
 
             @Override
             public void open() throws HyracksDataException {
                 writer.open();
                 initAccessAppendRef(ctx);
                 try {
-                    agg = unnestingFactory.createUnnestingEvaluator(ctx);
+                    unnest = unnestingFactory.createUnnestingEvaluator(ctx);
                 } catch (AlgebricksException ae) {
                     throw new HyracksDataException(ae);
                 }
@@ -118,7 +118,7 @@
                     }
                     int offset = IntegerPointable.getInteger(p.getByteArray(), 
p.getStartOffset());
                     try {
-                        agg.init(tRef);
+                        unnest.init(tRef);
                         // assume that when unnesting the tuple, each step() 
call for each element
                         // in the tuple will increase the positionIndex, and 
the positionIndex will
                         // be reset when a new tuple is to be processed.
@@ -126,7 +126,7 @@
                         boolean goon = true;
                         do {
                             tupleBuilder.reset();
-                            if (!agg.step(p)) {
+                            if (!unnest.step(p)) {
                                 goon = false;
                             } else {
 
diff --git 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
index fd979fc..cca2f6f 100644
--- 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
+++ 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerAddEvalFactory.java
@@ -29,7 +29,6 @@
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerAddEvalFactory implements IScalarEvaluatorFactory {
 
@@ -44,15 +43,14 @@
     }
 
     @Override
-    public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext 
ctx) throws AlgebricksException {
+    public IScalarEvaluator createEvaluator(final IHyracksTaskContext ctx) 
throws AlgebricksException {
         return new IScalarEvaluator() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
             private ArrayBackedValueStorage argOut = new 
ArrayBackedValueStorage();
 
-            private IScalarEvaluator evalLeft = 
evalLeftFactory.createScalarEvaluator(ctx);
-            private IScalarEvaluator evalRight = 
evalRightFactory.createScalarEvaluator(ctx);
+            private IScalarEvaluator evalLeft = 
evalLeftFactory.createEvaluator(ctx);
+            private IScalarEvaluator evalRight = 
evalRightFactory.createEvaluator(ctx);
 
-            @SuppressWarnings("static-access")
             @Override
             public void evaluate(IFrameTupleReference tuple, IPointable 
result) throws AlgebricksException {
                 evalLeft.evaluate(tuple, p);
diff --git 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
index dc8f832..a976be6 100644
--- 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
+++ 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerConstantEvalFactory.java
@@ -43,7 +43,7 @@
     }
 
     @Override
-    public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) 
throws AlgebricksException {
+    public IScalarEvaluator createEvaluator(IHyracksTaskContext ctx) throws 
AlgebricksException {
         return new IScalarEvaluator() {
 
             private ArrayBackedValueStorage buf = new 
ArrayBackedValueStorage();
diff --git 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
index ea415c8..6384161 100644
--- 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
+++ 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerEqualsEvalFactory.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerEqualsEvalFactory implements IScalarEvaluatorFactory {
 
@@ -41,11 +40,11 @@
     }
 
     @Override
-    public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext 
ctx) throws AlgebricksException {
+    public IScalarEvaluator createEvaluator(final IHyracksTaskContext ctx) 
throws AlgebricksException {
         return new IScalarEvaluator() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IScalarEvaluator eval1 = 
evalFact1.createScalarEvaluator(ctx);
-            private IScalarEvaluator eval2 = 
evalFact2.createScalarEvaluator(ctx);
+            private IScalarEvaluator eval1 = evalFact1.createEvaluator(ctx);
+            private IScalarEvaluator eval2 = evalFact2.createEvaluator(ctx);
             private byte[] rBytes = new byte[1];
 
             @Override
diff --git 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
index aebc406..45152f5 100644
--- 
a/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
+++ 
b/algebricks/algebricks-tests/src/main/java/org/apache/hyracks/algebricks/tests/pushruntime/IntegerGreaterThanEvalFactory.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class IntegerGreaterThanEvalFactory implements IScalarEvaluatorFactory {
 
@@ -41,11 +40,11 @@
     }
 
     @Override
-    public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext 
ctx) throws AlgebricksException {
+    public IScalarEvaluator createEvaluator(final IHyracksTaskContext ctx) 
throws AlgebricksException {
         return new IScalarEvaluator() {
             private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IScalarEvaluator eval1 = 
evalFact1.createScalarEvaluator(ctx);
-            private IScalarEvaluator eval2 = 
evalFact2.createScalarEvaluator(ctx);
+            private IScalarEvaluator eval1 = evalFact1.createEvaluator(ctx);
+            private IScalarEvaluator eval2 = evalFact2.createEvaluator(ctx);
             private byte[] rBytes = new byte[1];
 
             @Override
diff --git 
a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
 
b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index e311fa6..7e834db 100644
--- 
a/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ 
b/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -94,7 +94,8 @@
      * Returns the character at the given byte offset. The caller is 
responsible for making sure that
      * the provided offset is within bounds and points to the beginning of a 
valid UTF8 character.
      *
-     * @param offset - Byte offset
+     * @param offset
+     *            - Byte offset
      * @return Character at the given offset.
      */
     public char charAt(int offset) {
@@ -157,6 +158,7 @@
         UTF8StringUtil.toString(buffer, bytes, start);
     }
 
+    @Override
     public String toString() {
         return new String(this.bytes, this.getCharStartOffset(), 
this.getUTF8Length(), Charset.forName("UTF-8"));
     }
@@ -166,8 +168,8 @@
      */
 
     public int ignoreCaseCompareTo(UTF8StringPointable other) {
-        return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), 
this.getStartOffset(),
-                other.getByteArray(), other.getStartOffset());
+        return UTF8StringUtil.lowerCaseCompareTo(this.getByteArray(), 
this.getStartOffset(), other.getByteArray(),
+                other.getStartOffset());
     }
 
     public int find(UTF8StringPointable pattern, boolean ignoreCase) {
@@ -228,8 +230,9 @@
     public static boolean startsWith(UTF8StringPointable src, 
UTF8StringPointable pattern, boolean ignoreCase) {
         int utflen1 = src.getUTF8Length();
         int utflen2 = pattern.getUTF8Length();
-        if (utflen2 > utflen1)
+        if (utflen2 > utflen1) {
             return false;
+        }
 
         int s1Start = src.getMetaDataLength();
         int s2Start = pattern.getMetaDataLength();
@@ -257,8 +260,9 @@
     public static boolean endsWith(UTF8StringPointable src, 
UTF8StringPointable pattern, boolean ignoreCase) {
         int len1 = src.getUTF8Length();
         int len2 = pattern.getUTF8Length();
-        if (len2 > len1)
+        if (len2 > len1) {
             return false;
+        }
 
         int s1Start = src.getMetaDataLength();
         int s2Start = pattern.getMetaDataLength();
@@ -351,10 +355,7 @@
      * @param out
      * @throws IOException
      */
-    public static void substrBefore(
-            UTF8StringPointable src,
-            UTF8StringPointable match,
-            UTF8StringBuilder builder,
+    public static void substrBefore(UTF8StringPointable src, 
UTF8StringPointable match, UTF8StringBuilder builder,
             GrowableArray out) throws IOException {
 
         int byteOffset = find(src, match, false);
@@ -367,7 +368,7 @@
         final int srcMetaLen = src.getMetaDataLength();
 
         builder.reset(out, byteOffset);
-        for (int idx = 0; idx < byteOffset; ) {
+        for (int idx = 0; idx < byteOffset;) {
             builder.appendChar(src.charAt(srcMetaLen + idx));
             idx += src.charSize(srcMetaLen + idx);
         }
@@ -387,10 +388,7 @@
      * @param builder
      * @param out
      */
-    public static void substrAfter(
-            UTF8StringPointable src,
-            UTF8StringPointable match,
-            UTF8StringBuilder builder,
+    public static void substrAfter(UTF8StringPointable src, 
UTF8StringPointable match, UTF8StringBuilder builder,
             GrowableArray out) throws IOException {
 
         int byteOffset = find(src, match, false);
diff --git 
a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
 
b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
index 5a716b4..207ab8d 100644
--- 
a/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
+++ 
b/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/encoding/VarLenIntEncoderDecoder.java
@@ -24,17 +24,14 @@
 
 /**
  * Encodes positive integers in a variable-bytes format.
- *
  * Each byte stores seven bits of the number. The first bit of each byte 
notifies if it is the last byte.
  * Specifically, if the first bit is set, then we need to shift the current 
value by seven and
  * continue to read the next byte util we meet a byte whose first byte is 
unset.
- *
  * e.g. if the number is < 128, it will be stored using one byte and the byte 
value keeps as original.
  * To store the number 255 (0xff) , it will be encoded as [0x81,0x7f]. To 
decode that value, it reads the 0x81
  * to know that the current value is (0x81 & 0x7f)= 0x01, and the first bit 
tells that there are more bytes to
  * be read. When it meets 0x7f, whose first flag is unset, it knows that it is 
the final byte to decode.
  * Finally it will return ( 0x01 << 7) + 0x7f === 255.
- *
  */
 public class VarLenIntEncoderDecoder {
     // sometimes the dec number is easier to get the sense of how big it is.
@@ -75,11 +72,13 @@
 
     public static int decode(byte[] srcBytes, int startPos) {
         int sum = 0;
-        while ((srcBytes[startPos] & CONTINUE_CHUNK) == CONTINUE_CHUNK) {
+        while (startPos < srcBytes.length && (srcBytes[startPos] & 
CONTINUE_CHUNK) == CONTINUE_CHUNK) {
             sum = (sum + (srcBytes[startPos] & DECODE_MASK)) << 7;
             startPos++;
         }
-        sum += srcBytes[startPos++];
+        if (startPos < srcBytes.length) {
+            sum += srcBytes[startPos];
+        }
         return sum;
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/614
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I92a630550f3d45a7a5f00cfbc93e7b049b06330d
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <buyin...@gmail.com>

Reply via email to