Ildar Absalyamov has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/443
Change subject: ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory
......................................................................
ASTERIXDB-1002: Fix exception handling in EmptyTupleSourceRuntimeFactory
Revisit the previous fix: if opening the pipeline throws, call fail() on the pipeline before it is closed.
Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101
---
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
3 files changed, 29 insertions(+), 21 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/43/443/1
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 6618326..1a7150e 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -20,9 +20,6 @@
import java.nio.ByteBuffer;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -36,6 +33,8 @@
import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.json.JSONException;
+import org.json.JSONObject;
public class AlgebricksMetaOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
@@ -93,10 +92,11 @@
final IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
public void initialize() throws HyracksDataException {
IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity
> 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity
> 0
+ ?
AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
PipelineAssembler pa = new PipelineAssembler(pipeline,
inputArity, outputArity, null,
pipelineOutputRecordDescriptor);
@@ -105,8 +105,13 @@
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
- startOfPipeline.open();
- startOfPipeline.close();
+ try {
+ startOfPipeline.open();
+ } catch (HyracksDataException e) {
+ startOfPipeline.fail();
+ } finally {
+ startOfPipeline.close();
+ }
}
};
}
@@ -120,10 +125,10 @@
@Override
public void open() throws HyracksDataException {
if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor =
outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor =
recordDescProvider.getInputRecordDescriptor(
-
AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
+ RecordDescriptor pipelineOutputRecordDescriptor =
outputArity > 0
+ ?
AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0] : null;
+ RecordDescriptor pipelineInputRecordDescriptor =
recordDescProvider
+
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(),
0);
PipelineAssembler pa = new PipelineAssembler(pipeline,
inputArity, outputArity,
pipelineInputRecordDescriptor,
pipelineOutputRecordDescriptor);
try {
diff --git
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 5b66736..a2b9652 100644
---
a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++
b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -43,20 +43,21 @@
public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
return new AbstractOneInputSourcePushRuntime() {
- private ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
- private FrameTupleAppender appender = new FrameTupleAppender(new
VSizeFrame(ctx));
+ private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
+ private final FrameTupleAppender appender = new
FrameTupleAppender(new VSizeFrame(ctx));
@Override
public void open() throws HyracksDataException {
writer.open();
- try {
- if (!appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- appender.flush(writer, true);
- } finally {
- writer.close();
+ if (!appender.append(tb.getFieldEndOffsets(),
tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
}
+ appender.flush(writer, true);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
}
};
}
diff --git
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 7f6dd10..2ca1e0f 100644
---
a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -81,7 +81,9 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
ensureConnected();
- delegate.nextFrame(buffer);
+ if (!failed) {
+ delegate.nextFrame(buffer);
+ }
}
private void ensureConnected() throws HyracksDataException {
--
To view, visit https://asterix-gerrit.ics.uci.edu/443
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I19f8c8485e483e4d4efeff939e6bd82c7a04a101
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Ildar Absalyamov <[email protected]>