[ASTERIXDB-2204][STO] Fix implementations and usages of IIndexCursor - user model changes: no - storage format changes: no - interface changes: yes - IIndexCursor.close() is now idempotent and can be called on a closed cursor. - IIndexCursor.destroy() is now idempotent and can be called on a destroyed cursor. - Add IIndexAccessor.destroy(), letting the accessor know it is safe to destroy its reusable cursors and operation contexts. - Add IIndexOperationContext.destroy(), letting the context know that the user is done with it and allowing it to release resources.
details: - Previously, implementations of the IIndexCursor interface didn't enforce the interface contract. This change enforces the contract for all the implementations. - With the enforcement of the contract, all the users of the cursors are expected to follow and enforce the expected lifecycle. - Test cases were added. Change-Id: I98a7a8b931eb24dbe11bf2bdc61b754ca28ebdf9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2324 Reviewed-by: Michael Blow <mb...@apache.org> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4ff6a36d Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4ff6a36d Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4ff6a36d Branch: refs/heads/master Commit: 4ff6a36d108182e5f36ceac40a27dd89cc29cd23 Parents: 2f392e8 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Mon Feb 12 09:24:15 2018 -0800 Committer: abdullah alamoudi <bamou...@gmail.com> Committed: Mon Feb 12 15:27:53 2018 -0800 ---------------------------------------------------------------------- .../apache/asterix/app/nc/RecoveryManager.java | 46 +-- .../message/MetadataNodeRequestMessage.java | 7 +- .../asterix/messaging/CCMessageBroker.java | 2 +- .../asterix/test/storage/DiskIsFullTest.java | 60 ++-- .../dataflow/FeedRecordDataFlowController.java | 35 +- .../indexing/ExternalFileIndexAccessor.java | 13 +- .../input/stream/SocketServerInputStream.java | 37 +-- .../asterix/external/util/DataflowUtils.java | 37 ++- .../apache/asterix/metadata/MetadataNode.java | 192 ++++++----- .../LSMPrimaryUpsertOperatorNodePushable.java | 26 +- .../management/service/logging/LogManager.java | 67 ++-- .../hyracks/api/dataflow/IDestroyable.java | 35 ++ .../api/exceptions/HyracksDataException.java | 32 +- 
.../apache/hyracks/api/util/DestroyUtils.java | 53 +++ .../apache/hyracks/api/util/ExceptionUtils.java | 108 +++++++ .../apache/hyracks/control/cc/job/JobRun.java | 2 +- .../control/common/utils/ExceptionUtils.java | 86 ----- .../org/apache/hyracks/control/nc/Task.java | 2 +- .../hyracks/control/nc/work/StartTasksWork.java | 2 +- .../std/collectors/InputChannelFrameReader.java | 1 - .../tests/unit/EnforcedIndexCursorTest.java | 49 --- .../hyracks/tests/unit/IIndexCursorTest.java | 263 --------------- .../BTreeUpdateSearchOperatorNodePushable.java | 4 +- .../hyracks/storage/am/btree/impls/BTree.java | 10 + .../btree/impls/BTreeCountingSearchCursor.java | 25 +- .../storage/am/btree/impls/BTreeOpContext.java | 14 + .../am/btree/impls/BTreeRangeSearchCursor.java | 37 ++- .../storage/am/btree/impls/DiskBTree.java | 5 +- .../btree/impls/DiskBTreePointSearchCursor.java | 6 +- .../btree/impls/DiskBTreeRangeSearchCursor.java | 6 +- .../storage/am/btree/test/FramewriterTest.java | 2 + .../hyracks/hyracks-storage-am-common/pom.xml | 30 +- .../am/common/api/IIndexOperationContext.java | 15 +- .../am/common/api/ITreeIndexAccessor.java | 2 + .../dataflow/IndexDropOperatorNodePushable.java | 3 +- .../IndexSearchOperatorNodePushable.java | 61 ++-- ...eIndexDiskOrderScanOperatorNodePushable.java | 79 +++-- .../impls/TreeIndexDiskOrderScanCursor.java | 34 +- .../am/common/util/ResourceReleaseUtils.java | 158 +++++++++ .../common/test/DummyEnforcedIndexCursor.java | 59 ++++ .../am/common/test/EnforcedIndexCursorTest.java | 61 ++++ .../am/common/test/IIndexCursorTest.java | 294 +++++++++++++++++ .../hyracks-storage-am-lsm-btree/pom.xml | 10 +- .../lsm/btree/impls/ExternalBTreeOpContext.java | 6 + .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 6 +- .../impls/ExternalBTreeWithBuddyOpContext.java | 6 + .../storage/am/lsm/btree/impls/LSMBTree.java | 175 ++++++---- .../btree/impls/LSMBTreeCursorInitialState.java | 12 +- .../impls/LSMBTreeDiskComponentScanCursor.java | 33 +- 
.../am/lsm/btree/impls/LSMBTreeOpContext.java | 17 + .../btree/impls/LSMBTreePointSearchCursor.java | 45 ++- .../btree/impls/LSMBTreeRangeSearchCursor.java | 21 +- .../lsm/btree/impls/LSMBTreeSearchCursor.java | 41 +-- .../impls/LSMBTreeWithBuddyAbstractCursor.java | 10 +- .../impls/LSMBTreeWithBuddySearchCursor.java | 22 +- .../impls/LSMBTreeWithBuddySortedCursor.java | 14 +- .../btree/impls/LSMBuddyBTreeMergeCursor.java | 2 +- .../am/lsm/common/api/ILSMIndexAccessor.java | 3 + .../impls/ComponentReplacementContext.java | 5 + .../lsm/common/impls/LSMIndexSearchCursor.java | 15 +- .../lsm/common/impls/LSMTreeIndexAccessor.java | 10 + .../am/lsm/common/impls/VirtualBufferCache.java | 5 + .../invertedindex/api/IInvertedListCursor.java | 2 +- .../invertedindex/impls/LSMInvertedIndex.java | 174 +++++----- .../impls/LSMInvertedIndexAccessor.java | 10 + ...nvertedIndexDeletedKeysBTreeMergeCursor.java | 2 +- .../impls/LSMInvertedIndexOpContext.java | 15 + .../LSMInvertedIndexRangeSearchCursor.java | 11 +- .../impls/LSMInvertedIndexSearchCursor.java | 25 +- .../inmemory/InMemoryInvertedIndexAccessor.java | 17 + .../InMemoryInvertedIndexOpContext.java | 13 + .../inmemory/InMemoryInvertedListCursor.java | 23 +- .../FixedSizeElementInvertedListCursor.java | 26 +- .../ondisk/OnDiskInvertedIndex.java | 60 +++- .../ondisk/OnDiskInvertedIndexOpContext.java | 14 + .../OnDiskInvertedIndexRangeSearchCursor.java | 17 +- .../ondisk/OnDiskInvertedIndexSearchCursor.java | 16 +- .../ondisk/PartitionedOnDiskInvertedIndex.java | 3 +- .../search/PartitionedTOccurrenceSearcher.java | 8 +- .../hyracks-storage-am-lsm-rtree/pom.xml | 14 +- .../lsm/rtree/impls/ExternalRTreeOpContext.java | 6 + .../storage/am/lsm/rtree/impls/LSMRTree.java | 321 ++++++++++++------- .../lsm/rtree/impls/LSMRTreeAbstractCursor.java | 10 +- .../LSMRTreeDeletedKeysBTreeMergeCursor.java | 2 +- .../am/lsm/rtree/impls/LSMRTreeOpContext.java | 18 ++ .../lsm/rtree/impls/LSMRTreeSearchCursor.java | 22 +- 
.../lsm/rtree/impls/LSMRTreeSortedCursor.java | 25 +- .../impls/LSMRTreeWithAntiMatterTuples.java | 186 +++++++---- ...LSMRTreeWithAntiMatterTuplesFlushCursor.java | 40 ++- ...SMRTreeWithAntiMatterTuplesSearchCursor.java | 34 +- .../am/lsm/rtree/impls/TreeTupleSorter.java | 37 ++- .../hyracks/storage/am/rtree/impls/RTree.java | 10 + .../storage/am/rtree/impls/RTreeOpContext.java | 12 + .../am/rtree/impls/RTreeSearchCursor.java | 17 +- .../storage/common/EnforcedIndexCursor.java | 98 ++++-- .../apache/hyracks/storage/common/IIndex.java | 2 +- .../hyracks/storage/common/IIndexAccessor.java | 24 +- .../storage/common/IIndexBulkLoader.java | 4 +- .../hyracks/storage/common/IIndexCursor.java | 16 +- .../storage/common/buffercache/VirtualPage.java | 5 +- .../AbstractSearchOperationCallbackTest.java | 37 ++- .../am/btree/OrderedIndexExamplesTest.java | 70 ++-- .../storage/am/btree/OrderedIndexTestUtils.java | 160 ++++----- .../am/common/AbstractIndexTestWorker.java | 8 +- .../storage/am/common/TreeIndexTestUtils.java | 75 +++-- .../am/rtree/AbstractRTreeExamplesTest.java | 56 ++-- .../storage/am/rtree/RTreeTestUtils.java | 62 ++-- .../hyracks-storage-am-btree-test/pom.xml | 7 + .../am/btree/BTreeCountingCursorTest.java | 106 ++++++ .../am/btree/BTreeRangeSearchCursorTest.java | 100 ++++++ .../storage/am/btree/BTreeSearchCursorTest.java | 109 +++---- .../storage/am/btree/BTreeUpdateSearchTest.java | 122 +++---- .../am/btree/DiskBTreeDiskScanCursorTest.java | 115 +++++++ .../btree/DiskBTreePointSearchCursorTest.java | 106 ++++++ .../btree/DiskBTreeRangeSearchCursorTest.java | 101 ++++++ .../am/btree/DiskBTreeSearchCursorTest.java | 113 +++---- .../am/btree/DiskOrderScanCursorTest.java | 113 +++++++ .../am/btree/multithread/BTreeTestWorker.java | 150 +++++---- .../hyracks-storage-am-lsm-btree-test/pom.xml | 7 + .../am/lsm/btree/LSMBTreeExamplesTest.java | 11 +- .../am/lsm/btree/LSMBTreeMergeFailTest.java | 2 +- .../LSMBTreeSearchOperationCallbackTest.java | 31 +- 
...TreeUpdateInPlaceScanDiskComponentsTest.java | 15 +- .../cursor/LSMBTreePointSearchCursorTest.java | 163 ++++++++++ .../cursor/LSMBTreeRangeSearchCursorTest.java | 116 +++++++ .../btree/cursor/LSMBTreeSearchCursorTest.java | 104 ++++++ .../btree/impl/TestLsmBtreeSearchCursor.java | 4 +- .../util/LSMInvertedIndexTestUtils.java | 170 +++++----- .../hyracks-storage-am-rtree-test/pom.xml | 7 + .../rtree/RTreeSearchCursorLifecycleTest.java | 88 +++++ .../storage/am/rtree/RTreeSearchCursorTest.java | 149 ++++----- .../am/rtree/multithread/RTreeTestWorker.java | 64 ++-- hyracks-fullstack/pom.xml | 50 ++- 133 files changed, 4282 insertions(+), 2166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index de3c691..273d832 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -680,33 +680,41 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); - if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) { - indexAccessor.forceDelete(logRecord.getNewValue()); - } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) { - indexAccessor.forceInsert(logRecord.getOldValue()); - } else if (logRecord.getNewOp() == 
AbstractIndexModificationOperationCallback.UPSERT_BYTE) { - // undo, upsert the old value if found, otherwise, physical delete - if (logRecord.getOldValue() == null) { - try { - indexAccessor.forcePhysicalDelete(logRecord.getNewValue()); - } catch (HyracksDataException hde) { - // Since we're undoing according the write-ahead log, the actual upserting tuple - // might not have been written to memory yet. - if (hde.getErrorCode() != ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) { - throw hde; - } - } + try { + if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) { + indexAccessor.forceDelete(logRecord.getNewValue()); + } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) { + indexAccessor.forceInsert(logRecord.getOldValue()); + } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) { + // undo, upsert the old value if found, otherwise, physical delete + undoUpsert(indexAccessor, logRecord); } else { - indexAccessor.forceUpsert(logRecord.getOldValue()); + throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); } - } else { - throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); + } finally { + indexAccessor.destroy(); } } catch (Exception e) { throw new IllegalStateException("Failed to undo", e); } } + private static void undoUpsert(ILSMIndexAccessor indexAccessor, ILogRecord logRecord) throws HyracksDataException { + if (logRecord.getOldValue() == null) { + try { + indexAccessor.forcePhysicalDelete(logRecord.getNewValue()); + } catch (HyracksDataException hde) { + // Since we're undoing according the write-ahead log, the actual upserting tuple + // might not have been written to memory yet. 
+ if (hde.getErrorCode() != ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) { + throw hde; + } + } + } else { + indexAccessor.forceUpsert(logRecord.getOldValue()); + } + } + private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { try { int datasetId = logRecord.getDatasetId(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java index 817bbe6..fb89c9c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -44,7 +45,7 @@ public class MetadataNodeRequestMessage extends CcIdentifiedMessage @Override public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException { INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker(); - HyracksDataException hde = null; + Throwable hde = null; try { if (export) { appContext.initializeMetadata(false, partitionId); @@ -63,11 +64,11 @@ public class 
MetadataNodeRequestMessage extends CcIdentifiedMessage broker.sendMessageToCC(getCcId(), reponse); } catch (Exception e) { LOGGER.log(Level.ERROR, "Failed taking over metadata", e); - hde = HyracksDataException.suppress(hde, e); + hde = ExceptionUtils.suppress(hde, e); } } if (hde != null) { - throw hde; + throw HyracksDataException.create(hde); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 80e0b33..5f42d7a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -121,7 +121,7 @@ public class CCMessageBroker implements ICCMessageBroker { MutablePair<ResponseState, Object> right = pair.getRight(); switch (right.getKey()) { case FAILURE: - throw HyracksDataException.create((Exception) right.getValue()); + throw HyracksDataException.create((Throwable) right.getValue()); case SUCCESS: return right.getRight(); default: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java index 8897169..17509a4 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java @@ -33,6 +33,9 @@ import 
org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -128,35 +131,46 @@ public class DiskIsFullTest { KEY_INDICATOR_LIST, 0); JobId jobId = nc.newJobId(); IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false); + ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx), + new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); // Prepare insert operation LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); - insertOp.open(); - TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, - RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); - VSizeFrame frame = new VSizeFrame(ctx); - FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - // Insert records until disk becomes full - int tupleCount = 100000; - while (tupleCount > 0) { - ITupleReference tuple = tupleGenerator.next(); - try { - DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); - } catch (Throwable t) { - final Throwable rootCause = ExceptionUtils.getRootCause(t); - rootCause.printStackTrace(); - if (rootCause instanceof HyracksDataException) { - HyracksDataException cause = (HyracksDataException) rootCause; - 
Assert.assertEquals(cause.getErrorCode(), expectedException.getErrorCode()); - Assert.assertEquals(cause.getMessage(), expectedException.getMessage()); - return; - } else { - break; + try { + insertOp.open(); + TupleGenerator tupleGenerator = + new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION, + UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + VSizeFrame frame = new VSizeFrame(ctx); + FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); + // Insert records until disk becomes full + int tupleCount = 100000; + while (tupleCount > 0) { + ITupleReference tuple = tupleGenerator.next(); + try { + DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); + } catch (Throwable t) { + final Throwable rootCause = ExceptionUtils.getRootCause(t); + rootCause.printStackTrace(); + if (rootCause instanceof HyracksDataException) { + HyracksDataException cause = (HyracksDataException) rootCause; + Assert.assertEquals(cause.getErrorCode(), expectedException.getErrorCode()); + Assert.assertEquals(cause.getMessage(), expectedException.getMessage()); + return; + } else { + break; + } } + tupleCount--; + } + Assert.fail("Expected exception (" + expectedException + ") was not thrown"); + } finally { + try { + insertOp.close(); + } finally { + nc.getTransactionManager().abortTransaction(txnCtx.getTxnId()); } - tupleCount--; } - Assert.fail("Expected exception (" + expectedException + ") was not thrown"); } finally { nc.deInit(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 3b9391e..25c71df 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -26,12 +26,15 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -74,7 +77,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl setState(State.STARTED); } } - Exception failure = null; + Throwable failure = null; try { tupleForwarder.initialize(ctx, writer); while (hasNext()) { @@ -111,7 +114,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl failure = e; tupleForwarder.fail(); } - } catch (Exception e) { + } catch (Throwable e) { failure = e; tupleForwarder.fail(); LOGGER.log(Level.WARN, "Failure while operating a feed source", e); @@ -174,30 +177,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } - private Exception finish(Exception 
failure) { - HyracksDataException hde = null; - try { - recordReader.close(); - } catch (Exception th) { - LOGGER.log(Level.WARN, "Failure during while operating a feed source", th); - hde = HyracksDataException.suppress(hde, th); - } - try { - tupleForwarder.close(); - } catch (Exception th) { - hde = HyracksDataException.suppress(hde, th); - } finally { - closeSignal(); - } + private Throwable finish(Throwable failure) { + Throwable th = ResourceReleaseUtils.close(recordReader, null); + th = DataflowUtils.close(tupleForwarder, th); + closeSignal(); setState(State.STOPPED); - if (hde != null) { - if (failure != null) { - failure.addSuppressed(hde); - } else { - return hde; - } - } - return failure; + return ExceptionUtils.suppress(failure, th); } private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java index 88b8a14..5a9852e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java @@ -35,6 +35,7 @@ import org.apache.asterix.om.types.BuiltinType; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.DestroyUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import 
org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @@ -42,6 +43,7 @@ import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.btree.util.BTreeUtils; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; import org.apache.hyracks.storage.am.lsm.btree.impls.ExternalBTree; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; @@ -135,12 +137,11 @@ public class ExternalFileIndexAccessor { } public void close() throws HyracksDataException { - if (index != null) { - try { - fileIndexSearchCursor.destroy(); - } finally { - indexDataflowHelper.close(); - } + Throwable failure = ResourceReleaseUtils.close(fileIndexSearchCursor, null); + failure = DestroyUtils.destroy(failure, fileIndexSearchCursor, fileIndexAccessor); + failure = ResourceReleaseUtils.close(indexDataflowHelper, failure); + if (failure != null) { + throw HyracksDataException.create(failure); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java index b487b40..471d23f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java @@ -25,6 +25,7 @@ import java.net.Socket; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -102,34 +103,14 @@ public class SocketServerInputStream extends AsterixInputStream { @Override public synchronized void close() throws IOException { - HyracksDataException hde = null; - try { - if (connectionStream != null) { - connectionStream.close(); - } - connectionStream = null; - } catch (IOException e) { - hde = HyracksDataException.create(e); - } - try { - if (socket != null) { - socket.close(); - } - socket = null; - } catch (IOException e) { - hde = HyracksDataException.suppress(hde, e); - } - try { - if (server != null) { - server.close(); - } - } catch (IOException e) { - hde = HyracksDataException.suppress(hde, e); - } finally { - server = null; - } - if (hde != null) { - throw hde; + Throwable failure = ResourceReleaseUtils.close(connectionStream, null); + connectionStream = null; + failure = ResourceReleaseUtils.close(socket, failure); + socket = null; + failure = ResourceReleaseUtils.close(server, failure); + server = null; + if (failure != null) { + throw HyracksDataException.create(failure); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java index 31a223f..438f1df 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java @@ -20,8 +20,8 @@ package org.apache.asterix.external.util; import java.util.Map; -import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.ITupleForwarder; import org.apache.asterix.external.api.ITupleForwarder.TupleForwardPolicy; import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder; @@ -30,11 +30,20 @@ import org.apache.asterix.external.dataflow.FrameFullTupleForwarder; import org.apache.asterix.external.dataflow.RateControlledTupleForwarder; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class DataflowUtils { + private static final Logger LOGGER = LogManager.getLogger(); + + private DataflowUtils() { + } + public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer) throws HyracksDataException { if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { @@ -80,4 +89,30 @@ public class DataflowUtils { } } } + + /** + * Close the ITupleForwarder and suppress any Throwable thrown by the close call. 
+ * This method must NEVER throw any Throwable + * + * @param indexHelper + * the indexHelper to close + * @param root + * the first exception encountered during release of resources + * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null + */ + public static Throwable close(ITupleForwarder tupleForwarder, Throwable root) { + if (tupleForwarder != null) { + try { + tupleForwarder.close(); + } catch (Throwable th) { // NOSONAR Will be re-thrown + try { + LOGGER.log(Level.WARN, "Failure closing a closeable resource", th); + } catch (Throwable ignore) { // NOSONAR Logging exception will be ignored + // NOSONAR ignore + } + root = ExceptionUtils.suppress(root, th); + } + } + return root; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 681bae7..966c99a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -1270,64 +1270,68 @@ public class MetadataNode implements IMetadataNode { StringBuilder sb = new StringBuilder(); try { + RangePredicate rangePred = null; IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET; String resourceName = index.getFile().toString(); IIndex indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); - - RangePredicate rangePred = null; - rangePred = new 
RangePredicate(null, null, true, true, null, null); - indexAccessor.search(rangeCursor, rangePred); try { - while (rangeCursor.hasNext()) { - rangeCursor.next(); - sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] { - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) })); + rangePred = new RangePredicate(null, null, true, true, null, null); + indexAccessor.search(rangeCursor, rangePred); + try { + while (rangeCursor.hasNext()) { + rangeCursor.next(); + sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), + new ISerializerDeserializer[] { SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.ASTRING) })); + } + } finally { + rangeCursor.close(); } - } finally { - rangeCursor.destroy(); - } - datasetLifecycleManager.close(resourceName); - - index = MetadataPrimaryIndexes.DATASET_DATASET; - indexInstance = datasetLifecycleManager.get(resourceName); - datasetLifecycleManager.open(resourceName); - indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); - rangeCursor = indexAccessor.createSearchCursor(false); - - rangePred = null; - rangePred = new RangePredicate(null, null, true, true, null, null); - indexAccessor.search(rangeCursor, rangePred); - try { - while (rangeCursor.hasNext()) { - rangeCursor.next(); - sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] { - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) })); + datasetLifecycleManager.close(resourceName); + index = MetadataPrimaryIndexes.DATASET_DATASET; + indexInstance = datasetLifecycleManager.get(resourceName); + datasetLifecycleManager.open(resourceName); + indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); + rangeCursor = indexAccessor.createSearchCursor(false); + rangePred = null; + 
rangePred = new RangePredicate(null, null, true, true, null, null); + indexAccessor.search(rangeCursor, rangePred); + try { + while (rangeCursor.hasNext()) { + rangeCursor.next(); + sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), + new ISerializerDeserializer[] { + SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.ASTRING), + SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.ASTRING) })); + } + } finally { + rangeCursor.close(); } - } finally { - rangeCursor.destroy(); - } - datasetLifecycleManager.close(resourceName); - - index = MetadataPrimaryIndexes.INDEX_DATASET; - indexInstance = datasetLifecycleManager.get(resourceName); - datasetLifecycleManager.open(resourceName); - indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); - rangeCursor = indexAccessor.createSearchCursor(false); - - rangePred = null; - rangePred = new RangePredicate(null, null, true, true, null, null); - indexAccessor.search(rangeCursor, rangePred); - try { - while (rangeCursor.hasNext()) { - rangeCursor.next(); - sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] { - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) })); + datasetLifecycleManager.close(resourceName); + index = MetadataPrimaryIndexes.INDEX_DATASET; + indexInstance = datasetLifecycleManager.get(resourceName); + datasetLifecycleManager.open(resourceName); + indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); + rangeCursor = indexAccessor.createSearchCursor(false); + rangePred = null; + rangePred = new RangePredicate(null, null, true, true, null, null); + indexAccessor.search(rangeCursor, rangePred); + try { + while (rangeCursor.hasNext()) { + 
rangeCursor.next(); + sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] { + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), + SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.ASTRING) })); + } + } finally { + rangeCursor.close(); } } finally { rangeCursor.destroy(); @@ -1351,75 +1355,95 @@ public class MetadataNode implements IMetadataNode { IIndex indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); - IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); - - IBinaryComparator[] searchCmps = null; - MultiComparator searchCmp = null; - RangePredicate rangePred = null; - if (searchKey != null) { - searchCmps = new IBinaryComparator[searchKey.getFieldCount()]; - for (int i = 0; i < searchKey.getFieldCount(); i++) { - searchCmps[i] = comparatorFactories[i].createBinaryComparator(); + try { + IBinaryComparator[] searchCmps = null; + MultiComparator searchCmp = null; + if (searchKey != null) { + searchCmps = new IBinaryComparator[searchKey.getFieldCount()]; + for (int i = 0; i < searchKey.getFieldCount(); i++) { + searchCmps[i] = comparatorFactories[i].createBinaryComparator(); + } + searchCmp = new MultiComparator(searchCmps); } - searchCmp = new MultiComparator(searchCmps); + RangePredicate rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp); + search(indexAccessor, rangePred, results, valueExtractor, txnId); + } finally { + indexAccessor.destroy(); } - rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp); - indexAccessor.search(rangeCursor, rangePred); + datasetLifecycleManager.close(resourceName); + } + private <ResultType> void 
search(IIndexAccessor indexAccessor, RangePredicate rangePred, List<ResultType> results, + IValueExtractor<ResultType> valueExtractor, TxnId txnId) + throws HyracksDataException, RemoteException, AlgebricksException { + IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); try { - while (rangeCursor.hasNext()) { - rangeCursor.next(); - ResultType result = valueExtractor.getValue(txnId, rangeCursor.getTuple()); - if (result != null) { - results.add(result); + indexAccessor.search(rangeCursor, rangePred); + try { + while (rangeCursor.hasNext()) { + rangeCursor.next(); + ResultType result = valueExtractor.getValue(txnId, rangeCursor.getTuple()); + if (result != null) { + results.add(result); + } } + } finally { + rangeCursor.close(); } } finally { rangeCursor.destroy(); } - datasetLifecycleManager.close(resourceName); } @Override public void initializeDatasetIdFactory(TxnId txnId) throws AlgebricksException, RemoteException { - int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID; + int mostRecentDatasetId; try { String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().getRelativePath(); IIndex indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); try { - IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); - IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); - - DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(false); - IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); - RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null); + mostRecentDatasetId = getMostRecentDatasetIdFromStoredDatasetIndex(indexInstance, txnId); + } finally { + datasetLifecycleManager.close(resourceName); + } + } catch (HyracksDataException e) { + throw new AlgebricksException(e); + } + 
DatasetIdFactory.initialize(mostRecentDatasetId); + } + private int getMostRecentDatasetIdFromStoredDatasetIndex(IIndex indexInstance, TxnId txnId) + throws HyracksDataException, RemoteException, AlgebricksException { + DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(false); + IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); + RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null); + int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID; + IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE); + try { + IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); + try { indexAccessor.search(rangeCursor, rangePred); - int datasetId; - try { while (rangeCursor.hasNext()) { rangeCursor.next(); final ITupleReference ref = rangeCursor.getTuple(); final Dataset ds = valueExtractor.getValue(txnId, ref); - datasetId = ds.getDatasetId(); + int datasetId = ds.getDatasetId(); if (mostRecentDatasetId < datasetId) { mostRecentDatasetId = datasetId; } } } finally { - rangeCursor.destroy(); + rangeCursor.close(); } } finally { - datasetLifecycleManager.close(resourceName); + rangeCursor.destroy(); } - - } catch (HyracksDataException e) { - throw new AlgebricksException(e); + } finally { + indexAccessor.destroy(); } - - DatasetIdFactory.initialize(mostRecentDatasetId); + return mostRecentDatasetId; } // TODO: Can use Hyrack's TupleUtils for this, once we switch to a newer http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 21259cc..6bd9d99 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -140,16 +140,19 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe resetSearchPredicate(index); if (isFiltered || isDelete || hasSecondaries) { lsmAccessor.search(cursor, searchPred); - if (cursor.hasNext()) { - cursor.next(); - prevTuple = cursor.getTuple(); + try { + if (cursor.hasNext()) { + cursor.next(); + prevTuple = cursor.getTuple(); + appendFilterToPrevTuple(); + appendPrevRecord(); + appendPreviousMeta(); + appendFilterToOutput(); + } else { + appendPreviousTupleAsMissing(); + } + } finally { cursor.close(); // end the search - appendFilterToPrevTuple(); - appendPrevRecord(); - appendPreviousMeta(); - appendFilterToOutput(); - } else { - appendPreviousTupleAsMissing(); } } else { searchCallback.before(key); // lock @@ -319,7 +322,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe if (isFiltered) { writeMissingField(); } - cursor.close(); } /** @@ -362,7 +364,9 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe public void close() throws HyracksDataException { try { try { - cursor.destroy(); + if (cursor != null) { + cursor.destroy(); + } } finally { writer.close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index e08bebe..4b154bd 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -352,7 +352,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { try { if (fileLogDir.exists()) { List<Long> logFileIds = getLogFileIds(); - if (logFileIds == null) { + if (logFileIds.isEmpty()) { fileId = nextLogFileId; createFileIfNotExists(getLogFilePath(fileId)); if (LOGGER.isInfoEnabled()) { @@ -395,9 +395,11 @@ public class LogManager implements ILogManager, ILifeCycleComponent { public void deleteOldLogFiles(long checkpointLSN) { Long checkpointLSNLogFileID = getLogFileId(checkpointLSN); List<Long> logFileIds = getLogFileIds(); - if (logFileIds != null) { + if (!logFileIds.isEmpty()) { //sort log files from oldest to newest Collections.sort(logFileIds); + // remove the last one not to delete the current log file + logFileIds.remove(logFileIds.size() - 1); /** * At this point, any future LogReader should read from LSN >= checkpointLSN */ @@ -412,7 +414,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent { || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) { break; } - //delete old log file File file = new File(getLogFilePath(id)); file.delete(); @@ -447,12 +448,14 @@ public class LogManager implements ILogManager, ILifeCycleComponent { private long deleteAllLogFiles() { txnLogFileId2ReaderCount.clear(); List<Long> logFileIds = getLogFileIds(); - if (logFileIds != null) { + if (!logFileIds.isEmpty()) { for (Long id : logFileIds) { File file = new File(getLogFilePath(id)); + LOGGER.info("Deleting log file: " + file.getAbsolutePath()); if (!file.delete()) { throw new 
IllegalStateException("Failed to delete a file: " + file.getAbsolutePath()); } + LOGGER.info("log file: " + file.getAbsolutePath() + " was deleted successfully"); } return logFileIds.get(logFileIds.size() - 1); } else { @@ -464,29 +467,40 @@ public class LogManager implements ILogManager, ILifeCycleComponent { File fileLogDir = new File(logDir); String[] logFileNames = null; List<Long> logFileIds = null; - if (fileLogDir.exists()) { - logFileNames = fileLogDir.list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - if (name.startsWith(logFilePrefix)) { - return true; - } - return false; - } - }); - if (logFileNames != null && logFileNames.length != 0) { - logFileIds = new ArrayList<>(); - for (String fileName : logFileNames) { - logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1))); + if (!fileLogDir.exists()) { + LOGGER.log(Level.INFO, "log dir " + logDir + " doesn't exist. returning empty list"); + return Collections.emptyList(); + } + if (!fileLogDir.isDirectory()) { + throw new IllegalStateException("log dir " + logDir + " exists but it is not a directory"); + } + logFileNames = fileLogDir.list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (name.startsWith(logFilePrefix)) { + return true; } - Collections.sort(logFileIds, new Comparator<Long>() { - @Override - public int compare(Long arg0, Long arg1) { - return arg0.compareTo(arg1); - } - }); + return false; } - } + }); + if (logFileNames == null) { + throw new IllegalStateException("listing of log dir (" + logDir + ") files returned null. " + + "Either an IO error occurred or the dir was just deleted by another process/thread"); + } + if (logFileNames.length == 0) { + LOGGER.log(Level.INFO, "the log dir (" + logDir + ") is empty. 
returning empty list"); + return Collections.emptyList(); + } + logFileIds = new ArrayList<>(); + for (String fileName : logFileNames) { + logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1))); + } + Collections.sort(logFileIds, new Comparator<Long>() { + @Override + public int compare(Long arg0, Long arg1) { + return arg0.compareTo(arg1); + } + }); return logFileIds; } @@ -551,7 +565,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { @Override public long getReadableSmallestLSN() { List<Long> logFileIds = getLogFileIds(); - if (logFileIds != null) { + if (!logFileIds.isEmpty()) { return logFileIds.get(0) * logFileSize; } else { throw new IllegalStateException("Couldn't find any log files."); @@ -672,7 +686,6 @@ class LogFlusher implements Callable<Boolean> { public void terminate() { // make sure the LogFlusher thread started before terminating it. InvokeUtil.doUninterruptibly(started::acquire); - stopping = true; // we must tell any active flush, if any, to stop http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDestroyable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDestroyable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDestroyable.java new file mode 100644 index 0000000..2e50215 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDestroyable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.api.dataflow; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@FunctionalInterface +public interface IDestroyable { + /** + * Destroy the object and releases any system resources associated + * with it. If the object is already destroyed then invoking this + * method has no effect. + * The behavior of other calls after this method is invoked is undefined + * + * @throws HyracksDataException + */ + void destroy() throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index d0e4655..6560d1b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -30,12 +30,21 @@ public class HyracksDataException extends HyracksException { private static final long serialVersionUID = 1L; + /** + * Wrap the 
failure cause in a HyracksDataException. + * If the cause is an InterruptedException, the thread is interrupted first. + * If the cause is already a HyracksDataException, then return it as it is. + * + * @param cause + * the root failure + * @return the wrapped failure + */ public static HyracksDataException create(Throwable cause) { - if (cause instanceof HyracksDataException || cause == null) { + if (cause == null) { + throw new NullPointerException("Attempt to wrap null in a HyracksDataException"); + } + if (cause instanceof HyracksDataException) { return (HyracksDataException) cause; - } else if (cause instanceof Error) { - // don't wrap errors, allow them to propagate - throw (Error) cause; } else if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); } @@ -50,21 +59,6 @@ public class HyracksDataException extends HyracksException { return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params); } - public static HyracksDataException suppress(HyracksDataException root, Throwable th) { - if (root == null) { - return HyracksDataException.create(th); - } - if (th instanceof Error) { - // don't suppress errors into a HyracksDataException, allow them to propagate - th.addSuppressed(root); - throw (Error) th; - } else if (th instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - root.addSuppressed(th); - return root; - } - public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId, Serializable... 
params) { super(component, errorCode, message, cause, nodeId, params); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/DestroyUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/DestroyUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/DestroyUtils.java new file mode 100644 index 0000000..97c284d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/DestroyUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.util; + +import org.apache.hyracks.api.dataflow.IDestroyable; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class DestroyUtils { + + private static final Logger LOGGER = LogManager.getLogger(); + + private DestroyUtils() { + } + + public static Throwable destroy(Throwable root, IDestroyable... 
destroyables) { + for (int i = 0; i < destroyables.length; i++) { + if (destroyables[i] != null) { + IDestroyable destroyable = destroyables[i]; + if (destroyable != null) { + try { + destroyable.destroy(); + } catch (Throwable th) { // NOSONAR. Had to be done to satisfy contracts + try { + LOGGER.log(Level.WARN, "Failure destroying a destroyable resource", th); + } catch (Throwable ignore) { + // Do nothing + } + root = ExceptionUtils.suppress(root, th); + } + } + } + } + return root; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java new file mode 100644 index 0000000..1a88e46 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.api.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * @author yingyib + */ +public class ExceptionUtils { + + private ExceptionUtils() { + } + + /** + * get a list of possible root causes from a list of all exceptions + * + * @param allExceptions + * @return List of possible root causes + */ + public static List<Exception> getActualExceptions(List<Exception> allExceptions) { + List<Exception> exceptions = new ArrayList<>(); + for (Exception exception : allExceptions) { + if (possibleRootCause(exception)) { + exceptions.add(exception); + } + } + return exceptions; + } + + /** + * Associate a node with a list of exceptions + * + * @param exceptions + * @param nodeId + */ + public static void setNodeIds(Collection<Exception> exceptions, String nodeId) { + List<Exception> newExceptions = new ArrayList<>(); + for (Exception e : exceptions) { + if (e instanceof HyracksDataException) { + if (((HyracksDataException) e).getNodeId() == null) { + newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId)); + } else { + newExceptions.add(e); + } + } else { + newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e, nodeId)); + } + } + exceptions.clear(); + exceptions.addAll(newExceptions); + } + + private static boolean possibleRootCause(Throwable exception) { + Throwable cause = exception; + while ((cause = cause.getCause()) != null) { + if (cause instanceof java.lang.InterruptedException + || cause instanceof java.nio.channels.ClosedChannelException) { + return false; + } + } + return true; + } + + /** + * Suppress the second exception if not null into the first exception if not null. + * If the suppressed exception is an instance of InterruptedException, the current thread is interrupted. 
+ * + * @param first + * the root failure + * @param second + * the subsequent failure + * @return the root exception, or null if both parameters are null + */ + public static Throwable suppress(Throwable first, Throwable second) { + if (second != null && second instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (first == null) { + return second; + } else if (second == null) { + return first; + } + first.addSuppressed(second); + return first; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index fa08420..e4699c7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -45,13 +45,13 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.partitions.PartitionId; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.DeployedJobSpecStore.DeployedJobSpecDescriptor; import org.apache.hyracks.control.cc.executor.ActivityPartitionDetails; import org.apache.hyracks.control.cc.executor.JobExecutor; import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; -import 
org.apache.hyracks.control.common.utils.ExceptionUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java deleted file mode 100644 index 1d506ca..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.control.common.utils; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.hyracks.api.exceptions.ErrorCode; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * @author yingyib - */ -public class ExceptionUtils { - - private ExceptionUtils() { - } - - /** - * get a list of possible root causes from a list of all exceptions - * - * @param allExceptions - * @return List of possible root causes - */ - public static List<Exception> getActualExceptions(List<Exception> allExceptions) { - List<Exception> exceptions = new ArrayList<>(); - for (Exception exception : allExceptions) { - if (possibleRootCause(exception)) { - exceptions.add(exception); - } - } - return exceptions; - } - - /** - * Associate a node with a list of exceptions - * - * @param exceptions - * @param nodeId - */ - public static void setNodeIds(Collection<Exception> exceptions, String nodeId) { - List<Exception> newExceptions = new ArrayList<>(); - for (Exception e : exceptions) { - if (e instanceof HyracksDataException) { - if (((HyracksDataException) e).getNodeId() == null) { - newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId)); - } else { - newExceptions.add(e); - } - } else { - newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e, nodeId)); - } - } - exceptions.clear(); - exceptions.addAll(newExceptions); - } - - private static boolean possibleRootCause(Throwable exception) { - Throwable cause = exception; - while ((cause = cause.getCause()) != null) { - if (cause instanceof java.lang.InterruptedException - || cause instanceof java.nio.channels.ClosedChannelException) { - return false; - } - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 6e5a58e..9b32cc7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -56,13 +56,13 @@ import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.api.resources.IDeallocatable; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.profiling.StatsCollector; import org.apache.hyracks.control.common.job.profiling.counters.Counter; import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; -import org.apache.hyracks.control.common.utils.ExceptionUtils; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry; import org.apache.hyracks.control.nc.work.NotifyTaskCompleteWork; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 46b176e..e229149 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -54,10 +54,10 @@ import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.partitions.PartitionId; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.comm.channels.NetworkInputChannel; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; -import org.apache.hyracks.control.common.utils.ExceptionUtils; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java index 6748a4d..0efed6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java @@ -53,7 
+53,6 @@ public class InputChannelFrameReader implements IFrameReader, IInputChannelMonit try { wait(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4ff6a36d/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/EnforcedIndexCursorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/EnforcedIndexCursorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/EnforcedIndexCursorTest.java deleted file mode 100644 index 8fe689e..0000000 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/EnforcedIndexCursorTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hyracks.tests.unit; - -import org.apache.hyracks.storage.common.EnforcedIndexCursor; -import org.apache.hyracks.storage.common.ICursorInitialState; -import org.apache.hyracks.storage.common.IIndexCursor; -import org.apache.hyracks.storage.common.ISearchPredicate; -import org.mockito.Mockito; - -import java.util.ArrayList; -import java.util.List; - -public class EnforcedIndexCursorTest extends IIndexCursorTest { - @Override - protected List<ISearchPredicate> createSearchPredicates() { - List<ISearchPredicate> predicates = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - predicates.add(Mockito.mock(ISearchPredicate.class)); - } - return predicates; - } - - @Override - protected ICursorInitialState createCursorInitialState() { - return Mockito.mock(ICursorInitialState.class); - } - - @Override - protected IIndexCursor createCursor() { - return new EnforcedIndexCursor(); - } -}