Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2456

Change subject: [NO ISSUE][TX] Ensure Uncommited Atomic Txns Are Rolledback
......................................................................

[NO ISSUE][TX] Ensure Uncommited Atomic Txns Are Rolledback

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Ensure rollback of an atomic transaction starts
  from its first LSN.
- Ensure update logs of uncommitted atomic transactions
  are undone during recovery.
- Add test case for atomic transaction rollback after
  flush.
- Add test case for atomic transaction recovery after
  flush.

Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
7 files changed, 198 insertions(+), 46 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/56/2456/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 74277ce..5a4bbc0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -49,6 +49,7 @@
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.Checkpoint;
@@ -174,16 +175,16 @@
     public synchronized void replayPartitionsLogs(Set<Integer> partitions, 
ILogReader logReader, long lowWaterMarkLSN)
             throws IOException, ACIDException {
         try {
-            Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, 
logReader, lowWaterMarkLSN);
-            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, 
winnerJobSet);
+            Set<Long> winnerJobSet = startRecoveryAnalysisPhase(partitions, 
logReader, lowWaterMarkLSN);
+            startRecoveryUndoRedoPhase(partitions, logReader, lowWaterMarkLSN, 
winnerJobSet);
         } finally {
             logReader.close();
             deleteRecoveryTemporaryFiles();
         }
     }
 
