Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1896

Change subject: [ASTERIXDB-1995][STO] Abort write txn when index cannot be 
flushed
......................................................................

[ASTERIXDB-1995][STO] Abort write txn when index cannot be flushed

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Fix LSM memory component state transition on flush/merge failure
- When index cannot be flushed, abort waiting threads
- Prevent NPE in MaterializerTaskState when file creation fails
- Check that parent directories are created for the index metadata file

Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3
---
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
6 files changed, 58 insertions(+), 16 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/96/1896/1

diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index e530bc3..b8c9529 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -190,10 +192,14 @@
         FileReference resourceFile = ioManager.resolve(relativePath);
         if (resourceFile.getFile().exists()) {
             throw new HyracksDataException("Duplicate resource: " + 
resourceFile.getAbsolutePath());
-        } else {
-            resourceFile.getFile().getParentFile().mkdirs();
         }
-        resourceCache.put(resource.getPath(), resource);
+
+        final File parent = resourceFile.getFile().getParentFile();
+        if (!parent.exists()) {
+            if (!parent.mkdirs()) {
+                throw HyracksDataException.create(CANNOT_CREATE_FILE, 
parent.getAbsolutePath());
+            }
+        }
 
         try (FileOutputStream fos = new 
FileOutputStream(resourceFile.getFile());
                 ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
@@ -203,6 +209,8 @@
             throw new HyracksDataException(e);
         }
 
+        resourceCache.put(resource.getPath(), resource);
+
         //if replication enabled, send resource metadata info to remote nodes
         if (isReplicationEnabled) {
             createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7dbade2..c2951fe 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -121,6 +121,7 @@
     public static final int FOUND_MULTIPLE_TRANSACTIONS = 85;
     public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
     public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
+    public static final int CANNOT_MODIFY_INDEX = 88;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index cd38917..472adad 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -104,5 +104,5 @@
 85 = Found more than one transaction file in %1$s
 86 = Found an unrecognized index file %1$s
 87 = Unequal number of trees and filters found in %1$s
-
+88 = Cannot modify index [%1$s]
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 918155d..34f3c2a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -66,7 +66,9 @@
     }
 
     public void close() throws HyracksDataException {
-        out.close();
+        if(out != null) {
+            out.close();
+        }
     }
 
     public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
@@ -74,20 +76,25 @@
     }
 
     public void writeOut(IFrameWriter writer, IFrame frame, boolean failed) 
throws HyracksDataException {
-        RunFileReader in = out.createReader();
+        RunFileReader in = null;
+        if(out != null){
+            in = out.createReader();
+        }
         writer.open();
         try {
             if (failed) {
                 writer.fail();
                 return;
             }
-            in.open();
-            try {
-                while (in.nextFrame(frame)) {
-                    writer.nextFrame(frame.getBuffer());
+            if(in != null) {
+                in.open();
+                try {
+                    while (in.nextFrame(frame)) {
+                        writer.nextFrame(frame.getBuffer());
+                    }
+                } finally {
+                    in.close();
                 }
-            } finally {
-                in.close();
             }
         } catch (Exception e) {
             writer.fail();
@@ -96,10 +103,10 @@
             try {
                 writer.close();
             } finally {
-                if (numConsumers.decrementAndGet() == 0) {
+                if (numConsumers.decrementAndGet() == 0 && out != null) {
                     out.getFileReference().delete();
                 }
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 7cbe35f..1ee68d9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -144,6 +144,11 @@
                     throw new IllegalStateException("Flush sees an illegal LSM 
memory compoenent state: " + state);
                 }
                 readerCount--;
+                if (failedOperation) {
+                    // if flush failed, return the component state to 
READABLE_UNWRITABLE
+                    state = ComponentState.READABLE_UNWRITABLE;
+                    return;
+                }
                 if (readerCount == 0) {
                     state = ComponentState.INACTIVE;
                 } else {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 50eac67..d962ac2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -47,6 +48,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -131,6 +133,10 @@
                     // Flush and merge operations should never reach this wait 
call, because they are always try operations.
                     // If they fail to enter the components, then it means 
that there are an ongoing flush/merge operation on
                     // the same components, so they should not proceed.
+                    if (opType == LSMOperationType.MODIFICATION) {
+                        // before waiting, make sure the index is in a 
modifiable state to avoid waiting forever.
+                        ensureIndexModifiable();
+                    }
                     opTracker.wait();
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
@@ -186,6 +192,7 @@
                 break;
             case MERGE:
                 
lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+                break;
             default:
                 break;
         }
@@ -506,7 +513,7 @@
             e.printStackTrace();
             throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
+            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, 
newComponent == null);
             operation.getCallback().afterFinalize(LSMOperationType.FLUSH, 
newComponent);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -553,7 +560,7 @@
             e.printStackTrace();
             throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
+            exitComponents(ctx, LSMOperationType.MERGE, newComponent, 
newComponent == null);
             operation.getCallback().afterFinalize(LSMOperationType.MERGE, 
newComponent);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -660,4 +667,18 @@
             exit(ctx);
         }
     }
+
+    /***
+     * Ensures the index is in a modifiable state
+     * @throws HyracksDataException if the index is not in a modifiable state
+     */
+    private void ensureIndexModifiable() throws HyracksDataException {
+        // find if there is any memory component which is in a writable state 
or eventually will be in a writable state
+        final Optional<ILSMMemoryComponent> any = 
lsmIndex.getMemoryComponents().stream()
+                .filter(c -> c.getState() == ComponentState.INACTIVE || 
c.getState() == ComponentState.READABLE_WRITABLE
+                        || c.getState() == 
ComponentState.READABLE_UNWRITABLE_FLUSHING).findAny();
+        if (!any.isPresent()) {
+            throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX, 
"Disk is full");
+        }
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1896
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to