Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1837
Change subject: Improved exception handling for IFrameWriter fail() / close()
......................................................................
Improved exception handling for IFrameWriter fail() / close()
Adds helpers methods to correctly ensure exceptions are not lost during
IFrameWriter execution
Change-Id: Ib09844f03701168727ad6cf4be0e3197d8708451
---
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameWriterLifecycleHelper.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
43 files changed, 434 insertions(+), 483 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/37/1837/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index b1c7ff3..4d4d18c 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -27,6 +27,7 @@
import org.apache.asterix.test.common.TestTupleGenerator;
import org.apache.asterix.test.common.TestTupleGenerator.FieldType;
import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -84,8 +85,7 @@
IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc,
partitionWriterFactory,
CURRENT_PRODUCER, NUMBER_OF_CONSUMERS,
NUMBER_OF_CONSUMERS);
List<TestFrameWriter> recipients = new ArrayList<>();
- try {
- partitioner.open();
+ FrameWriterLifecycleHelper.openAndRun(partitioner, () -> {
FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
for (IFrameWriter writer :
partitionWriterFactory.getWriters().values()) {
recipients.add((TestFrameWriter) writer);
@@ -124,12 +124,7 @@
Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
MessagingFrameTupleAppender.getMessageType(tempBuffer));
}
- } catch (Throwable t) {
- partitioner.fail();
- throw t;
- } finally {
- partitioner.close();
- }
+ });
for (TestFrameWriter writer : recipients) {
Assert.assertEquals(writer.nextFrameCount(), 4);
Assert.assertEquals(writer.closeCount(), 1);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index dd713e6..be87c6f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -28,6 +28,7 @@
import org.apache.asterix.external.indexing.RecordId;
import org.apache.asterix.external.indexing.RecordIdReader;
import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -95,12 +96,19 @@
@Override
public void fail() throws HyracksDataException {
+ HyracksDataException hde = null;
try {
recordReader.fail();
- } catch (Throwable th) {
- throw new HyracksDataException(th);
- } finally {
+ } catch (Exception th) {
+ hde = HyracksDataException.create(th);
+ }
+ try {
writer.fail();
+ } catch (Exception ex) {
+ hde = HyracksDataException.suppress(hde, ex);
+ }
+ if (hde != null) {
+ throw hde;
}
}
@@ -153,14 +161,7 @@
@Override
public void close() throws HyracksDataException {
- try {
- appender.write(writer, true);
- } catch (Exception e) {
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
+ FrameWriterLifecycleHelper.run(writer, () -> appender.write(writer,
true));
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
index 6ca4b77..73ee15b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
@@ -49,23 +50,7 @@
rootFailureCause);
}
// registering a new collector.
- try {
- frameCollector.open();
- } catch (Throwable th) {
- rootFailureCause = th;
- try {
- frameCollector.fail();
- } catch (Throwable failThrowable) {
- th.addSuppressed(failThrowable);
- } finally {
- try {
- frameCollector.close();
- } catch (Throwable closeThrowable) {
- th.addSuppressed(closeThrowable);
- }
- }
- throw th;
- }
+ FrameWriterLifecycleHelper.openAndRun(frameCollector);
registeredCollectors.put(frameCollector.getConnectionId(),
frameCollector);
}
@@ -81,9 +66,9 @@
FeedFrameCollector frameCollector = removeFrameCollector(connectionId);
try {
frameCollector.close();
- } catch (Throwable th) {
- rootFailureCause = th;
- throw th;
+ } catch (Exception ex) {
+ rootFailureCause = ex;
+ throw ex;
}
}
@@ -108,9 +93,9 @@
for (FeedFrameCollector collector : registeredCollectors.values()) {
try {
collector.nextFrame(frame);
- } catch (Throwable th) {
- rootFailureCause = th;
- throw th;
+ } catch (Exception ex) {
+ rootFailureCause = ex;
+ throw ex;
}
}
}
@@ -123,16 +108,16 @@
FeedFrameCollector collector = it.next();
try {
collector.fail();
- } catch (Throwable th) {
+ } catch (Exception ex) {
while (it.hasNext()) {
FeedFrameCollector innerCollector = it.next();
try {
innerCollector.fail();
- } catch (Throwable innerTh) {
- th.addSuppressed(innerTh);
+ } catch (Exception innerEx) {
+ ex.addSuppressed(innerEx);
}
}
- throw th;
+ throw ex;
}
}
}
@@ -145,19 +130,19 @@
FeedFrameCollector collector = it.next();
try {
collector.close();
- } catch (Throwable th) {
+ } catch (Exception ex) {
while (it.hasNext()) {
FeedFrameCollector innerCollector = it.next();
try {
innerCollector.close();
- } catch (Throwable innerTh) {
- th.addSuppressed(innerTh);
+ } catch (Exception innerEx) {
+ ex.addSuppressed(innerEx);
} finally {
innerCollector.setState(State.FINISHED);
}
}
// resume here
- throw th;
+ throw ex;
} finally {
collector.setState(State.FINISHED);
}
@@ -172,9 +157,9 @@
for (FeedFrameCollector collector : registeredCollectors.values()) {
try {
collector.flush();
- } catch (Throwable th) {
- rootFailureCause = th;
- throw th;
+ } catch (Exception ex) {
+ rootFailureCause = ex;
+ throw ex;
}
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 45d424e..d1da261 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -88,8 +88,8 @@
if (index != null) {
try {
bulkLoader.end();
- } catch (Throwable th) {
- throw new HyracksDataException(th);
+ } catch (Exception th) {
+ throw HyracksDataException.create(th);
} finally {
try {
indexHelper.close();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 6299982..d3d5273 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -77,8 +77,8 @@
snapshotAccessor.open();
indexOpen = true;
adapter.open();
- } catch (Throwable th) {
- throw new HyracksDataException(th);
+ } catch (Exception th) {
+ throw HyracksDataException.create(th);
}
}
@@ -88,18 +88,17 @@
if (indexOpen) {
try {
snapshotAccessor.close();
- } catch (Throwable th) {
- hde = new HyracksDataException(th);
+ } catch (Exception th) {
+ hde = HyracksDataException.create(th);
}
try {
adapter.close();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ hde = HyracksDataException.suppress(hde, ex);
}
+ }
+ if (hde != null) {
+ throw hde;
}
}
@@ -107,8 +106,8 @@
public void fail() throws HyracksDataException {
try {
adapter.fail();
- } catch (Throwable th) {
- throw new HyracksDataException(th);
+ } catch (Exception th) {
+ throw HyracksDataException.create(th);
}
}
@@ -116,8 +115,8 @@
public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
try {
adapter.nextFrame(buffer);
- } catch (Throwable th) {
- throw new HyracksDataException(th);
+ } catch (Exception th) {
+ throw HyracksDataException.create(th);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 93acb26..ef9fded 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -20,6 +20,7 @@
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -55,17 +56,10 @@
@Override
public void initialize() throws HyracksDataException {
- IDataSourceAdapter adapter = null;
- try {
- writer.open();
- adapter = adapterFactory.createAdapter(ctx, partition);
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
+ IDataSourceAdapter adapter =
adapterFactory.createAdapter(ctx, partition);
adapter.start(partition, writer);
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
+ });
}
};
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 2876ea6..5112730 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -92,7 +92,11 @@
* indicative of a failure at the sibling intake operator
location. The surviving intake partitions must
* continue to live and receive data from the external source.
*/
- writer.fail();
+ try {
+ writer.fail();
+ } catch (Exception failException) {
+ e.addSuppressed(failException);
+ }
throw e;
} finally {
writer.close();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 24a7462..6d45291 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -142,8 +142,8 @@
FeedUtils.processFeedMessage(buffer, message, fta);
writer.nextFrame(buffer);
} catch (Exception e) {
- LOGGER.log(Level.WARNING, e.getMessage(), e);
- throw new HyracksDataException(e);
+ LOGGER.log(Level.WARNING, "exception in nextFrame", e);
+ throw HyracksDataException.create(e);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 97c1115..571d91f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -140,8 +140,8 @@
FeedUtils.processFeedMessage(buffer, message, fta);
writer.nextFrame(buffer);
} catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
+ LOGGER.log(Level.WARNING, "exception in nextFrame", e);
+ throw HyracksDataException.create(e);
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 1e22458..5114471 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.TransactionUtil;
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -171,14 +172,7 @@
if (isSink) {
return;
}
- try {
- flushIfNotFailed();
- } catch (Exception e) {
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
+ FrameWriterLifecycleHelper.run(writer, this::flushIfNotFailed);
appender.reset(frame, true);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 2d8eaed..aa5f296 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.runtime.operators.base;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameAppender;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -51,14 +52,7 @@
@Override
public void close() throws HyracksDataException {
- try {
- flushIfNotFailed();
- } catch (Exception e) {
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
+ FrameWriterLifecycleHelper.run(writer, this::flushIfNotFailed);
}
protected void flushAndReset() throws HyracksDataException {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index f6ebf19..4627048 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -89,20 +90,11 @@
@Override
public void initialize() throws HyracksDataException {
- IFrameWriter startOfPipeline;
RecordDescriptor pipelineOutputRecordDescriptor =
outputArity > 0 ?
AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
PipelineAssembler pa =
new PipelineAssembler(pipeline, inputArity,
outputArity, null, pipelineOutputRecordDescriptor);
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- try {
- startOfPipeline.open();
- } catch (Exception e) {
- startOfPipeline.fail();
- throw e;
- } finally {
- startOfPipeline.close();
- }
+
FrameWriterLifecycleHelper.openAndRun(pa.assemblePipeline(writer, ctx));
}
};
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 8b13e09..85bf222 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -28,6 +28,7 @@
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -159,14 +160,7 @@
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
startOfPipeline.writeTuple(buffer, t);
- try {
- startOfPipeline.open();
- } catch (Exception e) {
- startOfPipeline.fail();
- throw e;
- } finally {
- startOfPipeline.close();
- }
+ FrameWriterLifecycleHelper.openAndRun(startOfPipeline);
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
index 68274ce..aab5776 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -166,12 +166,8 @@
try {
appenders[i].write(writers[i], true);
writers[i].close();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ hde = HyracksDataException.suppress(hde, ex);
}
}
}
@@ -187,12 +183,8 @@
if (isOpen[i]) {
try {
writers[i].fail();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ hde = HyracksDataException.suppress(hde, ex);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameWriterLifecycleHelper.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameWriterLifecycleHelper.java
new file mode 100644
index 0000000..d95d126
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameWriterLifecycleHelper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FrameWriterLifecycleHelper {
+
+ @FunctionalInterface
+ public interface Failable {
+ void call() throws Exception;
+ }
+
+ public static void run(IFrameWriter writer, Failable tryBlock) throws
HyracksDataException {
+ run(writer, tryBlock, null, null);
+ }
+
+ public static void run(IFrameWriter writer, Failable tryBlock, Failable
extraCatch, Failable extraFinally)
+ throws HyracksDataException {
+ execute(writer, false, tryBlock, null, null);
+ }
+
+ public static void openAndRun(IFrameWriter writer) throws
HyracksDataException {
+ openAndRun(writer, null, null, null);
+ }
+
+ public static void openAndRun(IFrameWriter writer, Failable tryBlock)
throws HyracksDataException {
+ openAndRun(writer, tryBlock, null, null);
+ }
+
+ public static void openAndRun(IFrameWriter writer, Failable tryBlock,
Failable extraCatch, Failable extraFinally)
+ throws HyracksDataException {
+ execute(writer, true, tryBlock, extraCatch, extraFinally);
+ }
+
+ private static void execute(IFrameWriter writer, boolean open, Failable
tryBlock, Failable extraCatch,
+ Failable extraFinally) throws HyracksDataException {
+ HyracksDataException hde = null;
+ try {
+ if (open) {
+ writer.open();
+ }
+ if (tryBlock != null) {
+ tryBlock.call();
+ }
+ } catch (Exception e) {
+ hde = catchBlock(writer, e, extraCatch);
+ } finally {
+ hde = finallyBlock(writer, hde, extraFinally);
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ private static HyracksDataException catchBlock(IFrameWriter writer,
Exception e, Failable extraCatch) {
+ HyracksDataException hde = HyracksDataException.create(e);
+ if (extraCatch != null) {
+ try {
+ extraCatch.call();
+ } catch (Exception extraCatchException) {
+ hde = HyracksDataException.suppress(hde, extraCatchException);
+ }
+ }
+ try {
+ writer.fail();
+ } catch (Exception catchEx) {
+ hde.addSuppressed(catchEx);
+ }
+ return hde;
+ }
+
+ private static HyracksDataException finallyBlock(IFrameWriter writer,
HyracksDataException hde,
+ Failable extraFinally) {
+ if (extraFinally != null) {
+ try {
+ extraFinally.call();
+ } catch (Exception e) {
+ hde = HyracksDataException.suppress(hde, e);
+ }
+ }
+ try {
+ writer.close();
+ } catch (Exception finallyEx) {
+ hde = HyracksDataException.suppress(hde, finallyEx);
+ }
+ return hde;
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index c9cdb2d..5ed8865 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -34,10 +34,14 @@
private static final Logger LOGGER =
Logger.getLogger(HyracksDataException.class.getName());
public static HyracksDataException create(Throwable cause) {
- if (cause instanceof HyracksDataException || cause == null) {
+ if (cause == null) {
+ throw new NullPointerException("null cause");
+ }
+ if (cause instanceof HyracksDataException) {
return (HyracksDataException) cause;
}
- if (cause instanceof InterruptedException &&
!Thread.currentThread().isInterrupted()) {
+ if (LOGGER.isLoggable(Level.WARNING) && cause instanceof
InterruptedException
+ && !Thread.currentThread().isInterrupted()) {
LOGGER.log(Level.WARNING,
"Wrapping an InterruptedException in HyracksDataException
and current thread is not interrupted",
cause);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index d689bc0..2bddb0e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -34,6 +34,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameReader;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -365,8 +366,7 @@
IFrameReader reader = collector.getReader();
reader.open();
try {
- try {
- writer.open();
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
VSizeFrame frame = new VSizeFrame(this);
while (reader.nextFrame(frame)) {
if (aborted) {
@@ -376,16 +376,7 @@
writer.nextFrame(buffer);
buffer.compact();
}
- } catch (Exception e) {
- try {
- writer.fail();
- } catch (HyracksDataException e1) {
- e.addSuppressed(e1);
- }
- throw e;
- } finally {
- writer.close();
- }
+ });
} finally {
reader.close();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 5506a94..92a3f47 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -100,6 +100,7 @@
setDataConsumerThread(thread); // Sets the data consumer
thread to the current thread.
String oldName = thread.getName();
try {
+ HyracksDataException hde = null;
thread.setName(MaterializingPipelinedPartition.class.getName() + pid);
FileReference fRefCopy;
synchronized (MaterializingPipelinedPartition.this) {
@@ -111,7 +112,7 @@
writer.open();
IFileHandle readHandle = fRefCopy == null ? null
: ioManager.open(fRefCopy,
IIOManager.FileReadWriteMode.READ_ONLY,
-
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
try {
if (readHandle == null) {
// Either fail() is called or close() is called
with 0 tuples coming in.
@@ -151,16 +152,26 @@
}
}
} catch (Exception e) {
- writer.fail();
- throw e;
+ hde = HyracksDataException.create(e);
+ try {
+ writer.fail();
+ } catch (Exception e1) {
+ hde.addSuppressed(e1);
+ }
} finally {
try {
writer.close();
+ } catch (Exception e) {
+ hde = HyracksDataException.suppress(hde, e);
} finally {
// Makes sure that the reader is always closed and
the temp file is always deleted.
try {
if (readHandle != null) {
- ioManager.close(readHandle);
+ try {
+ ioManager.close(readHandle);
+ } catch (Exception e) {
+ hde =
HyracksDataException.suppress(hde, e);
+ }
}
} finally {
if (fRef != null) {
@@ -169,8 +180,11 @@
}
}
}
+ if (hde != null) {
+ throw hde;
+ }
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ LOGGER.log(Level.SEVERE, "writeTo failed", e);
} finally {
thread.setName(oldName);
setDataConsumerThread(null); // Sets back the data
consumer thread to null.
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index b8bdda7..9c1ba45 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -24,6 +24,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.channels.IInputChannelMonitor;
@@ -39,6 +41,8 @@
import org.apache.hyracks.api.partitions.PartitionId;
public class ReceiveSideMaterializingCollector implements IPartitionCollector {
+ private static final Logger LOGGER =
Logger.getLogger(ReceiveSideMaterializingCollector.class.getName());
+
private final IHyracksTaskContext ctx;
private PartitionManager manager;
@@ -149,7 +153,8 @@
}
}
} catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
}
}
@@ -157,7 +162,8 @@
channel.close();
delegate.addPartitions(Collections
.singleton(new PartitionChannel(pid, new
MaterializedPartitionInputChannel(1, pid, manager))));
- } catch (HyracksException e) {
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "PartitionWriter failure;
partitionId=" + pid, e);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index d9a4c7c..6eb9d4a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -21,6 +21,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -63,14 +64,10 @@
throw new HyracksDataException("Closing SerializingDataWriter that
has not been opened");
}
try {
- tupleAppender.write(frameWriter, true);
- } catch (Exception e) {
- frameWriter.fail();
- throw e;
+ FrameWriterLifecycleHelper.run(frameWriter, () ->
tupleAppender.write(frameWriter, true));
} finally {
- frameWriter.close();
+ open = false;
}
- open = false;
}
@SuppressWarnings("unchecked")
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index 8ee4fa3..bfdbfb5 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -163,12 +163,8 @@
if (isOpen[i]) {
try {
writers[i].close();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ hde = HyracksDataException.suppress(hde,
ex);
}
}
}
@@ -185,12 +181,8 @@
if (isOpen[i]) {
try {
writers[i].fail();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ hde = HyracksDataException.suppress(hde, ex);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index 253b3e3..d1e9ddb 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -102,12 +102,8 @@
if (isWriterOpen[i]) {
try {
pWriters[i].fail();
- } catch (Throwable th) {
- if (failException == null) {
- failException = new HyracksDataException(th);
- } else {
- failException.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ failException =
HyracksDataException.suppress(failException, ex);
}
}
}
@@ -128,21 +124,13 @@
if (isWriterOpen[i]) {
try {
appenders[i].write(pWriters[i], true);
- } catch (Throwable th) {
- if (closeException == null) {
- closeException = new HyracksDataException(th);
- } else {
- closeException.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ closeException =
HyracksDataException.suppress(closeException, ex);
} finally {
try {
pWriters[i].close();
- } catch (Throwable th) {
- if (closeException == null) {
- closeException = new HyracksDataException(th);
- } else {
- closeException.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ closeException =
HyracksDataException.suppress(closeException, ex);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
index d748c8e..ad7b9c9 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -72,12 +72,8 @@
if (isOpen[i]) {
try {
epWriters[i].fail();
- } catch (Throwable th) {
- if (failException == null) {
- failException = new HyracksDataException(th);
- } else {
- failException.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ failException =
HyracksDataException.suppress(failException, ex);
}
}
}
@@ -93,12 +89,8 @@
if (isOpen[i]) {
try {
epWriters[i].close();
- } catch (Throwable th) {
- if (closeException == null) {
- closeException = new HyracksDataException(th);
- } else {
- closeException.addSuppressed(th);
- }
+ } catch (Exception ex) {
+ closeException =
HyracksDataException.suppress(closeException, ex);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 189ce9d..ee88254 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -83,7 +83,7 @@
}
} catch (Exception e) {
newFailure = true;
- closeException = wrapException(closeException, e);
+ closeException =
HyracksDataException.suppress(closeException, e);
break;
}
}
@@ -91,7 +91,7 @@
try {
fail(); // Fail all writers if any new failure happens.
} catch (Exception e) {
- closeException = wrapException(closeException, e);
+ closeException =
HyracksDataException.suppress(closeException, e);
}
}
}
@@ -101,7 +101,7 @@
try {
pWriters[i].close();
} catch (Exception e) {
- closeException = wrapException(closeException, e);
+ closeException =
HyracksDataException.suppress(closeException, e);
}
}
}
@@ -145,7 +145,7 @@
try {
pWriters[i].fail();
} catch (Exception e) {
- failException = wrapException(failException, e);
+ failException =
HyracksDataException.suppress(failException, e);
}
}
}
@@ -163,12 +163,4 @@
}
}
- // Wraps the current encountered exception into the final exception.
- private HyracksDataException wrapException(HyracksDataException
finalException, Exception currentException) {
- if (finalException == null) {
- return HyracksDataException.create(currentException);
- }
- finalException.addSuppressed(currentException);
- return finalException;
- }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index 76e6c79..4326c29 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -20,9 +20,8 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -59,22 +58,7 @@
@Override
public void initialize() throws HyracksDataException {
File f = split.getFile(ioManager);
- try {
- writer.open();
- InputStream in;
- try {
- in = new FileInputStream(f);
- } catch (FileNotFoundException e) {
- writer.fail();
- throw new HyracksDataException(e);
- }
- tp.parse(in, writer);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- writer.close();
- }
+ FrameWriterLifecycleHelper.openAndRun(writer, () ->
tp.parse(new FileInputStream(f), writer));
}
};
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index 4d368bd..4af7861 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -22,6 +22,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -91,21 +92,22 @@
ISpillableTable table = aggState.getSpillableTable();
RunFileWriter[] partitionRuns = aggState.getRuns();
int[] numberOfTuples = aggState.getSpilledNumTuples();
- try {
- writer.open();
- doPass(table, partitionRuns, numberOfTuples, writer, 1); // level
0 use used at build stage.
- } catch (Exception e) {
+ FrameWriterLifecycleHelper.run(writer, () -> {
try {
- for (RunFileWriter run : generatedRuns) {
- run.erase();
+ writer.open();
+ doPass(table, partitionRuns, numberOfTuples, writer, 1);
+ } catch (Exception e1) {
+ HyracksDataException hde = HyracksDataException.create(e1);
+ try {
+ for (RunFileWriter run : generatedRuns) {
+ run.erase();
+ }
+ } catch (Exception catchEx) {
+ hde.addSuppressed(catchEx);
}
- } finally {
- writer.fail();
+ throw hde;
}
- throw e;
- } finally {
- writer.close();
- }
+ });
}
private void doPass(ISpillableTable table, RunFileWriter[] runs, int[]
numOfTuples, IFrameWriter writer, int level)
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
index d29e9ab..15d2409 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalHashGroupBy.java
@@ -69,8 +69,12 @@
spilledNumTuples[partition] += table.flushFrames(partition,
writer, AggregateType.PARTIAL);
table.clear(partition);
} catch (Exception ex) {
- writer.fail();
- throw new HyracksDataException(ex);
+ try {
+ writer.fail();
+ } catch (Exception failException) {
+ ex.addSuppressed(failException);
+ }
+ throw HyracksDataException.create(ex);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 99dbfad..92abd67 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -190,26 +191,34 @@
@Override
public void close() throws HyracksDataException {
if (failed) {
+ HyracksDataException hde = null;
try {
state.joiner.closeCache();
+ } catch (Exception e) {
+ hde = HyracksDataException.create(e);
+ throw e;
} finally {
- writer.close();
+ try {
+ writer.close();
+ } catch (Exception e) {
+ hde = HyracksDataException.suppress(hde, e);
+ throw hde; // NOSONAR (original exception
retained by suppress)
+ }
}
return;
}
- try {
+ FrameWriterLifecycleHelper.run(writer, () -> {
try {
- state.joiner.completeJoin(writer);
- } finally {
- state.joiner.releaseMemory();
+ try {
+ state.joiner.completeJoin(writer);
+ } finally {
+ state.joiner.releaseMemory();
+ }
+ } catch (Exception e) {
+ state.joiner.closeCache();
+ throw e;
}
- } catch (Exception e) {
- state.joiner.closeCache();
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
+ });
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 5df134f..761901a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.dataflow.std.misc;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,16 +45,9 @@
@Override
public void initialize() throws HyracksDataException {
FrameTupleAppender appender = new FrameTupleAppender(new
VSizeFrame(ctx));
- if (fieldSlots != null && tupleData != null && tupleSize > 0)
+ if (fieldSlots != null && tupleData != null && tupleSize > 0) {
appender.append(fieldSlots, tupleData, 0, tupleSize);
- writer.open();
- try {
- appender.write(writer, false);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- writer.close();
}
+ FrameWriterLifecycleHelper.openAndRun(writer, () ->
appender.write(writer, false));
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 918155d..464bf8f 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -75,30 +76,24 @@
public void writeOut(IFrameWriter writer, IFrame frame, boolean failed)
throws HyracksDataException {
RunFileReader in = out.createReader();
- writer.open();
try {
- if (failed) {
- writer.fail();
- return;
- }
- in.open();
- try {
- while (in.nextFrame(frame)) {
- writer.nextFrame(frame.getBuffer());
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
+ if (failed) {
+ writer.fail();
+ return;
}
- } finally {
- in.close();
- }
- } catch (Exception e) {
- writer.fail();
- throw e;
+ in.open();
+ try {
+ while (in.nextFrame(frame)) {
+ writer.nextFrame(frame.getBuffer());
+ }
+ } finally {
+ in.close();
+ }
+ });
} finally {
- try {
- writer.close();
- } finally {
- if (numConsumers.decrementAndGet() == 0) {
- out.getFileReference().delete();
- }
+ if (numConsumers.decrementAndGet() == 0) {
+ out.getFileReference().delete();
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 78417ac..961442b 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -22,6 +22,7 @@
import java.io.PrintStream;
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -118,27 +119,20 @@
@Override
public void close() throws HyracksDataException {
if (datasetPartitionWriter != null) {
- try {
+ FrameWriterLifecycleHelper.run(datasetPartitionWriter, ()
-> {
if (!failed && frameOutputStream.getTupleCount() > 0) {
frameOutputStream.flush(datasetPartitionWriter);
}
- } catch (Exception e) {
- datasetPartitionWriter.fail();
- throw e;
- } finally {
- datasetPartitionWriter.close();
- }
+ });
}
}
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("{ ");
- sb.append("\"rsId\": \"").append(rsId).append("\", ");
- sb.append("\"ordered\": ").append(ordered).append(", ");
- sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
- return sb.toString();
+ return "{ " +
+ "\"rsId\": \"" + rsId + "\", " +
+ "\"ordered\": " + ordered + ", " +
+ "\"asyncMode\": " + asyncMode + " }";
}
};
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index c8f9268..9cb4792 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -26,6 +26,7 @@
import java.util.logging.Logger;
import org.apache.hyracks.api.comm.FrameConstants;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -132,15 +133,8 @@
RunFileWriter mergeFileWriter =
prepareIntermediateMergeRunFile();
IFrameWriter mergeResultWriter =
prepareIntermediateMergeResultWriter(mergeFileWriter);
- try {
- mergeResultWriter.open();
- merge(mergeResultWriter, partialRuns);
- } catch (Throwable t) {
- mergeResultWriter.fail();
- throw t;
- } finally {
- mergeResultWriter.close();
- }
+
FrameWriterLifecycleHelper.openAndRun(mergeResultWriter,
+ () -> merge(mergeResultWriter,
partialRuns));
reader = mergeFileWriter.createReader();
}
runs.add(reader);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index 3c11669..88cdde3 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.List;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
@@ -60,15 +61,7 @@
getSorter().sort();
RunFileWriter runWriter = getRunFileWriter();
IFrameWriter flushWriter = getFlushableFrameWriter(runWriter);
- flushWriter.open();
- try {
- getSorter().flush(flushWriter);
- } catch (Exception e) {
- flushWriter.fail();
- throw e;
- } finally {
- flushWriter.close();
- }
+ FrameWriterLifecycleHelper.openAndRun(flushWriter, () ->
getSorter().flush(flushWriter));
generatedRunFileReaders.add(runWriter.createDeleteOnCloseReader());
getSorter().reset();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 996101b..8f8ed56 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -164,17 +165,10 @@
IOperatorNodePushable op = new
AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- try {
- writer.open();
- SortTaskState state = (SortTaskState)
ctx.getStateObject(
- new TaskId(new ActivityId(getOperatorId(),
SORT_ACTIVITY_ID), partition));
- state.frameSorter.flush(writer);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- writer.close();
- }
+ FrameWriterLifecycleHelper.openAndRun(writer,
+ () -> ((SortTaskState) ctx.getStateObject(new
TaskId(
+ new ActivityId(getOperatorId(),
SORT_ACTIVITY_ID), partition))).frameSorter
+ .flush(writer));
}
};
return op;
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index e4d9960..a4e7482 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.Random;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -79,7 +80,7 @@
@Override
public void initialize() throws HyracksDataException {
- try {
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
writer.open();
for (int i = 0; i < numRecords; i++) {
tb.reset();
@@ -96,12 +97,7 @@
}
}
appender.write(writer, true);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- writer.close();
- }
+ });
}
private void genField(ArrayTupleBuilder tb, int fieldIndex) throws
HyracksDataException {
@@ -158,7 +154,7 @@
String s =
Long.toHexString(Double.doubleToLongBits(random.nextDouble()));
StringBuilder strBuilder = new StringBuilder();
for (int i = 0; i < s.length() && i < length; i++) {
- strBuilder.append(s.charAt(Math.abs(random.nextInt()) %
s.length()));
+ strBuilder.append(s.charAt(random.nextInt(s.length())));
}
return strBuilder.toString();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
index 7eba9e7..79c1d30 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
@@ -283,18 +284,13 @@
@Override
public void initialize() throws HyracksDataException {
- try {
- writer.open();
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
while (true) {
synchronized (this) {
wait();
}
}
- } catch (Exception e) {
- writer.fail();
- } finally {
- writer.close();
- }
+ });
}
};
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
index ef9e4b6..70653d2 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -98,15 +99,7 @@
@Override
public void initialize() throws HyracksDataException {
- try {
- writer.open();
- writer.nextFrame(frame);
- } catch (Exception e) {
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
+ FrameWriterLifecycleHelper.openAndRun(writer, () ->
writer.nextFrame(frame));
}
};
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
index 022d3a2..6e2b183 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
@@ -26,6 +26,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -69,17 +70,11 @@
private void executeOneWriter(IFrameWriter writer, List<IFrame>
inputFrame, List<Exception> exceptions) {
try {
- try {
- writer.open();
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
for (IFrame frame : inputFrame) {
writer.nextFrame(frame.getBuffer());
}
- } catch (Exception ex) {
- writer.fail();
- throw ex;
- } finally {
- writer.close();
- }
+ });
} catch (Exception e) {
exceptions.add(e);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 8357ae0..7b7fcec 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -101,50 +102,48 @@
public void initialize() throws HyracksDataException {
ClassLoader ctxCL =
Thread.currentThread().getContextClassLoader();
try {
- writer.open();
-
Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
- JobConf conf = confFactory.getConf();
-
conf.setClassLoader(ctx.getJobletContext().getClassLoader());
- IKeyValueParser parser =
tupleParserFactory.createKeyValueParser(ctx);
- try {
- parser.open(writer);
- InputFormat inputFormat = conf.getInputFormat();
- for (int i = 0; i < inputSplits.length; i++) {
- /**
- * read all the partitions scheduled to the
current node
- */
- if (scheduledLocations[i].equals(nodeName)) {
- /**
- * pick an unread split to read
- * synchronize among simultaneous partitions
in the same machine
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
+
Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
+ JobConf conf = confFactory.getConf();
+
conf.setClassLoader(ctx.getJobletContext().getClassLoader());
+ IKeyValueParser parser =
tupleParserFactory.createKeyValueParser(ctx);
+ try {
+ parser.open(writer);
+ InputFormat inputFormat = conf.getInputFormat();
+ for (int i = 0; i < inputSplits.length; i++) {
+ /*
+ * read all the partitions scheduled to the
current node
*/
- synchronized (executed) {
- if (executed[i] == false) {
- executed[i] = true;
- } else {
- continue;
+ if (scheduledLocations[i].equals(nodeName)) {
+ /*
+ * pick an unread split to read
+ * synchronize among simultaneous
partitions in the same machine
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /*
+ * read the split
+ */
+ RecordReader reader =
inputFormat.getRecordReader(inputSplits[i], conf,
+ Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+ while (reader.next(key, value)) {
+ parser.parse(key, value, writer,
inputSplits[i].toString());
}
}
-
- /**
- * read the split
- */
- RecordReader reader =
inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
- Object key = reader.createKey();
- Object value = reader.createValue();
- while (reader.next(key, value) == true) {
- parser.parse(key, value, writer,
inputSplits[i].toString());
- }
}
+ } finally {
+ parser.close(writer);
}
- } finally {
- parser.close(writer);
- }
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
+ });
} finally {
- writer.close();
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 15bf260..5b169eb1 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -115,50 +116,47 @@
public void initialize() throws HyracksDataException {
ClassLoader ctxCL =
Thread.currentThread().getContextClassLoader();
try {
- writer.open();
-
Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
- Job job = confFactory.getConf();
-
job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
- IKeyValueParser parser =
tupleParserFactory.createKeyValueParser(ctx);
- InputFormat inputFormat =
ReflectionUtils.newInstance(job.getInputFormatClass(),
- job.getConfiguration());
- int size = inputSplits.size();
- for (int i = 0; i < size; i++) {
- /**
- * read all the partitions scheduled to the current
node
- */
- if (scheduledLocations[i].equals(nodeName)) {
- /**
- * pick an unread split to read synchronize among
- * simultaneous partitions in the same machine
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
+
Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
+ Job job = confFactory.getConf();
+
job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ IKeyValueParser parser =
tupleParserFactory.createKeyValueParser(ctx);
+ InputFormat inputFormat =
ReflectionUtils.newInstance(job.getInputFormatClass(),
+ job.getConfiguration());
+ int size = inputSplits.size();
+ for (int i = 0; i < size; i++) {
+ /*
+ * read all the partitions scheduled to the
current node
*/
- synchronized (executed) {
- if (executed[i] == false) {
- executed[i] = true;
- } else {
- continue;
+ if (scheduledLocations[i].equals(nodeName)) {
+ /*
+ * pick an unread split to read synchronize
among
+ * simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /*
+ * read the split
+ */
+ TaskAttemptContext context =
ctxFactory.createContext(job.getConfiguration(), i);
+
context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
+ RecordReader reader =
inputFormat.createRecordReader(inputSplits.get(i), context);
+ reader.initialize(inputSplits.get(i), context);
+ while (reader.nextKeyValue()) {
+ parser.parse(reader.getCurrentKey(),
reader.getCurrentValue(), writer,
+ inputSplits.get(i).toString());
}
}
-
- /**
- * read the split
- */
- TaskAttemptContext context =
ctxFactory.createContext(job.getConfiguration(), i);
-
context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
- RecordReader reader =
inputFormat.createRecordReader(inputSplits.get(i), context);
- reader.initialize(inputSplits.get(i), context);
- while (reader.nextKeyValue() == true) {
- parser.parse(reader.getCurrentKey(),
reader.getCurrentValue(), writer,
- inputSplits.get(i).toString());
- }
}
- }
- parser.close(writer);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
+ parser.close(writer);
+ });
} finally {
- writer.close();
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index b358f07..cfeb6f8 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -149,7 +149,7 @@
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -213,26 +213,30 @@
if (appender.getTupleCount() > 0) {
appender.write(writer, true);
}
- } catch (Throwable th) {
- writer.fail();
- closeException = new HyracksDataException(th);
+ } catch (Exception e) {
+ closeException = HyracksDataException.create(e);
+ try {
+ writer.fail();
+ } catch (HyracksDataException e1) {
+ closeException.addSuppressed(e1);
+ }
}
}
try {
cursor.close();
- } catch (Throwable th) {
+ } catch (Exception e) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(e);
} else {
- closeException.addSuppressed(th);
+ closeException.addSuppressed(e);
}
}
try {
indexHelper.close();
- } catch (Throwable th) {
+ } catch (Exception th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(th);
} else {
closeException.addSuppressed(th);
}
@@ -241,11 +245,11 @@
try {
// will definitely be called regardless of exceptions
writer.close();
- } catch (Throwable th) {
+ } catch (Exception e) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(e);
} else {
- closeException.addSuppressed(th);
+ closeException.addSuppressed(e);
}
}
if (closeException != null) {
@@ -260,13 +264,9 @@
}
private void writeTupleToOutput(ITupleReference tuple) throws IOException {
- try {
- for (int i = 0; i < tuple.getFieldCount(); i++) {
- dos.write(tuple.getFieldData(i), tuple.getFieldStart(i),
tuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
- } catch (Exception e) {
- throw e;
+ for (int i = 0; i < tuple.getFieldCount(); i++) {
+ dos.write(tuple.getFieldData(i), tuple.getFieldStart(i),
tuple.getFieldLength(i));
+ tb.addFieldEndOffset();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index bc7cb85..816d2fb 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -20,6 +20,7 @@
import java.io.DataOutput;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -64,8 +65,7 @@
searchCallbackFactory.createSearchOperationCallback(resource.getId(), ctx,
null);
ITreeIndexAccessor indexAccessor =
(ITreeIndexAccessor)
treeIndex.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
- try {
- writer.open();
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
indexAccessor.diskOrderScan(cursor);
int fieldCount = treeIndex.getFieldCount();
FrameTupleAppender appender = new FrameTupleAppender(new
VSizeFrame(ctx));
@@ -88,21 +88,15 @@
}
appender.write(writer, true);
- } catch (Throwable th) {
- writer.fail();
- throw new HyracksDataException(th);
- } finally {
- try {
- cursor.close();
- } catch (Exception cursorCloseException) {
- throw new IllegalStateException(cursorCloseException);
- } finally {
- writer.close();
- }
+ }, null, cursor::close);
+ } catch (Exception ex) {
+ HyracksDataException hde = HyracksDataException.create(ex);
+ try {
+ treeIndexHelper.close();
+ } catch (Exception e) {
+ hde.addSuppressed(e);
}
- } catch (Throwable th) {
- treeIndexHelper.close();
- throw new HyracksDataException(th);
+ throw hde;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index c00cecb..8135e9e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -20,6 +20,7 @@
import java.io.DataOutput;
+import org.apache.hyracks.api.comm.FrameWriterLifecycleHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -66,8 +67,7 @@
public void initialize() throws HyracksDataException {
treeIndexHelper.open();
ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndexInstance();
- try {
- writer.open();
+ FrameWriterLifecycleHelper.openAndRun(writer, () -> {
IBufferCache bufferCache =
storageManager.getBufferCache(ctx.getJobletContext().getServiceContext());
IFileMapProvider fileMapProvider =
storageManager.getFileMapProvider(ctx.getJobletContext().getServiceContext());
@@ -92,15 +92,6 @@
+ appender.getBuffer().capacity() + ")");
}
appender.write(writer, false);
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- try {
- writer.close();
- } finally {
- treeIndexHelper.close();
- }
- }
+ }, null, treeIndexHelper::close);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1837
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib09844f03701168727ad6cf4be0e3197d8708451
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>