abdullah alamoudi has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Fix wait for IO operations
......................................................................


[NO ISSUE][RT] Fix wait for IO operations

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Some operations, such as close dataset, delete components,
  and drop index, need to wait for IO operations to complete.
- Before this change, the wait for IO operations would just check
  the count of IO operations on the dataset info. This is not
  enough: a flush may already have started by writing its flush log
  to the log tail, but the flush operation is only triggered, and
  the count of IO operations only incremented, once that log record
  itself has been flushed.
- To address this problem, we write a wait log before we check the
  IO operation count. This ensures that any flush logs in the log
  tail have been flushed, and the corresponding IO operation counts
  incremented, before the count is checked.

Change-Id: Ibfa883410cd24e0af54732f7ea6f1b4eb2184e8e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2495
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M 
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
4 files changed, 31 insertions(+), 19 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Jenkins: Verified; No violations found; ; Verified
  Murtadha Hubail: Looks good to me, approved



diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 44baf77..f4d764a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -23,6 +23,9 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.logging.log4j.LogManager;
@@ -35,6 +38,8 @@
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
     private final int datasetID;
+    private final ILogManager logManager;
+    private final LogRecord waitLog = new LogRecord();
     private int numActiveIOOps;
     private long lastAccess;
     private boolean isExternal;
@@ -42,13 +47,16 @@
     private boolean memoryAllocated;
     private boolean durable;
 
-    public DatasetInfo(int datasetID) {
+    public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
         this.setMemoryAllocated(false);
+        this.logManager = logManager;
+        waitLog.setLogType(LogType.WAIT);
+        waitLog.computeAndSetLogSize();
     }
 
     @Override
@@ -199,23 +207,26 @@
         this.lastAccess = lastAccess;
     }
 
-    public synchronized void waitForIO() throws HyracksDataException {
-        while (numActiveIOOps > 0) {
-            try {
-                /**
-                 * Will be Notified by {@link 
DatasetInfo#undeclareActiveIOOperation()}
-                 */
-                wait();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw HyracksDataException.create(e);
+    public void waitForIO() throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (numActiveIOOps > 0) {
+                try {
+                    /**
+                     * Will be Notified by {@link 
DatasetInfo#undeclareActiveIOOperation()}
+                     */
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
             }
-        }
-        if (numActiveIOOps < 0) {
-            if (LOGGER.isErrorEnabled()) {
-                LOGGER.error("Number of IO operations cannot be negative for 
dataset: " + this);
+            if (numActiveIOOps < 0) {
+                if (LOGGER.isErrorEnabled()) {
+                    LOGGER.error("Number of IO operations cannot be negative 
for dataset: " + this);
+                }
+                throw new IllegalStateException("Number of IO operations 
cannot be negative");
             }
-            throw new IllegalStateException("Number of IO operations cannot be 
negative");
         }
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f25e4f6..b715eec 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -239,7 +239,7 @@
         synchronized (datasets) {
             dsr = datasets.get(did);
             if (dsr == null) {
-                DatasetInfo dsInfo = new DatasetInfo(did);
+                DatasetInfo dsInfo = new DatasetInfo(did, logManager);
                 int partitions = 
MetadataIndexImmutableProperties.isMetadataDataset(did) ? 
METADATA_DATASETS_PARTITIONS
                         : numPartitions;
                 DatasetVirtualBufferCaches vbcs = new 
DatasetVirtualBufferCaches(did, storageProperties,
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index f9f742a..befbeed 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -189,7 +189,7 @@
         properties.put("max-tolerance-component-count", 
String.valueOf(MAX_COMPONENT_COUNT));
         properties.put("max-mergable-component-size", 
String.valueOf(MAX_COMPONENT_SIZE));
 
-        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID, null);
         for (IndexInfo index : indexInfos) {
             dsInfo.addIndex(index.getResourceId(), index);
         }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 6208cef..3ada608 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -161,7 +161,8 @@
     }
 
     synchronized void syncAppendToLogTail(ILogRecord logRecord) {
-        if (logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.FLUSH) {
+        if (logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.FLUSH
+                && logRecord.getLogType() != LogType.WAIT) {
             ITransactionContext txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && 
logRecord.getLogType() != LogType.ABORT) {
                 throw new ACIDException(

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2495
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ibfa883410cd24e0af54732f7ea6f1b4eb2184e8e
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to