>From Peeyush Gupta <[email protected]>:

Peeyush Gupta has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744 )


Change subject: [ASTERIXDB-3603][FUN] Runtime changes for transform functions 
part 1
......................................................................

[ASTERIXDB-3603][FUN] Runtime changes for transform functions part 1

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-63039
Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
---
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
3 files changed, 163 insertions(+), 89 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/44/19744/1

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
             this.funID = funID;
         }

+        public IScalarEvaluatorFactory getListEvalFactory() {
+            return listEvalFactory;
+        }
+
+        public SourceLocation getSourceLoc() {
+            return sourceLoc;
+        }
+
+        public FunctionIdentifier getFunID() {
+            return funID;
+        }
+
         @Override
         public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext 
ctx) throws HyracksDataException {
             return new IUnnestingEvaluator() {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..7d7fbe7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
         return pipelines;
     }

+    public RecordDescriptor getInputRecordDesc() {
+        return inputRecordDesc;
+    }
+
+    public RecordDescriptor getOutputRecordDesc() {
+        return outputRecordDesc;
+    }
+
+    public IMissingWriterFactory[] getMissingWriterFactories() {
+        return missingWriterFactories;
+    }
+
+    public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+        return stats;
+    }
+
     public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
         this.stats.putAll(stats);
     }
@@ -100,22 +116,24 @@
         return new SubplanPushRuntime(ctx);
     }

-    private class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {

-        final IHyracksTaskContext ctx;
+        protected final IHyracksTaskContext ctx;

-        final NestedTupleSourceRuntime[] startOfPipelines;
+        protected final NestedTupleSourceRuntime[] startOfPipelines;

         boolean first;

         boolean profile;
+        protected IMissingWriter[] missingWriters;
+        protected ArrayTupleBuilder missingTupleBuilder;

-        SubplanPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        protected SubplanPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;

-            IMissingWriter[] missingWriters = new 
IMissingWriter[missingWriterFactories.length];
+            missingWriters = new IMissingWriter[missingWriterFactories.length];
             for (int i = 0; i < missingWriterFactories.length; i++) {
                 missingWriters[i] = 
missingWriterFactories[i].createMissingWriter();
             }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@

     private static final long serialVersionUID = 1L;

-    private final int outCol;
-    private final int positionalCol;
+    protected final int outCol;
+    protected final int positionalCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
     private final IUnnestingPositionWriterFactory positionWriterFactory;
     private final boolean leftOuter;
@@ -76,6 +76,26 @@
         this.missingWriterFactory = missingWriterFactory;
     }

+    public int[] getProjectionList() {
+        return projectionList;
+    }
+
+    public int getOutCol() {
+        return outCol;
+    }
+
+    public int getPositionalCol() {
+        return positionalCol;
+    }
+
+    public IUnnestingEvaluatorFactory getUnnestingFactory() {
+        return unnestingFactory;
+    }
+
+    public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+        return positionWriterFactory;
+    }
+
     @Override
     public String toString() {
         return "unnest " + outCol + (positionalCol >= 0 ? " at " + 
positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        ByteArrayAccessibleOutputStream missingBytes = leftOuter ? 
writeMissingBytes() : null;
-        IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(projectionList.length);
-            private IUnnestingEvaluator unnest = 
unnestingFactory.createUnnestingEvaluator(evalCtx);
-            private final IUnnestingPositionWriter positionWriter =
-                    positionWriterFactory != null ? 
positionWriterFactory.createUnnestingPositionWriter() : null;
-
-            @Override
-            public void open() throws HyracksDataException {
-                super.open();
-                if (tRef == null) {
-                    initAccessAppendRef(ctx);
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    try {
-                        unnest.init(tRef);
-                        unnesting(t);
-                    } catch (IOException ae) {
-                        throw HyracksDataException.create(ae);
-                    }
-                }
-            }
-
-            private void unnesting(int t) throws IOException {
-                // Assumes that when unnesting the tuple, each step() call for 
each element
-                // in the tuple will increase the positionIndex, and the 
positionIndex will
-                // be reset when a new tuple is to be processed.
-                int positionIndex = 1;
-                boolean emitted = false;
-                do {
-                    if (!unnest.step(p)) {
-                        break;
-                    }
-                    writeOutput(t, positionIndex++, false);
-                    emitted = true;
-                } while (true);
-                if (leftOuter && !emitted) {
-                    writeOutput(t, -1, true);
-                }
-            }
-
-            private void writeOutput(int t, int positionIndex, boolean missing)
-                    throws HyracksDataException, IOException {
-                tupleBuilder.reset();
-                for (int f = 0; f < projectionList.length; f++) {
-                    int col = projectionList[f];
-                    if (col == outCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 
0, missingBytes.size());
-                        } else {
-                            tupleBuilder.addField(p.getByteArray(), 
p.getStartOffset(), p.getLength());
-                        }
-                    } else if (col == positionalCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 
0, missingBytes.size());
-                        } else {
-                            positionWriter.write(tupleBuilder.getDataOutput(), 
positionIndex);
-                            tupleBuilder.addFieldEndOffset();
-                        }
-                    } else {
-                        tupleBuilder.addField(tAccess, t, projectionList[f]);
-                    }
-                }
-                appendToFrameFromTupleBuilder(tupleBuilder);
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
-            }
-        };
+        return new UnnestPushRuntime(ctx);
     }

