>From Peeyush Gupta <[email protected]>:

Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744 )

Change subject: [ASTERIXDB-3603][FUN] Runtime changes for transform functions
......................................................................

[ASTERIXDB-3603][FUN] Runtime changes for transform functions

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-63039
Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Tested-by: Peeyush Gupta <[email protected]>
---
M 
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
M 
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
16 files changed, 254 insertions(+), 113 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Peeyush Gupta: Looks good to me, but someone else must approve; Verified
  Jenkins: Verified




diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 3bb871b..6d37447 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -488,6 +488,16 @@
         return isFileStore ? 
String.valueOf(ExternalWriterProvider.getSeparator(adapter)) : "";
     }

+    private LogicalVariable getUnnestVar(ILogicalOperator op) {
+        while (op.getOperatorTag() != LogicalOperatorTag.UNNEST && 
!op.getInputs().isEmpty()) {
+            op = op.getInputs().get(0).getValue();
+        }
+        if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
+            return ((UnnestOperator) op).getVariable();
+        }
+        return null;
+    }
+
     public ILogicalPlan translate(Query expr, String outputDatasetName, 
ICompiledDmlStatement stmt,
             ILogicalOperator baseOp, IResultMetadata resultMetadata) throws 
AlgebricksException {
         MutableObject<ILogicalOperator> base = new MutableObject<>(new 
EmptyTupleSourceOperator());
@@ -500,8 +510,11 @@
         ILogicalOperator topOp = p.first;
         List<LogicalVariable> liveVars = new ArrayList<>();
         VariableUtilities.getLiveVariables(topOp, liveVars);
-        LogicalVariable unnestVar = liveVars.get(0);
-        LogicalVariable resVar = unnestVar;
+        LogicalVariable unnestVar = getUnnestVar(topOp);
+        if (unnestVar == null) {
+            unnestVar = liveVars.get(0);
+        }
+        LogicalVariable resVar = liveVars.get(0);

         if (outputDatasetName == null) {
             FileSplit outputFileSplit = metadataProvider.getOutputFile();
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
index 0ab3ace..f03a74d 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;

--- Fail: More than one argument
-
 CREATE TRANSFORM FUNCTION transformTest1(doc) {
     SELECT count(*) as count FROM [doc] t UNNEST t.a
 };
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
index 6d55991..a04ea00 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;

--- Fail: Less than one argument
-
 CREATE TRANSFORM FUNCTION transformTest2(doc) {
     SELECT (SELECT * FROM [doc] t UNNEST t.a) as tt
 };
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
index c38bd72..0770d98 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;

--- Fail: Using a collection in the definition
-
 CREATE TRANSFORM FUNCTION transformTest3(doc) {
     SELECT * FROM [doc] t UNNEST t.a LIMIT 1
 };
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
index 10cfb4e..d2c2265 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;

--- Fail: Using a view in the definition
-
 CREATE TRANSFORM FUNCTION transformTest4(cust) {
     SELECT VALUE c
       FROM [cust] AS c
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 25d43b2..a60e53b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -689,12 +689,6 @@
             // in the cache.
             return null;
         }
-        //TODO(DB): review this and other similar ones
-        if (ctx.getDataverse(functionSignature.getDatabaseName(), 
functionSignature.getDataverseName()) != null) {
-            // This transaction has dropped and subsequently created the same
-            // dataverse.
-            return null;
-        }
         function = cache.getFunction(functionSignature);
         if (function != null) {
             // Function is already in the cache, don't add it again.
@@ -1348,6 +1342,15 @@
         INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode);
     }

+    @Override
+    public List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws 
AlgebricksException {
+        try {
+            return metadataNode.getAllDatasets(ctx.getTxnId());
+        } catch (RemoteException e) {
+            throw new 
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+    }
+
     private static class CCMetadataManagerImpl extends MetadataManager {
         private final MetadataProperties metadataProperties;
         private final ICcApplicationContext appCtx;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fdd36de..4ce8742 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -919,4 +919,6 @@
             String feedName) throws AlgebricksException;

     long getMaxTxnId();
+
+    List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws 
AlgebricksException;
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index ee4a2c9..bc6789c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -1017,4 +1017,6 @@

     List<FeedConnection> getFeedConnections(TxnId txnId, String database, 
DataverseName dataverseName, String feedName)
             throws AlgebricksException, RemoteException;
+
+    List<Dataset> getAllDatasets(TxnId txnId) throws AlgebricksException, 
RemoteException;
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f11a338..63eda21 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -2033,4 +2033,8 @@
             off += Character.charCount(codePointChar);
         }
     }