-    private synchronized Set<Long> startRecoverysAnalysisPhase(Set<Integer> 
partitions, ILogReader logReader,
-            long lowWaterMarkLSN) throws IOException, ACIDException {
+    private synchronized Set<Long> startRecoveryAnalysisPhase(Set<Integer> 
partitions, ILogReader logReader,
+            long lowWaterMarkLSN) throws IOException {
         int updateLogCount = 0;
         int entityCommitLogCount = 0;
         int jobCommitLogCount = 0;
@@ -268,19 +269,21 @@
         jobEntityWinners.add(logRecord);
     }
 
-    private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, 
ILogReader logReader,
-            long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, 
ACIDException {
+    private synchronized void startRecoveryUndoRedoPhase(Set<Integer> 
partitions, ILogReader logReader,
+            long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException {
         int redoCount = 0;
+        int undoCount = 0;
         long txnId = 0;
 
         long resourceId;
         long maxDiskLastLsn;
-        long lsn = -1;
-        ILSMIndex index = null;
-        LocalResource localResource = null;
-        DatasetLocalResource localResourceMetadata = null;
-        boolean foundWinner = false;
-        JobEntityCommits jobEntityWinners = null;
+        long lsn;
+        ILSMIndex index;
+        LocalResource localResource;
+        DatasetLocalResource localResourceMetadata;
+        boolean foundWinner;
+        boolean foundLoser;
+        JobEntityCommits jobEntityWinners;
 
         IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
         final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
@@ -290,8 +293,8 @@
         Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, 
false);
 
-        ILogRecord logRecord = null;
-        ILSMComponentIdGenerator idGenerator = null;
+        ILogRecord logRecord;
+        ILSMComponentIdGenerator idGenerator;
         try {
             logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -302,6 +305,7 @@
                 lsn = logRecord.getLSN();
                 txnId = logRecord.getTxnId();
                 foundWinner = false;
+                foundLoser = false;
                 switch (logRecord.getLogType()) {
                     case LogType.UPDATE:
                         if 
(partitions.contains(logRecord.getResourcePartition())) {
@@ -314,11 +318,13 @@
                                 if 
(jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnEntityId)) {
                                     foundWinner = true;
                                 }
+                            } else if (isAtomicTxnLog(logRecord)) {
+                                foundLoser = true;
                             }
-                            if (foundWinner) {
+                            if (foundWinner || foundLoser) {
                                 resourceId = logRecord.getResourceId();
                                 localResource = resourcesMap.get(resourceId);
-                                
/*******************************************************************
+                                /*
                                  * [Notice]
                                  * -> Issue
                                  * Delete index may cause a problem during 
redo.
@@ -330,22 +336,21 @@
                                  * the corresponding index is retrieved, which 
will end up with 'null'.
                                  * If null is returned, then just go and 
process the next
                                  * log record.
-                                 
*******************************************************************/
+                                 */
                                 if (localResource == null) {
                                     LOGGER.log(Level.WARN, "resource was not 
found for resource id " + resourceId);
                                     logRecord = logReader.next();
                                     continue;
                                 }
-                                
/*******************************************************************/
 
-                                //get index instance from IndexLifeCycleManager
-                                //if index is not registered into 
IndexLifeCycleManager,
-                                //create the index using LocalMetadata stored 
in LocalResourceRepository
-                                //get partition path in this node
+                                // get index instance from 
IndexLifeCycleManager
+                                // if index is not registered into 
IndexLifeCycleManager,
+                                // create the index using LocalMetadata stored 
in LocalResourceRepository
+                                // get partition path in this node
                                 localResourceMetadata = (DatasetLocalResource) 
localResource.getResource();
                                 index = (ILSMIndex) 
datasetLifecycleManager.get(localResource.getPath());
                                 if (index == null) {
-                                    //#. create index instance and register to 
indexLifeCycleManager
+                                    // create index instance and register to 
indexLifeCycleManager
                                     index = (ILSMIndex) 
localResourceMetadata.createInstance(serviceCtx);
                                     
datasetLifecycleManager.register(localResource.getPath(), index);
                                     
datasetLifecycleManager.open(localResource.getPath());
@@ -358,15 +363,18 @@
                                         
datasetLifecycleManager.close(localResource.getPath());
                                         throw e;
                                     }
-                                    //#. set resourceId and maxDiskLastLSN to 
the map
+                                    // set resourceId and maxDiskLastLSN to 
the map
                                     resourceId2MaxLSNMap.put(resourceId, 
maxDiskLastLsn);
                                 } else {
                                     maxDiskLastLsn = 
resourceId2MaxLSNMap.get(resourceId);
                                 }
                                 // lsn @ maxDiskLastLsn is either a flush log 
or a master replica log
-                                if (lsn >= maxDiskLastLsn) {
+                                if (foundWinner && lsn >= maxDiskLastLsn) {
                                     redo(logRecord, datasetLifecycleManager);
                                     redoCount++;
+                                } else if (foundLoser) {
+                                    undo(logRecord, datasetLifecycleManager);
+                                    undoCount++;
                                 }
                             }
                         }
@@ -425,7 +433,7 @@
                 }
                 logRecord = logReader.next();
             }
-            LOGGER.info("Logs REDO phase completed. Redo logs count: " + 
redoCount);
+            LOGGER.info("Logs REDO phase completed. Redo logs count: {}, undo 
logs count: {}", redoCount, undoCount);
         } finally {
             txnSubsystem.getTransactionManager().ensureMaxTxnId(txnId);
             //close all indexes
@@ -566,18 +574,7 @@
     public void rollbackTransaction(ITransactionContext txnContext) throws 
ACIDException {
         long abortedTxnId = txnContext.getTxnId().getId();
         // Obtain the first/last log record LSNs written by the Job
-        long firstLSN = txnContext.getFirstLSN();
-        /*
-         * The effect of any log record with LSN below minFirstLSN has already 
been written to disk and
-         * will not be rolled back. Therefore, we will set the first LSN of 
the job to the maximum of
-         * minFirstLSN and the job's first LSN.
-         */
-        try {
-            long localMinFirstLSN = getLocalMinFirstLSN();
-            firstLSN = Math.max(firstLSN, localMinFirstLSN);
-        } catch (HyracksDataException e) {
-            throw new ACIDException(e);
-        }
+        long firstLSN = getRollbackFirstLsn(txnContext);
         long lastLSN = txnContext.getLastLSN();
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("rollbacking transaction log records from " + firstLSN 
+ " to " + lastLSN);
@@ -724,6 +721,30 @@
         // do nothing
     }
 
+    private long getRollbackFirstLsn(ITransactionContext txnContext) {
+        long firstLSN = txnContext.getFirstLSN();
+        switch (txnContext.getAtomicityLevel()) {
+            case ATOMIC:
+                break;
+            case ENTITY_LEVEL:
+                try {
+                    /*
+                     * The effect of any log record with LSN below minFirstLSN 
has already been written to disk and
+                     * will not be rolled back. Therefore, we will set the 
first LSN of the txn to the maximum of
+                     * minFirstLSN and the transaction's first LSN.
+                     */
+                    final long localMinFirstLSN = getLocalMinFirstLSN();
+                    firstLSN = Math.max(firstLSN, localMinFirstLSN);
+                } catch (HyracksDataException e) {
+                    throw new ACIDException(e);
+                }
+                break;
+            default:
+                throw new IllegalStateException("Unknown atomicity level: " + 
txnContext.getAtomicityLevel());
+        }
+        return firstLSN;
+    }
+
     private static void undo(ILogRecord logRecord, IDatasetLifecycleManager 
datasetLifecycleManager) {
         try {
             ILSMIndex index =
@@ -807,6 +828,10 @@
         accessor.scheduleFlush(index.getIOOperationCallback());
     }
 
+    private static boolean isAtomicTxnLog(ILogRecord logRecord) {
+        return 
MetadataIndexImmutableProperties.isMetadataDataset(logRecord.getDatasetId());
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final long txnId;
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 70e5f6e..4efc27b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -19,14 +19,17 @@
 package org.apache.asterix.test.metadata;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.DatasetConfig;
@@ -36,12 +39,14 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
@@ -67,6 +72,46 @@
     }
 
     @Test
+    public void undoFlushedUncommittedMetadataTxn() throws Exception {
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) 
integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataProvider metadataProvider = new MetadataProvider(appCtx, 
null);
+        final MetadataTransactionContext mdTxn = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxn);
+        final String nodeGroupName = "ng";
+        try {
+            final List<String> ngNodes = 
Collections.singletonList("asterix_nc1");
+            MetadataManager.INSTANCE.addNodegroup(mdTxn, new 
NodeGroup(nodeGroupName, ngNodes));
+
+            // force flush
+            IDatasetLifecycleManager datasetLifecycleManager =
+                    ((NCAppRuntimeContext) 
integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
+            PrimaryIndexOperationTracker operationTracker = 
datasetLifecycleManager
+                    
.getOperationTracker(MetadataPrimaryIndexes.NODEGROUP_DATASET.getDatasetId().getId(),
 0);
+            InvokeUtil.runWithTimeout(() -> {
+                synchronized (operationTracker) {
+                    operationTracker.wait(1000);
+                }
+            }, () -> operationTracker.getNumActiveOperations() == 0, 5, 
TimeUnit.SECONDS);
+            datasetLifecycleManager.flushAllDatasets();
+            // abort
+            MetadataManager.INSTANCE.abortTransaction(mdTxn);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        // ensure that the node group was not added
+        final MetadataTransactionContext readMdTxn = 
MetadataManager.INSTANCE.beginTransaction();
+        try {
+            final NodeGroup nodegroup = 
MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+            if (nodegroup != null) {
+                throw new AssertionError("nodegroup was found after flush");
+            }
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+        }
+    }
+
+    @Test
     public void abortMetadataTxn() throws Exception {
         ICcApplicationContext appCtx =
                 (ICcApplicationContext) 
integrationUtil.getClusterControllerService().getApplicationContext();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
index 05e5aad..82a12b1 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -19,23 +19,30 @@
 package org.apache.asterix.test.txn;
 
 import java.io.File;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.TestDataUtil;
-import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.utils.Servlets;
-import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
-import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class RecoveryManagerTest {
 
@@ -110,4 +117,41 @@
         final long countAfterRecovery = 
TestDataUtil.getDatasetCount(datasetName);
         Assert.assertEquals(countBeforeRecovery, countAfterRecovery);
     }
+
+    @Test
+    public void atomicTxnRecovery() throws Exception {
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) 
integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataProvider metadataProvider = new MetadataProvider(appCtx, 
null);
+        final MetadataTransactionContext mdTxn = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxn);
+        final String nodeGroupName = "ng";
+        // add a node group in a metadata txn that is intentionally never committed
+        final List<String> ngNodes = Collections.singletonList("asterix_nc1");
+        MetadataManager.INSTANCE.addNodegroup(mdTxn, new 
NodeGroup(nodeGroupName, ngNodes));
+        IDatasetLifecycleManager datasetLifecycleManager =
+                ((NCAppRuntimeContext) 
integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
+        PrimaryIndexOperationTracker operationTracker = datasetLifecycleManager
+                
.getOperationTracker(MetadataPrimaryIndexes.NODEGROUP_DATASET.getDatasetId().getId(),
 0);
+        InvokeUtil.runWithTimeout(() -> {
+            synchronized (operationTracker) {
+                operationTracker.wait(1000);
+            }
+        }, () -> operationTracker.getNumActiveOperations() == 0, 5, 
TimeUnit.SECONDS);
+        // force flush
+        datasetLifecycleManager.flushAllDatasets();
+        // force recovery
+        integrationUtil.deinit(false);
+        integrationUtil.init(false, TEST_CONFIG_FILE_PATH);
+        // ensure that recovery removed the uncommitted txn effects
+        final MetadataTransactionContext readMdTxn = 
MetadataManager.INSTANCE.beginTransaction();
+        try {
+            final NodeGroup nodegroup = 
MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+            if (nodegroup != null) {
+                throw new AssertionError("nodegroup was found after crash 
recovery");
+            }
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index a3d5bc5..afead85 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -155,4 +155,11 @@
      * so that any resources held by the transaction may be released
      */
     void complete();
+
+    /**
+     * Gets the atomicity level of this transaction
+     *
+     * @return the atomicity level
+     */
+    ITransactionManager.AtomicityLevel getAtomicityLevel();
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 219cf07..e579952 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -104,4 +105,9 @@
         AtomicTransactionContext that = (AtomicTransactionContext) o;
         return this.txnId.equals(that.txnId);
     }
+
+    @Override
+    public ITransactionManager.AtomicityLevel getAtomicityLevel() {
+        return ITransactionManager.AtomicityLevel.ATOMIC;
+    }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9d2f54b..094108a 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -25,6 +25,7 @@
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -114,4 +115,9 @@
         EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
         return this.txnId.equals(that.txnId);
     }
+
+    @Override
+    public ITransactionManager.AtomicityLevel getAtomicityLevel() {
+        return ITransactionManager.AtomicityLevel.ENTITY_LEVEL;
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ba4f82d..dc3cf13 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -22,6 +22,9 @@
 import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.Level;
@@ -232,6 +235,22 @@
         }
     }
 
+    /**
+     * Runs {@code action} repeatedly until {@code stopCondition} is met.
+     * @throws TimeoutException if {@code timeout} elapses before it is met
+     */
+    public static void runWithTimeout(ThrowingAction action, BooleanSupplier 
stopCondition, long timeout, TimeUnit unit)
+            throws Exception {
+        final long timeoutNanos = unit.toNanos(timeout);
+        final long startTime = System.nanoTime();
+        while (!stopCondition.getAsBoolean()) {
+            if (System.nanoTime() - startTime >= timeoutNanos) {
+                throw new TimeoutException();
+            }
+            action.run();
+        }
+    }
+
     @FunctionalInterface
     public interface InterruptibleAction {
         void run() throws InterruptedException;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2456
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If8d5df630f1d9119002ef91da5c282da18901acc
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to