>From Peeyush Gupta <[email protected]>: Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744 )
Change subject: [ASTERIXDB-3603][FUN] Runtime changes for transform functions ...................................................................... [ASTERIXDB-3603][FUN] Runtime changes for transform functions - user model changes: no - storage format changes: no - interface changes: yes Ext-ref: MB-63039 Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Peeyush Gupta <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Peeyush Gupta <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 16 files changed, 254 insertions(+), 113 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Peeyush Gupta: Looks good to me, but someone else must approve; Verified Jenkins: Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index 3bb871b..6d37447 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -488,6 +488,16 @@ return isFileStore ? String.valueOf(ExternalWriterProvider.getSeparator(adapter)) : ""; } + private LogicalVariable getUnnestVar(ILogicalOperator op) { + while (op.getOperatorTag() != LogicalOperatorTag.UNNEST && !op.getInputs().isEmpty()) { + op = op.getInputs().get(0).getValue(); + } + if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) { + return ((UnnestOperator) op).getVariable(); + } + return null; + } + public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt, ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException { MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator()); @@ -500,8 +510,11 @@ ILogicalOperator topOp = p.first; List<LogicalVariable> liveVars = new ArrayList<>(); VariableUtilities.getLiveVariables(topOp, liveVars); - LogicalVariable unnestVar = liveVars.get(0); - LogicalVariable resVar = unnestVar; + LogicalVariable unnestVar = getUnnestVar(topOp); + if (unnestVar == null) { + unnestVar = liveVars.get(0); + } + LogicalVariable resVar = liveVars.get(0); if (outputDatasetName == null) { FileSplit outputFileSplit = metadataProvider.getOutputFile(); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp index 0ab3ace..f03a74d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp @@ -18,8 +18,6 @@ */ use test; --- Fail: More than one argument - CREATE TRANSFORM FUNCTION transformTest1(doc) { SELECT count(*) as count FROM [doc] t UNNEST t.a }; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp index 6d55991..a04ea00 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp @@ -18,8 +18,6 @@ */ use test; --- Fail: Less than one argument - CREATE TRANSFORM FUNCTION transformTest2(doc) { SELECT (SELECT * FROM [doc] t UNNEST t.a) as tt }; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp index c38bd72..0770d98 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp @@ -18,8 +18,6 @@ */ use test; --- Fail: Using a collection in the definition - CREATE TRANSFORM FUNCTION transformTest3(doc) { SELECT * FROM [doc] t UNNEST t.a LIMIT 1 }; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp index 10cfb4e..d2c2265 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp @@ -18,8 +18,6 @@ */ use test; --- Fail: Using a view in the definition - CREATE TRANSFORM FUNCTION transformTest4(cust) { SELECT VALUE c FROM [cust] AS c diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 25d43b2..a60e53b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -689,12 +689,6 @@ // in the cache. return null; } - //TODO(DB): review this and other similar ones - if (ctx.getDataverse(functionSignature.getDatabaseName(), functionSignature.getDataverseName()) != null) { - // This transaction has dropped and subsequently created the same - // dataverse. - return null; - } function = cache.getFunction(functionSignature); if (function != null) { // Function is already in the cache, don't add it again. @@ -1348,6 +1342,15 @@ INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode); } + @Override + public List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException { + try { + return metadataNode.getAllDatasets(ctx.getTxnId()); + } catch (RemoteException e) { + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); + } + } + private static class CCMetadataManagerImpl extends MetadataManager { private final MetadataProperties metadataProperties; private final ICcApplicationContext appCtx; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java index fdd36de..4ce8742 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java @@ -919,4 +919,6 @@ String feedName) throws AlgebricksException; long getMaxTxnId(); + + List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java index ee4a2c9..bc6789c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java @@ -1017,4 +1017,6 @@ List<FeedConnection> getFeedConnections(TxnId txnId, String database, DataverseName dataverseName, String feedName) throws AlgebricksException, RemoteException; + + List<Dataset> getAllDatasets(TxnId txnId) throws AlgebricksException, RemoteException; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index f11a338..63eda21 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -2033,4 +2033,8 @@ off += Character.charCount(codePointChar); } } + + public List<Dataset> getAllDatasets() throws AlgebricksException { + return MetadataManager.INSTANCE.getAllDatasets(mdTxnCtx); + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java index 1701e64..005bdf5 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java @@ -69,6 +69,18 @@ this.funID = funID; } + public IScalarEvaluatorFactory getListEvalFactory() { + return listEvalFactory; + } + + public SourceLocation getSourceLoc() { + return sourceLoc; + } + + public FunctionIdentifier getFunID() { + return funID; + } + @Override public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext ctx) throws HyracksDataException { return new IUnnestingEvaluator() { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java index 0c74260..9b643cd 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; +import org.apache.hyracks.dataflow.common.comm.io.AbstractFrameAppender; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -38,10 +39,12 @@ protected IFrame frame; protected FrameTupleAccessor tAccess; protected FrameTupleReference tRef; + protected boolean ignoreFailures = false; protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException { frame = new VSizeFrame(ctx); appender = new FrameTupleAppender(frame); + ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures); tAccess = new FrameTupleAccessor(inputRecordDesc); } @@ -59,8 +62,10 @@ try { flushIfNotFailed(); } catch (Exception e) { - closeException = e; - fail(closeException); + if (!ignoreFailures) { + closeException = e; + fail(closeException); + } } finally { closeException = CleanupUtils.close(writer, closeException); } @@ -115,4 +120,15 @@ throws HyracksDataException { FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1); } + + public void setIgnoreFailures(boolean ignoreFailures) { + this.ignoreFailures = ignoreFailures; + if (appender != null) { + ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures); + } + } + + public boolean isIgnoreFailures() { + return ignoreFailures; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index 202c087..c2e8473 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -26,6 +26,7 @@ import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -46,15 +47,23 @@ private final int outputArity; private final AlgebricksPipeline pipeline; private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap; + private final boolean ignoreFailures; public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity, RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) { + this(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor, false); + } + + public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity, + RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor, + boolean ignoreFailures) { this.pipeline = pipeline; this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor; this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor; this.inputArity = inputArity; this.outputArity = outputArity; this.runtimeMap = new HashMap<>(); + this.ignoreFailures = ignoreFailures; } public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { @@ -99,6 +108,9 @@ } else { newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]); } + if (newRuntimes[j] instanceof AbstractOneInputOneOutputOneFramePushRuntime) { + ((AbstractOneInputOneOutputOneFramePushRuntime) newRuntimes[j]).setIgnoreFailures(ignoreFailures); + } } runtimeMap.put(runtimeFactory, newRuntimes); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java index 7feca3c..242c603 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java @@ -90,6 +90,22 @@ return pipelines; } + public RecordDescriptor getInputRecordDesc() { + return inputRecordDesc; + } + + public RecordDescriptor getOutputRecordDesc() { + return outputRecordDesc; + } + + public IMissingWriterFactory[] getMissingWriterFactories() { + return missingWriterFactories; + } + + public Map<IPushRuntimeFactory, IOperatorStats> getStats() { + return stats; + } + public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) { this.stats.putAll(stats); } @@ -97,20 +113,20 @@ @Override public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { - return new SubplanPushRuntime(ctx); + return new SubplanPushRuntime(ctx, false); } - private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { - final IHyracksTaskContext ctx; + protected final IHyracksTaskContext ctx; - final NestedTupleSourceRuntime[] startOfPipelines; + protected final NestedTupleSourceRuntime[] startOfPipelines; boolean first; boolean profile; - SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException { this.ctx = ctx; this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); this.first = true; @@ -150,7 +166,8 @@ outputRecordDescriptor = pipelineLastRecordDescriptor; } - PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor); + PipelineAssembler pa = + new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures); IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats); startOfPipelines[i] = (NestedTupleSourceRuntime) head; pipelineAssemblers[i] = pa; diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java index 0874f2b..9cfc070 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java @@ -52,8 +52,8 @@ private static final long serialVersionUID = 1L; - private final int outCol; - private final int positionalCol; + protected final int outCol; + protected final int positionalCol; private final IUnnestingEvaluatorFactory unnestingFactory; private final IUnnestingPositionWriterFactory positionWriterFactory; private final boolean leftOuter; @@ -76,6 +76,26 @@ this.missingWriterFactory = missingWriterFactory; } + public int[] getProjectionList() { + return projectionList; + } + + public int getOutCol() { + return outCol; + } + + public int getPositionalCol() { + return positionalCol; + } + + public IUnnestingEvaluatorFactory getUnnestingFactory() { + return unnestingFactory; + } + + public IUnnestingPositionWriterFactory getPositionWriterFactory() { + return positionWriterFactory; + } + @Override public String toString() { return "unnest " + outCol + (positionalCol >= 0 ? " at " + positionalCol : "") + " <- " + unnestingFactory @@ -85,92 +105,102 @@ @Override public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { - ByteArrayAccessibleOutputStream missingBytes = leftOuter ? writeMissingBytes() : null; - IEvaluatorContext evalCtx = new EvaluatorContext(ctx); - return new AbstractOneInputOneOutputOneFramePushRuntime() { - private IPointable p = VoidPointable.FACTORY.createPointable(); - private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length); - private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(evalCtx); - private final IUnnestingPositionWriter positionWriter = - positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null; - - @Override - public void open() throws HyracksDataException { - super.open(); - if (tRef == null) { - initAccessAppendRef(ctx); - } - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - tRef.reset(tAccess, t); - try { - unnest.init(tRef); - unnesting(t); - } catch (IOException ae) { - throw HyracksDataException.create(ae); - } - } - } - - private void unnesting(int t) throws IOException { - // Assumes that when unnesting the tuple, each step() call for each element - // in the tuple will increase the positionIndex, and the positionIndex will - // be reset when a new tuple is to be processed. - int positionIndex = 1; - boolean emitted = false; - do { - if (!unnest.step(p)) { - break; - } - writeOutput(t, positionIndex++, false); - emitted = true; - } while (true); - if (leftOuter && !emitted) { - writeOutput(t, -1, true); - } - } - - private void writeOutput(int t, int positionIndex, boolean missing) - throws HyracksDataException, IOException { - tupleBuilder.reset(); - for (int f = 0; f < projectionList.length; f++) { - int col = projectionList[f]; - if (col == outCol) { - if (missing) { - tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size()); - } else { - tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength()); - } - } else if (col == positionalCol) { - if (missing) { - tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size()); - } else { - positionWriter.write(tupleBuilder.getDataOutput(), positionIndex); - tupleBuilder.addFieldEndOffset(); - } - } else { - tupleBuilder.addField(tAccess, t, projectionList[f]); - } - } - appendToFrameFromTupleBuilder(tupleBuilder); - } - - @Override - public void flush() throws HyracksDataException { - appender.flush(writer); - } - }; + return new UnnestPushRuntime(ctx); } - private ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException { + public class UnnestPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + private IPointable p = VoidPointable.FACTORY.createPointable(); + protected ArrayTupleBuilder tupleBuilder; + protected IUnnestingEvaluator unnest; + private final IUnnestingPositionWriter positionWriter; + private final IHyracksTaskContext ctx; + protected ByteArrayAccessibleOutputStream missingBytes; + + public UnnestPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + this.ctx = ctx; + IEvaluatorContext evalCtx = new EvaluatorContext(ctx); + unnest = unnestingFactory.createUnnestingEvaluator(evalCtx); + tupleBuilder = new ArrayTupleBuilder(projectionList.length); + positionWriter = + positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null; + missingBytes = leftOuter ? writeMissingBytes() : null; + } + + @Override + public void open() throws HyracksDataException { + super.open(); + if (tRef == null) { + initAccessAppendRef(ctx); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + tRef.reset(tAccess, t); + try { + unnest.init(tRef); + unnesting(t); + } catch (IOException ae) { + throw HyracksDataException.create(ae); + } + } + } + + protected void unnesting(int t) throws IOException { + // Assumes that when unnesting the tuple, each step() call for each element + // in the tuple will increase the positionIndex, and the positionIndex will + // be reset when a new tuple is to be processed. + int positionIndex = 1; + boolean emitted = false; + do { + if (!unnest.step(p)) { + break; + } + writeOutput(t, positionIndex++, false); + emitted = true; + } while (true); + if (leftOuter && !emitted) { + writeOutput(t, -1, true); + } + } + + private void writeOutput(int t, int positionIndex, boolean missing) throws HyracksDataException, IOException { + tupleBuilder.reset(); + for (int f = 0; f < projectionList.length; f++) { + int col = projectionList[f]; + if (col == outCol) { + if (missing) { + tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size()); + } else { + tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength()); + } + } else if (col == positionalCol) { + if (missing) { + tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size()); + } else { + positionWriter.write(tupleBuilder.getDataOutput(), positionIndex); + tupleBuilder.addFieldEndOffset(); + } + } else { + tupleBuilder.addField(tAccess, t, projectionList[f]); + } + } + appendToFrameFromTupleBuilder(tupleBuilder); + } + + @Override + public void flush() throws HyracksDataException { + appender.flush(writer); + } + }; + + protected ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException { ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream(); IMissingWriter missingWriter = missingWriterFactory.createMissingWriter(); missingWriter.writeMissing(new DataOutputStream(baos)); return baos; } -} +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java index e1acaee..7338cc7 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java @@ -48,6 +48,7 @@ protected int tupleCount; protected int tupleDataEndOffset; + protected boolean ignoreFailures = false; @Override public void reset(IFrame frame, boolean clear) throws HyracksDataException { @@ -91,10 +92,21 @@ public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException { failIfInterrupted(); getBuffer().clear(); - outWriter.nextFrame(getBuffer()); - if (clearFrame) { - frame.reset(); - reset(getBuffer(), true); + if (!ignoreFailures) { + outWriter.nextFrame(getBuffer()); + if (clearFrame) { + frame.reset(); + reset(getBuffer(), true); + } + } else { + try { + outWriter.nextFrame(getBuffer()); + } finally { + if (clearFrame) { + frame.reset(); + reset(getBuffer(), true); + } + } } } @@ -125,4 +137,12 @@ throw HyracksDataException.create(new InterruptedException()); } } + + public void setIgnoreFailures(boolean ignoreFailures) { + this.ignoreFailures = ignoreFailures; + } + + public boolean isIgnoreFailures() { + return ignoreFailures; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java index 3ef8b28..58194f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java @@ -299,5 +299,4 @@ } return false; } - } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf Gerrit-Change-Number: 19744 Gerrit-PatchSet: 7 Gerrit-Owner: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-MessageType: merged