+
+    public List<Dataset> getAllDatasets() throws AlgebricksException {
+        return MetadataManager.INSTANCE.getAllDatasets(mdTxnCtx);
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
             this.funID = funID;
         }

+        public IScalarEvaluatorFactory getListEvalFactory() {
+            return listEvalFactory;
+        }
+
+        public SourceLocation getSourceLoc() {
+            return sourceLoc;
+        }
+
+        public FunctionIdentifier getFunID() {
+            return funID;
+        }
+
         @Override
         public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext 
ctx) throws HyracksDataException {
             return new IUnnestingEvaluator() {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 0c74260..9b643cd 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.comm.io.AbstractFrameAppender;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -38,10 +39,12 @@
     protected IFrame frame;
     protected FrameTupleAccessor tAccess;
     protected FrameTupleReference tRef;
+    protected boolean ignoreFailures = false;

     protected final void initAccessAppend(IHyracksTaskContext ctx) throws 
HyracksDataException {
         frame = new VSizeFrame(ctx);
         appender = new FrameTupleAppender(frame);
+        ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
         tAccess = new FrameTupleAccessor(inputRecordDesc);
     }

@@ -59,8 +62,10 @@
         try {
             flushIfNotFailed();
         } catch (Exception e) {
-            closeException = e;
-            fail(closeException);
+            if (!ignoreFailures) {
+                closeException = e;
+                fail(closeException);
+            }
         } finally {
             closeException = CleanupUtils.close(writer, closeException);
         }
@@ -115,4 +120,15 @@
             throws HyracksDataException {
         FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, 
tIndex0, accessor1, tIndex1);
     }
+
+    public void setIgnoreFailures(boolean ignoreFailures) {
+        this.ignoreFailures = ignoreFailures;
+        if (appender != null) {
+            ((AbstractFrameAppender) 
appender).setIgnoreFailures(ignoreFailures);
+        }
+    }
+
+    public boolean isIgnoreFailures() {
+        return ignoreFailures;
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 202c087..c2e8473 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -46,15 +47,23 @@
     private final int outputArity;
     private final AlgebricksPipeline pipeline;
     private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
+    private final boolean ignoreFailures;

     public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int 
outputArity,
             RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor 
pipelineOutputRecordDescriptor) {
+        this(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, 
pipelineOutputRecordDescriptor, false);
+    }
+
+    public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int 
outputArity,
+            RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor 
pipelineOutputRecordDescriptor,
+            boolean ignoreFailures) {
         this.pipeline = pipeline;
         this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
         this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
         this.inputArity = inputArity;
         this.outputArity = outputArity;
         this.runtimeMap = new HashMap<>();
+        this.ignoreFailures = ignoreFailures;
     }

     public IFrameWriter assemblePipeline(IFrameWriter writer, 
IHyracksTaskContext ctx) throws HyracksDataException {
@@ -99,6 +108,9 @@
                 } else {
                     newRuntimes[j].setOutputFrameWriter(0, start, 
recordDescriptors[i]);
                 }
+                if (newRuntimes[j] instanceof 
AbstractOneInputOneOutputOneFramePushRuntime) {
+                    ((AbstractOneInputOneOutputOneFramePushRuntime) 
newRuntimes[j]).setIgnoreFailures(ignoreFailures);
+                }
             }
             runtimeMap.put(runtimeFactory, newRuntimes);

diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..242c603 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
         return pipelines;
     }

+    public RecordDescriptor getInputRecordDesc() {
+        return inputRecordDesc;
+    }
+
+    public RecordDescriptor getOutputRecordDesc() {
+        return outputRecordDesc;
+    }
+
+    public IMissingWriterFactory[] getMissingWriterFactories() {
+        return missingWriterFactories;
+    }
+
+    public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+        return stats;
+    }
+
     public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
         this.stats.putAll(stats);
     }
