From Ritik Raj <raj.ritik9...@gmail.com>:

Ritik Raj has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18249 )


Change subject: [NO ISSUE][*DB] Refactoring AssignRuntime Factory
......................................................................

[NO ISSUE][*DB] Refactoring AssignRuntime Factory

Change-Id: Ieb583580f7bc8a40a15839ce15c492aa0bfb410c
---
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
1 file changed, 106 insertions(+), 76 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/49/18249/1

diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 5343069..300fac6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -39,10 +39,9 @@
 public class AssignRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFactory {

     private static final long serialVersionUID = 1L;
-
-    private int[] outColumns;
-    private IScalarEvaluatorFactory[] evalFactories;
-    private final boolean flushFramesRapidly;
+    protected int[] outColumns;
+    protected IScalarEvaluatorFactory[] evalFactories;
+    protected final boolean flushFramesRapidly;

     /**
      * @param outColumns
@@ -64,6 +63,14 @@
         this.flushFramesRapidly = flushFramesRapidly;
     }

+    public int[] getOutColumns() {
+        return outColumns;
+    }
+
+    public IScalarEvaluatorFactory[] getEvalFactories() {
+        return evalFactories;
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
@@ -88,92 +95,106 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
-        final int[] projectionToOutColumns = new int[projectionList.length];
-        for (int j = 0; j < projectionList.length; j++) {
-            projectionToOutColumns[j] = Arrays.binarySearch(outColumns, 
projectionList[j]);
+        return new AssignRuntime(ctx);
+    }
+
+    public int[] getProjectionList() {
+        return projectionList;
+    }
+
+    public class AssignRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+        private IPointable result;
+        private IScalarEvaluator[] eval;
+        protected ArrayTupleBuilder tupleBuilder;
+        private final int[] projectionToOutColumns;
+        private boolean first = true;
+        protected int tupleIndex = 0;
+        protected final IHyracksTaskContext ctx;
+        protected final IEvaluatorContext evalCtx;
+
+        public AssignRuntime(IHyracksTaskContext ctx) {
+            this.ctx = ctx;
+            this.evalCtx = new EvaluatorContext(ctx);
+            projectionToOutColumns = new int[projectionList.length];
+            for (int j = 0; j < projectionList.length; j++) {
+                projectionToOutColumns[j] = Arrays.binarySearch(outColumns, 
projectionList[j]);
+            }
+            tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            eval = new IScalarEvaluator[evalFactories.length];
+            result = VoidPointable.FACTORY.createPointable();
         }

-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable result = 
VoidPointable.FACTORY.createPointable();
-            private IScalarEvaluator[] eval = new 
IScalarEvaluator[evalFactories.length];
-            private ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(projectionList.length);
-            private boolean first = true;
-            private int tupleIndex = 0;
-
-            @Override
-            public void open() throws HyracksDataException {
-                if (first) {
-                    initAccessAppendRef(ctx);
-                    first = false;
-                    int n = evalFactories.length;
-                    for (int i = 0; i < n; i++) {
-                        eval[i] = 
evalFactories[i].createScalarEvaluator(evalCtx);
-                    }
+        @Override
+        public void open() throws HyracksDataException {
+            if (first) {
+                initAccessAppendRef(ctx);
+                first = false;
+                int n = evalFactories.length;
+                for (int i = 0; i < n; i++) {
+                    eval[i] = evalFactories[i].createScalarEvaluator(evalCtx);
                 }
-                super.open();
             }
+            super.open();
+        }

-            @Override
-            public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                // what if nTuple is 0?
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                if (nTuple < 1) {
-                    if (nTuple < 0) {
-                        throw new HyracksDataException("Negative number of 
tuples in the frame: " + nTuple);
-                    }
-                    appender.flush(writer);
-                } else {
-                    if (nTuple > 1) {
-                        for (; tupleIndex < nTuple - 1; tupleIndex++) {
-                            tRef.reset(tAccess, tupleIndex);
-                            produceTuple(tupleBuilder, tAccess, tupleIndex, 
tRef);
-                            appendToFrameFromTupleBuilder(tupleBuilder);
-                        }
-                    }
-
-                    if (tupleIndex < nTuple) {
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            // what if nTuple is 0?
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            if (nTuple < 1) {
+                if (nTuple < 0) {
+                    throw new HyracksDataException("Negative number of tuples 
in the frame: " + nTuple);
+                }
+                appender.flush(writer);
+            } else {
+                if (nTuple > 1) {
+                    for (; tupleIndex < nTuple - 1; tupleIndex++) {
                         tRef.reset(tAccess, tupleIndex);
                         produceTuple(tupleBuilder, tAccess, tupleIndex, tRef);
-                        if (flushFramesRapidly) {
-                            // Whenever all the tuples in the incoming frame 
have been consumed, the assign operator
-                            // will push its frame to the next operator; i.e., 
it won't wait until the frame gets full.
-                            appendToFrameFromTupleBuilder(tupleBuilder, true);
-                        } else {
-                            appendToFrameFromTupleBuilder(tupleBuilder);
-                        }
+                        appendToFrameFromTupleBuilder(tupleBuilder);
+                    }
+                }
+                if (tupleIndex < nTuple) {
+                    tRef.reset(tAccess, tupleIndex);
+                    produceTuple(tupleBuilder, tAccess, tupleIndex, tRef);
+                    if (flushFramesRapidly) {
+                        // Whenever all the tuples in the incoming frame have 
been consumed, the assign operator
+                        // will push its frame to the next operator; i.e., it 
won't wait until the frame gets full.
+                        appendToFrameFromTupleBuilder(tupleBuilder, true);
                     } else {
-                        if (flushFramesRapidly) {
-                            flushAndReset();
-                        }
+                        appendToFrameFromTupleBuilder(tupleBuilder);
+                    }
+                } else {
+                    if (flushFramesRapidly) {
+                        flushAndReset();
                     }
                 }
-                tupleIndex = 0;
             }
+            tupleIndex = 0;
+        }

-            private void produceTuple(ArrayTupleBuilder tb, 
IFrameTupleAccessor accessor, int tIndex,
-                    FrameTupleReference tupleRef) throws HyracksDataException {
-                try {
-                    tb.reset();
-                    for (int f = 0; f < projectionList.length; f++) {
-                        int k = projectionToOutColumns[f];
-                        if (k >= 0) {
-                            eval[k].evaluate(tupleRef, result);
-                            tb.addField(result.getByteArray(), 
result.getStartOffset(), result.getLength());
-                        } else {
-                            tb.addField(accessor, tIndex, projectionList[f]);
-                        }
+        protected void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor 
accessor, int tIndex,
+                FrameTupleReference tupleRef) throws HyracksDataException {
+            try {
+                tb.reset();
+                for (int f = 0; f < projectionList.length; f++) {
+                    int k = projectionToOutColumns[f];
+                    if (k >= 0) {
+                        eval[k].evaluate(tupleRef, result);
+                        tb.addField(result.getByteArray(), 
result.getStartOffset(), result.getLength());
+                    } else {
+                        tb.addField(accessor, tIndex, projectionList[f]);
                     }
-                } catch (HyracksDataException e) {
-                    throw 
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, 
tupleIndex);
                 }
+            } catch (HyracksDataException e) {
+                throw 
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, 
tupleIndex);
             }
+        }

-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
-            }
-        };
+        @Override
+        public void flush() throws HyracksDataException {
+            appender.flush(writer);
+        }
     }
-}
+}
\ No newline at end of file

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18249
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ieb583580f7bc8a40a15839ce15c492aa0bfb410c
Gerrit-Change-Number: 18249
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <raj.ritik9...@gmail.com>
Gerrit-MessageType: newchange

Reply via email to