-    private ByteArrayAccessibleOutputStream writeMissingBytes() throws 
HyracksDataException {
+    public class UnnestPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+        private IPointable p = VoidPointable.FACTORY.createPointable();
+        protected ArrayTupleBuilder tupleBuilder;
+        protected IUnnestingEvaluator unnest;
+        private final IUnnestingPositionWriter positionWriter;
+        private final IHyracksTaskContext ctx;
+        protected ByteArrayAccessibleOutputStream missingBytes;
+
+        public UnnestPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+            this.ctx = ctx;
+            IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+            unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+            tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            positionWriter =
+                    positionWriterFactory != null ? 
positionWriterFactory.createUnnestingPositionWriter() : null;
+            missingBytes = leftOuter ? writeMissingBytes() : null;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            super.open();
+            if (tRef == null) {
+                initAccessAppendRef(ctx);
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
+                try {
+                    unnest.init(tRef);
+                    unnesting(t);
+                } catch (IOException ae) {
+                    throw HyracksDataException.create(ae);
+                }
+            }
+        }
+
+        protected void unnesting(int t) throws IOException {
+            // Assumes that when unnesting the tuple, each step() call for 
each element
+            // in the tuple will increase the positionIndex, and the 
positionIndex will
+            // be reset when a new tuple is to be processed.
+            int positionIndex = 1;
+            boolean emitted = false;
+            do {
+                if (!unnest.step(p)) {
+                    break;
+                }
+                writeOutput(t, positionIndex++, false);
+                emitted = true;
+            } while (true);
+            if (leftOuter && !emitted) {
+                writeOutput(t, -1, true);
+            }
+        }
+
+        private void writeOutput(int t, int positionIndex, boolean missing) 
throws HyracksDataException, IOException {
+            tupleBuilder.reset();
+            for (int f = 0; f < projectionList.length; f++) {
+                int col = projectionList[f];
+                if (col == outCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, 
missingBytes.size());
+                    } else {
+                        tupleBuilder.addField(p.getByteArray(), 
p.getStartOffset(), p.getLength());
+                    }
+                } else if (col == positionalCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, 
missingBytes.size());
+                    } else {
+                        positionWriter.write(tupleBuilder.getDataOutput(), 
positionIndex);
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } else {
+                    tupleBuilder.addField(tAccess, t, projectionList[f]);
+                }
+            }
+            appendToFrameFromTupleBuilder(tupleBuilder);
+        }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            appender.flush(writer);
+        }
+    };
+
+    protected ByteArrayAccessibleOutputStream writeMissingBytes() throws 
HyracksDataException {
         ByteArrayAccessibleOutputStream baos = new 
ByteArrayAccessibleOutputStream();
         IMissingWriter missingWriter = 
missingWriterFactory.createMissingWriter();
         missingWriter.writeMissing(new DataOutputStream(baos));
         return baos;
     }
-}
+}
\ No newline at end of file

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
Gerrit-Change-Number: 19744
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange

Reply via email to