@@ -97,20 +113,20 @@
     @Override
     public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        return new SubplanPushRuntime(ctx);
+        return new SubplanPushRuntime(ctx, false);
     }

-    private class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {

-        final IHyracksTaskContext ctx;
+        protected final IHyracksTaskContext ctx;

-        final NestedTupleSourceRuntime[] startOfPipelines;
+        protected final NestedTupleSourceRuntime[] startOfPipelines;

         boolean first;

         boolean profile;

-        SubplanPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean 
ignoreFailures) throws HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;
@@ -150,7 +166,8 @@
                     outputRecordDescriptor = pipelineLastRecordDescriptor;
                 }

-                PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, 
inputRecordDesc, outputRecordDescriptor);
+                PipelineAssembler pa =
+                        new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, 
outputRecordDescriptor, ignoreFailures);
                 IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, 
stats);
                 startOfPipelines[i] = (NestedTupleSourceRuntime) head;
                 pipelineAssemblers[i] = pa;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@

     private static final long serialVersionUID = 1L;

-    private final int outCol;
-    private final int positionalCol;
+    protected final int outCol;
+    protected final int positionalCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
     private final IUnnestingPositionWriterFactory positionWriterFactory;
     private final boolean leftOuter;
@@ -76,6 +76,26 @@
         this.missingWriterFactory = missingWriterFactory;
     }

+    public int[] getProjectionList() {
+        return projectionList;
+    }
+
+    public int getOutCol() {
+        return outCol;
+    }
+
+    public int getPositionalCol() {
+        return positionalCol;
+    }
+
+    public IUnnestingEvaluatorFactory getUnnestingFactory() {
+        return unnestingFactory;
+    }
+
+    public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+        return positionWriterFactory;
+    }
+
     @Override
     public String toString() {
         return "unnest " + outCol + (positionalCol >= 0 ? " at " + 
positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        ByteArrayAccessibleOutputStream missingBytes = leftOuter ? 
writeMissingBytes() : null;
-        IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(projectionList.length);
-            private IUnnestingEvaluator unnest = 
unnestingFactory.createUnnestingEvaluator(evalCtx);
-            private final IUnnestingPositionWriter positionWriter =
-                    positionWriterFactory != null ? 
positionWriterFactory.createUnnestingPositionWriter() : null;
-
-            @Override
-            public void open() throws HyracksDataException {
-                super.open();
-                if (tRef == null) {
-                    initAccessAppendRef(ctx);
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    try {
-                        unnest.init(tRef);
-                        unnesting(t);
-                    } catch (IOException ae) {
-                        throw HyracksDataException.create(ae);
-                    }
-                }
-            }
-
-            private void unnesting(int t) throws IOException {
-                // Assumes that when unnesting the tuple, each step() call for 
each element
-                // in the tuple will increase the positionIndex, and the 
positionIndex will
-                // be reset when a new tuple is to be processed.
-                int positionIndex = 1;
-                boolean emitted = false;
-                do {
-                    if (!unnest.step(p)) {
-                        break;
-                    }
-                    writeOutput(t, positionIndex++, false);
-                    emitted = true;
-                } while (true);
-                if (leftOuter && !emitted) {
-                    writeOutput(t, -1, true);
-                }
-            }
-
-            private void writeOutput(int t, int positionIndex, boolean missing)
-                    throws HyracksDataException, IOException {
-                tupleBuilder.reset();
-                for (int f = 0; f < projectionList.length; f++) {
-                    int col = projectionList[f];
-                    if (col == outCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 
0, missingBytes.size());
-                        } else {
-                            tupleBuilder.addField(p.getByteArray(), 
p.getStartOffset(), p.getLength());
-                        }
-                    } else if (col == positionalCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 
0, missingBytes.size());
-                        } else {
-                            positionWriter.write(tupleBuilder.getDataOutput(), 
positionIndex);
-                            tupleBuilder.addFieldEndOffset();
-                        }
-                    } else {
-                        tupleBuilder.addField(tAccess, t, projectionList[f]);
-                    }
-                }
-                appendToFrameFromTupleBuilder(tupleBuilder);
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
-            }
-        };
+        return new UnnestPushRuntime(ctx);
     }

