Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2456
Change subject: [NO ISSUE][TX] Ensure Uncommited Atomic Txns Are Rolledback
......................................................................
[NO ISSUE][TX] Ensure Uncommited Atomic Txns Are Rolledback
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Ensure rollback of an atomic transaction starts
from its first LSN.
- Ensure update logs of uncommited atomic transactions
are undone during recovery.
- Add test case for atomic transaction rollback after
flush.
- Add test case for atomic transaction recovery after
flush.
Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
7 files changed, 198 insertions(+), 46 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/56/2456/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 74277ce..5a4bbc0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -49,6 +49,7 @@
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.Checkpoint;
@@ -174,16 +175,16 @@
public synchronized void replayPartitionsLogs(Set<Integer> partitions,
ILogReader logReader, long lowWaterMarkLSN)
throws IOException, ACIDException {
try {
- Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions,
logReader, lowWaterMarkLSN);
- startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN,
winnerJobSet);
+ Set<Long> winnerJobSet = startRecoveryAnalysisPhase(partitions,
logReader, lowWaterMarkLSN);
+ startRecoveryUndoRedoPhase(partitions, logReader, lowWaterMarkLSN,
winnerJobSet);
} finally {
logReader.close();
deleteRecoveryTemporaryFiles();
}
}
- private synchronized Set<Long> startRecoverysAnalysisPhase(Set<Integer>
partitions, ILogReader logReader,
- long lowWaterMarkLSN) throws IOException, ACIDException {
+ private synchronized Set<Long> startRecoveryAnalysisPhase(Set<Integer>
partitions, ILogReader logReader,
+ long lowWaterMarkLSN) throws IOException {
int updateLogCount = 0;
int entityCommitLogCount = 0;
int jobCommitLogCount = 0;
@@ -268,19 +269,21 @@
jobEntityWinners.add(logRecord);
}
- private synchronized void startRecoveryRedoPhase(Set<Integer> partitions,
ILogReader logReader,
- long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException,
ACIDException {
+ private synchronized void startRecoveryUndoRedoPhase(Set<Integer>
partitions, ILogReader logReader,
+ long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException {
int redoCount = 0;
+ int undoCount = 0;
long txnId = 0;
long resourceId;
long maxDiskLastLsn;
- long lsn = -1;
- ILSMIndex index = null;
- LocalResource localResource = null;
- DatasetLocalResource localResourceMetadata = null;
- boolean foundWinner = false;
- JobEntityCommits jobEntityWinners = null;
+ long lsn;
+ ILSMIndex index;
+ LocalResource localResource;
+ DatasetLocalResource localResourceMetadata;
+ boolean foundWinner;
+ boolean foundLoser;
+ JobEntityCommits jobEntityWinners;
IDatasetLifecycleManager datasetLifecycleManager =
appCtx.getDatasetLifecycleManager();
final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
@@ -290,8 +293,8 @@
Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1,
false);
- ILogRecord logRecord = null;
- ILSMComponentIdGenerator idGenerator = null;
+ ILogRecord logRecord;
+ ILSMComponentIdGenerator idGenerator;
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -302,6 +305,7 @@
lsn = logRecord.getLSN();
txnId = logRecord.getTxnId();
foundWinner = false;
+ foundLoser = false;
switch (logRecord.getLogType()) {
case LogType.UPDATE:
if
(partitions.contains(logRecord.getResourcePartition())) {
@@ -314,11 +318,13 @@
if
(jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnEntityId)) {
foundWinner = true;
}
+ } else if (isAtomicTxnLog(logRecord)) {
+ foundLoser = true;
}
- if (foundWinner) {
+ if (foundWinner || foundLoser) {
resourceId = logRecord.getResourceId();
localResource = resourcesMap.get(resourceId);
-
/*******************************************************************
+ /*
* [Notice]
* -> Issue
* Delete index may cause a problem during
redo.
@@ -330,22 +336,21 @@
* the corresponding index is retrieved, which
will end up with 'null'.
* If null is returned, then just go and
process the next
* log record.
-
*******************************************************************/
+ */
if (localResource == null) {
LOGGER.log(Level.WARN, "resource was not
found for resource id " + resourceId);
logRecord = logReader.next();
continue;
}
-
/*******************************************************************/
- //get index instance from IndexLifeCycleManager
- //if index is not registered into
IndexLifeCycleManager,
- //create the index using LocalMetadata stored
in LocalResourceRepository
- //get partition path in this node
+ // get index instance from
IndexLifeCycleManager
+ // if index is not registered into
IndexLifeCycleManager,
+ // create the index using LocalMetadata stored
in LocalResourceRepository
+ // get partition path in this node
localResourceMetadata = (DatasetLocalResource)
localResource.getResource();
index = (ILSMIndex)
datasetLifecycleManager.get(localResource.getPath());
if (index == null) {
- //#. create index instance and register to
indexLifeCycleManager
+ // create index instance and register to
indexLifeCycleManager
index = (ILSMIndex)
localResourceMetadata.createInstance(serviceCtx);
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
@@ -358,15 +363,18 @@
datasetLifecycleManager.close(localResource.getPath());
throw e;
}
- //#. set resourceId and maxDiskLastLSN to
the map
+ // set resourceId and maxDiskLastLSN to
the map
resourceId2MaxLSNMap.put(resourceId,
maxDiskLastLsn);
} else {
maxDiskLastLsn =
resourceId2MaxLSNMap.get(resourceId);
}
// lsn @ maxDiskLastLsn is either a flush log
or a master replica log
- if (lsn >= maxDiskLastLsn) {
+ if (foundWinner && lsn >= maxDiskLastLsn) {
redo(logRecord, datasetLifecycleManager);
redoCount++;
+ } else if (foundLoser) {
+ undo(logRecord, datasetLifecycleManager);
+ undoCount++;
}
}
}
@@ -425,7 +433,7 @@
}
logRecord = logReader.next();
}
- LOGGER.info("Logs REDO phase completed. Redo logs count: " +
redoCount);
+ LOGGER.info("Logs REDO phase completed. Redo logs count: {}, undo
logs count: {}", redoCount, undoCount);
} finally {
txnSubsystem.getTransactionManager().ensureMaxTxnId(txnId);
//close all indexes
@@ -566,18 +574,7 @@
public void rollbackTransaction(ITransactionContext txnContext) throws
ACIDException {
long abortedTxnId = txnContext.getTxnId().getId();
// Obtain the first/last log record LSNs written by the Job
- long firstLSN = txnContext.getFirstLSN();
- /*
- * The effect of any log record with LSN below minFirstLSN has already
been written to disk and
- * will not be rolled back. Therefore, we will set the first LSN of
the job to the maximum of
- * minFirstLSN and the job's first LSN.
- */
- try {
- long localMinFirstLSN = getLocalMinFirstLSN();
- firstLSN = Math.max(firstLSN, localMinFirstLSN);
- } catch (HyracksDataException e) {
- throw new ACIDException(e);
- }
+ long firstLSN = getRollbackFirstLsn(txnContext);
long lastLSN = txnContext.getLastLSN();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("rollbacking transaction log records from " + firstLSN
+ " to " + lastLSN);
@@ -724,6 +721,30 @@
// do nothing
}
+ private long getRollbackFirstLsn(ITransactionContext txnContext) {
+ long firstLSN = txnContext.getFirstLSN();
+ switch (txnContext.getAtomicityLevel()) {
+ case ATOMIC:
+ break;
+ case ENTITY_LEVEL:
+ try {
+ /*
+ * The effect of any log record with LSN below minFirstLSN
has already been written to disk and
+ * will not be rolled back. Therefore, we will set the
first LSN of the txn to the maximum of
+ * minFirstLSN and the transaction's first LSN.
+ */
+ final long localMinFirstLSN = getLocalMinFirstLSN();
+ firstLSN = Math.max(firstLSN, localMinFirstLSN);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown atomicity level: " +
txnContext.getAtomicityLevel());
+ }
+ return firstLSN;
+ }
+
private static void undo(ILogRecord logRecord, IDatasetLifecycleManager
datasetLifecycleManager) {
try {
ILSMIndex index =
@@ -807,6 +828,10 @@
accessor.scheduleFlush(index.getIOOperationCallback());
}
+ private static boolean isAtomicTxnLog(ILogRecord logRecord) {
+ return
MetadataIndexImmutableProperties.isMetadataDataset(logRecord.getDatasetId());
+ }
+
private class JobEntityCommits {
private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
private final long txnId;
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 70e5f6e..4efc27b 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -19,14 +19,17 @@
package org.apache.asterix.test.metadata;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.DatasetConfig;
@@ -36,12 +39,14 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
@@ -67,6 +72,46 @@
}
@Test
+ public void undoFlushedUncommittedMetadataTxn() throws Exception {
+ ICcApplicationContext appCtx =
+ (ICcApplicationContext)
integrationUtil.getClusterControllerService().getApplicationContext();
+ final MetadataProvider metadataProvider = new MetadataProvider(appCtx,
null);
+ final MetadataTransactionContext mdTxn =
MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxn);
+ final String nodeGroupName = "ng";
+ try {
+ final List<String> ngNodes =
Collections.singletonList("asterix_nc1");
+ MetadataManager.INSTANCE.addNodegroup(mdTxn, new
NodeGroup(nodeGroupName, ngNodes));
+
+ // force flush
+ IDatasetLifecycleManager datasetLifecycleManager =
+ ((NCAppRuntimeContext)
integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
+ PrimaryIndexOperationTracker operationTracker =
datasetLifecycleManager
+
.getOperationTracker(MetadataPrimaryIndexes.NODEGROUP_DATASET.getDatasetId().getId(),
0);
+ InvokeUtil.runWithTimeout(() -> {
+ synchronized (operationTracker) {
+ operationTracker.wait(1000);
+ }
+ }, () -> operationTracker.getNumActiveOperations() == 0, 5,
TimeUnit.SECONDS);
+ datasetLifecycleManager.flushAllDatasets();
+ // abort
+ MetadataManager.INSTANCE.abortTransaction(mdTxn);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ // ensure that the node group was not added
+ final MetadataTransactionContext readMdTxn =
MetadataManager.INSTANCE.beginTransaction();
+ try {
+ final NodeGroup nodegroup =
MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+ if (nodegroup != null) {
+ throw new AssertionError("nodegroup was found after flush");
+ }
+ } finally {
+ MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+ }
+ }
+
+ @Test
public void abortMetadataTxn() throws Exception {
ICcApplicationContext appCtx =
(ICcApplicationContext)
integrationUtil.getClusterControllerService().getApplicationContext();
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
index 05e5aad..82a12b1 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -19,23 +19,30 @@
package org.apache.asterix.test.txn;
import java.io.File;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.TestDataUtil;
-import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.utils.Servlets;
-import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
-import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
public class RecoveryManagerTest {
@@ -110,4 +117,41 @@
final long countAfterRecovery =
TestDataUtil.getDatasetCount(datasetName);
Assert.assertEquals(countBeforeRecovery, countAfterRecovery);
}
+
+ @Test
+ public void atomicTxnRecovery() throws Exception {
+ ICcApplicationContext appCtx =
+ (ICcApplicationContext)
integrationUtil.getClusterControllerService().getApplicationContext();
+ final MetadataProvider metadataProvider = new MetadataProvider(appCtx,
null);
+ final MetadataTransactionContext mdTxn =
MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxn);
+ final String nodeGroupName = "ng";
+ // try {
+ final List<String> ngNodes = Collections.singletonList("asterix_nc1");
+ MetadataManager.INSTANCE.addNodegroup(mdTxn, new
NodeGroup(nodeGroupName, ngNodes));
+ IDatasetLifecycleManager datasetLifecycleManager =
+ ((NCAppRuntimeContext)
integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
+ PrimaryIndexOperationTracker operationTracker = datasetLifecycleManager
+
.getOperationTracker(MetadataPrimaryIndexes.NODEGROUP_DATASET.getDatasetId().getId(),
0);
+ InvokeUtil.runWithTimeout(() -> {
+ synchronized (operationTracker) {
+ operationTracker.wait(1000);
+ }
+ }, () -> operationTracker.getNumActiveOperations() == 0, 5,
TimeUnit.SECONDS);
+ // force flush
+ datasetLifecycleManager.flushAllDatasets();
+ // force recovery
+ integrationUtil.deinit(false);
+ integrationUtil.init(false, TEST_CONFIG_FILE_PATH);
+ // ensure that recovery removed the uncommitted txn effects
+ final MetadataTransactionContext readMdTxn =
MetadataManager.INSTANCE.beginTransaction();
+ try {
+ final NodeGroup nodegroup =
MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+ if (nodegroup != null) {
+ throw new AssertionError("nodegroup was found after crash
recovery");
+ }
+ } finally {
+ MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+ }
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index a3d5bc5..afead85 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -155,4 +155,11 @@
* so that any resources held by the transaction may be released
*/
void complete();
+
+ /**
+ * Gets the atomicity level of this transaction
+ *
+ * @return the atomicity level
+ */
+ ITransactionManager.AtomicityLevel getAtomicityLevel();
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 219cf07..e579952 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -104,4 +105,9 @@
AtomicTransactionContext that = (AtomicTransactionContext) o;
return this.txnId.equals(that.txnId);
}
+
+ @Override
+ public ITransactionManager.AtomicityLevel getAtomicityLevel() {
+ return ITransactionManager.AtomicityLevel.ATOMIC;
+ }
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9d2f54b..094108a 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -114,4 +115,9 @@
EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
return this.txnId.equals(that.txnId);
}
+
+ @Override
+ public ITransactionManager.AtomicityLevel getAtomicityLevel() {
+ return ITransactionManager.AtomicityLevel.ENTITY_LEVEL;
+ }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ba4f82d..dc3cf13 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -22,6 +22,9 @@
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.Level;
@@ -232,6 +235,22 @@
}
}
+ /**
+ * Tries to run the supplied {@code action} until {@code stopCondition} is
met or timeout.
+ */
+ public static void runWithTimeout(ThrowingAction action, BooleanSupplier
stopCondition, long timeout, TimeUnit unit)
+ throws Exception {
+ long remainingTime = unit.toNanos(timeout);
+ final long startTime = System.nanoTime();
+ while (!stopCondition.getAsBoolean()) {
+ if (remainingTime <= 0) {
+ throw new TimeoutException();
+ }
+ action.run();
+ remainingTime -= System.nanoTime() - startTime;
+ }
+ }
+
@FunctionalInterface
public interface InterruptibleAction {
void run() throws InterruptedException;
--
To view, visit https://asterix-gerrit.ics.uci.edu/2456
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>