Ildar Absalyamov has submitted this change and it was merged. Change subject: ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory ......................................................................
ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory Revisiting the previous fix by calling fail() on pipeline Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101 Reviewed-on: https://asterix-gerrit.ics.uci.edu/443 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java 3 files changed, 29 insertions(+), 21 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ian Maxon: Looks good to me, but someone else must approve Jenkins: Verified diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index 6618326..1a7150e 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -20,9 +20,6 @@ import java.nio.ByteBuffer; -import org.json.JSONException; -import org.json.JSONObject; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; @@ -36,6 +33,8 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; +import org.json.JSONException; +import org.json.JSONObject; public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { @@ -93,10 +92,11 @@ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { return new AbstractUnaryOutputSourceOperatorNodePushable() { + @Override public void initialize() throws HyracksDataException { IFrameWriter startOfPipeline; - RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] - : null; + RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 + ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null; PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor); @@ -105,8 +105,13 @@ } catch (AlgebricksException e) { throw new HyracksDataException(e); } - startOfPipeline.open(); - startOfPipeline.close(); + try { + startOfPipeline.open(); + } catch (HyracksDataException e) { + startOfPipeline.fail(); + } finally { + startOfPipeline.close(); + } } }; } @@ -120,10 +125,10 @@ @Override public void open() throws HyracksDataException { if (startOfPipeline == null) { - RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] - : null; - RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor( - AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0); + RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 + ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null; + RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider + .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0); PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor); try { diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java index 5b66736..a2b9652 100644 --- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java +++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java @@ -43,20 +43,21 @@ public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { return new AbstractOneInputSourcePushRuntime() { - private ArrayTupleBuilder tb = new ArrayTupleBuilder(0); - private FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx)); + private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0); + private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx)); @Override public void open() throws HyracksDataException { writer.open(); - try { - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - throw new IllegalStateException(); - } - appender.flush(writer, true); - } finally { - writer.close(); + if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { + throw new IllegalStateException(); } + appender.flush(writer, true); + } + + @Override + public void close() throws HyracksDataException { + writer.close(); } }; } diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java index 7f6dd10..2ca1e0f 100644 --- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java +++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java @@ -81,7 +81,9 @@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { ensureConnected(); - delegate.nextFrame(buffer); + if (!failed) { + delegate.nextFrame(buffer); + } } private void ensureConnected() throws HyracksDataException { -- To view, visit https://asterix-gerrit.ics.uci.edu/443 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101 Gerrit-PatchSet: 2 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Ildar Absalyamov <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Ildar Absalyamov <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