-    private ByteArrayAccessibleOutputStream writeMissingBytes() throws 
HyracksDataException {
+    public class UnnestPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
+        private IPointable p = VoidPointable.FACTORY.createPointable();
+        protected ArrayTupleBuilder tupleBuilder;
+        protected IUnnestingEvaluator unnest;
+        private final IUnnestingPositionWriter positionWriter;
+        private final IHyracksTaskContext ctx;
+        protected ByteArrayAccessibleOutputStream missingBytes;
+
+        public UnnestPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+            this.ctx = ctx;
+            IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+            unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+            tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            positionWriter =
+                    positionWriterFactory != null ? 
positionWriterFactory.createUnnestingPositionWriter() : null;
+            missingBytes = leftOuter ? writeMissingBytes() : null;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            super.open();
+            if (tRef == null) {
+                initAccessAppendRef(ctx);
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
+                try {
+                    unnest.init(tRef);
+                    unnesting(t);
+                } catch (IOException ae) {
+                    throw HyracksDataException.create(ae);
+                }
+            }
+        }
+
+        protected void unnesting(int t) throws IOException {
+            // Assumes that when unnesting the tuple, each step() call for 
each element
+            // in the tuple will increase the positionIndex, and the 
positionIndex will
+            // be reset when a new tuple is to be processed.
+            int positionIndex = 1;
+            boolean emitted = false;
+            do {
+                if (!unnest.step(p)) {
+                    break;
+                }
+                writeOutput(t, positionIndex++, false);
+                emitted = true;
+            } while (true);
+            if (leftOuter && !emitted) {
+                writeOutput(t, -1, true);
+            }
+        }
+
+        private void writeOutput(int t, int positionIndex, boolean missing) 
throws HyracksDataException, IOException {
+            tupleBuilder.reset();
+            for (int f = 0; f < projectionList.length; f++) {
+                int col = projectionList[f];
+                if (col == outCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, 
missingBytes.size());
+                    } else {
+                        tupleBuilder.addField(p.getByteArray(), 
p.getStartOffset(), p.getLength());
+                    }
+                } else if (col == positionalCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, 
missingBytes.size());
+                    } else {
+                        positionWriter.write(tupleBuilder.getDataOutput(), 
positionIndex);
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } else {
+                    tupleBuilder.addField(tAccess, t, projectionList[f]);
+                }
+            }
+            appendToFrameFromTupleBuilder(tupleBuilder);
+        }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            appender.flush(writer);
+        }
+    };
+
+    protected ByteArrayAccessibleOutputStream writeMissingBytes() throws 
HyracksDataException {
         ByteArrayAccessibleOutputStream baos = new 
ByteArrayAccessibleOutputStream();
         IMissingWriter missingWriter = 
missingWriterFactory.createMissingWriter();
         missingWriter.writeMissing(new DataOutputStream(baos));
         return baos;
     }
-}
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e1acaee..7338cc7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -48,6 +48,7 @@

     protected int tupleCount;
     protected int tupleDataEndOffset;
+    protected boolean ignoreFailures = false;

     @Override
     public void reset(IFrame frame, boolean clear) throws HyracksDataException 
{
@@ -91,10 +92,21 @@
     public void write(IFrameWriter outWriter, boolean clearFrame) throws 
HyracksDataException {
         failIfInterrupted();
         getBuffer().clear();
-        outWriter.nextFrame(getBuffer());
-        if (clearFrame) {
-            frame.reset();
-            reset(getBuffer(), true);
+        if (!ignoreFailures) {
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        } else {
+            try {
+                outWriter.nextFrame(getBuffer());
+            } finally {
+                if (clearFrame) {
+                    frame.reset();
+                    reset(getBuffer(), true);
+                }
+            }
         }
     }

@@ -125,4 +137,12 @@
             throw HyracksDataException.create(new InterruptedException());
         }
     }
+
+    public void setIgnoreFailures(boolean ignoreFailures) {
+        this.ignoreFailures = ignoreFailures;
+    }
+
+    public boolean isIgnoreFailures() {
+        return ignoreFailures;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 3ef8b28..58194f4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -299,5 +299,4 @@
         }
         return false;
     }
-
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
Gerrit-Change-Number: 19744
Gerrit-PatchSet: 7
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Peeyush Gupta <[email protected]>
Gerrit-MessageType: merged

Reply via email to