Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1896
Change subject: [ASTERIXDB-1995][STO] Abort write txn when index cannot be
flushed
......................................................................
[ASTERIXDB-1995][STO] Abort write txn when index cannot be flushed
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Fix LSM memory component state transition on flush/merge failure
- When index cannot be flushed, abort waiting threads
- Prevent NPE in MaterializerTaskState when file creation fails
- Fail with CANNOT_CREATE_FILE when the parent directories of the index metadata file cannot be created
Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3
---
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
6 files changed, 58 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/96/1896/1
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index e530bc3..b8c9529 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.transaction.management.resource;
+import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -190,10 +192,14 @@
FileReference resourceFile = ioManager.resolve(relativePath);
if (resourceFile.getFile().exists()) {
throw new HyracksDataException("Duplicate resource: " +
resourceFile.getAbsolutePath());
- } else {
- resourceFile.getFile().getParentFile().mkdirs();
}
- resourceCache.put(resource.getPath(), resource);
+
+ final File parent = resourceFile.getFile().getParentFile();
+ if (!parent.exists()) {
+ if (!parent.mkdirs()) {
+ throw HyracksDataException.create(CANNOT_CREATE_FILE,
parent.getAbsolutePath());
+ }
+ }
try (FileOutputStream fos = new
FileOutputStream(resourceFile.getFile());
ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
@@ -203,6 +209,8 @@
throw new HyracksDataException(e);
}
+ resourceCache.put(resource.getPath(), resource);
+
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled) {
createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7dbade2..c2951fe 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -121,6 +121,7 @@
public static final int FOUND_MULTIPLE_TRANSACTIONS = 85;
public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
+ public static final int CANNOT_MODIFY_INDEX = 88;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index cd38917..472adad 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -104,5 +104,5 @@
85 = Found more than one transaction file in %1$s
86 = Found an unrecognized index file %1$s
87 = Unequal number of trees and filters found in %1$s
-
+88 = Cannot modify index [%1$s]
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 918155d..34f3c2a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -66,7 +66,9 @@
}
public void close() throws HyracksDataException {
- out.close();
+ if(out != null) {
+ out.close();
+ }
}
public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
@@ -74,20 +76,25 @@
}
public void writeOut(IFrameWriter writer, IFrame frame, boolean failed)
throws HyracksDataException {
- RunFileReader in = out.createReader();
+ RunFileReader in = null;
+ if(out != null){
+ in = out.createReader();
+ }
writer.open();
try {
if (failed) {
writer.fail();
return;
}
- in.open();
- try {
- while (in.nextFrame(frame)) {
- writer.nextFrame(frame.getBuffer());
+ if(in != null) {
+ in.open();
+ try {
+ while (in.nextFrame(frame)) {
+ writer.nextFrame(frame.getBuffer());
+ }
+ } finally {
+ in.close();
}
- } finally {
- in.close();
}
} catch (Exception e) {
writer.fail();
@@ -96,10 +103,10 @@
try {
writer.close();
} finally {
- if (numConsumers.decrementAndGet() == 0) {
+ if (numConsumers.decrementAndGet() == 0 && out != null) {
out.getFileReference().delete();
}
}
}
}
-}
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 7cbe35f..1ee68d9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -144,6 +144,11 @@
throw new IllegalStateException("Flush sees an illegal LSM
memory compoenent state: " + state);
}
readerCount--;
+ if (failedOperation) {
+ // if flush failed, return the component state to
READABLE_UNWRITABLE
+ state = ComponentState.READABLE_UNWRITABLE;
+ return;
+ }
if (readerCount == 0) {
state = ComponentState.INACTIVE;
} else {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 50eac67..d962ac2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,6 +48,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -131,6 +133,10 @@
// Flush and merge operations should never reach this wait
call, because they are always try operations.
// If they fail to enter the components, then it means
that there are an ongoing flush/merge operation on
// the same components, so they should not proceed.
+ if (opType == LSMOperationType.MODIFICATION) {
+ // before waiting, make sure the index is in a
modifiable state to avoid waiting forever.
+ ensureIndexModifiable();
+ }
opTracker.wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -186,6 +192,7 @@
break;
case MERGE:
lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+ break;
default:
break;
}
@@ -506,7 +513,7 @@
e.printStackTrace();
throw e;
} finally {
- exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
+ exitComponents(ctx, LSMOperationType.FLUSH, newComponent,
newComponent == null);
operation.getCallback().afterFinalize(LSMOperationType.FLUSH,
newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -553,7 +560,7 @@
e.printStackTrace();
throw e;
} finally {
- exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
+ exitComponents(ctx, LSMOperationType.MERGE, newComponent,
newComponent == null);
operation.getCallback().afterFinalize(LSMOperationType.MERGE,
newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -660,4 +667,18 @@
exit(ctx);
}
}
+
+ /***
+ * Ensures the index is in a modifiable state
+ * @throws HyracksDataException if the index is not in a modifiable state
+ */
+ private void ensureIndexModifiable() throws HyracksDataException {
+ // find if there is any memory component which is in a writable state
or eventually will be in a writable state
+ final Optional<ILSMMemoryComponent> any =
lsmIndex.getMemoryComponents().stream()
+ .filter(c -> c.getState() == ComponentState.INACTIVE ||
c.getState() == ComponentState.READABLE_WRITABLE
+ || c.getState() ==
ComponentState.READABLE_UNWRITABLE_FLUSHING).findAny();
+ if (!any.isPresent()) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX,
"Disk is full");
+ }
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1896
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>