Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1926
Change subject: [ASTERIXDB-1990][ASTERIXDB-1991][STO] Ensure inactive
components deletion
......................................................................
[ASTERIXDB-1990][ASTERIXDB-1991][STO] Ensure inactive components deletion
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- The inactive disk components should be deleted as part of
the operation identiying them. i.e. the operation shouldn't
be decleared complete until the deletion is complete.
Change-Id: I7cf7c76f0613467ab307eca42f5ac0834a60fa44
---
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
1 file changed, 127 insertions(+), 140 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/26/1926/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b1005bd..5374f1d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -208,155 +208,110 @@
if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH
&& opType != LSMOperationType.MERGE) {
return;
}
- List<ILSMDiskComponent> inactiveDiskComponents = null;
- List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = null;
- try {
- synchronized (opTracker) {
- try {
+ synchronized (opTracker) {
+ try {
- /**
- * [flow control]
- * If merge operations are lagged according to the merge
policy,
- * flushing in-memory components are hold until the merge
operation catches up.
- * See PrefixMergePolicy.isMergeLagging() for more details.
- */
- if (opType == LSMOperationType.FLUSH) {
- opTracker.notifyAll();
- while (mergePolicy.isMergeLagging(lsmIndex)) {
- try {
- opTracker.wait();
- } catch (InterruptedException e) {
- //ignore
- }
- }
- } else if (opType == LSMOperationType.MERGE) {
- opTracker.notifyAll();
- }
-
- int i = 0;
- // First check if there is any action that is needed to be
taken based on the state of each component.
- for (ILSMComponent c : ctx.getComponentHolder()) {
- boolean isMutableComponent = i == 0 && c.getType() ==
LSMComponentType.MEMORY ? true : false;
- c.threadExit(opType, failedOperation,
isMutableComponent);
- if (c.getType() == LSMComponentType.MEMORY) {
- switch (c.getState()) {
- case READABLE_UNWRITABLE:
- if (isMutableComponent && (opType ==
LSMOperationType.MODIFICATION
- || opType ==
LSMOperationType.FORCE_MODIFICATION)) {
-
lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
- }
- break;
- case INACTIVE:
- ((AbstractLSMMemoryComponent) c).reset();
- // Notify all waiting threads whenever the
mutable component's has change to inactive. This is important because
- // even though we switched the mutable
components, it is possible that the component that we just switched
- // to is still busy flushing its data to
disk. Thus, the notification that was issued upon scheduling the flush
- // is not enough.
- opTracker.notifyAll();
- break;
- default:
- break;
- }
- } else {
- switch (c.getState()) {
- case INACTIVE:
-
lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c);
- break;
- default:
- break;
- }
- }
- i++;
- }
- ctx.setAccessingComponents(false);
- // Then, perform any action that is needed to be taken
based on the operation type.
- switch (opType) {
- case FLUSH:
- // newComponent is null if the flush op. was not
performed.
- if (newComponent != null) {
- lsmIndex.addDiskComponent(newComponent);
- if (replicationEnabled) {
- componentsToBeReplicated.clear();
- componentsToBeReplicated.add(newComponent);
-
triggerReplication(componentsToBeReplicated, false, opType);
- }
- mergePolicy.diskComponentAdded(lsmIndex,
false);
- }
- break;
- case MERGE:
- // newComponent is null if the merge op. was not
performed.
- if (newComponent != null) {
- lsmIndex.subsumeMergedComponents(newComponent,
ctx.getComponentHolder());
- if (replicationEnabled) {
- componentsToBeReplicated.clear();
- componentsToBeReplicated.add(newComponent);
-
triggerReplication(componentsToBeReplicated, false, opType);
- }
- mergePolicy.diskComponentAdded(lsmIndex,
fullMergeIsRequested.get());
- }
- break;
- default:
- break;
- }
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- } finally {
- if (failedOperation && (opType ==
LSMOperationType.MODIFICATION
- || opType == LSMOperationType.FORCE_MODIFICATION))
{
- //When the operation failed, completeOperation()
method must be called
- //in order to decrement active operation count which
was incremented in beforeOperation() method.
- opTracker.completeOperation(lsmIndex, opType,
ctx.getSearchOperationCallback(),
- ctx.getModificationCallback());
- } else {
- opTracker.afterOperation(lsmIndex, opType,
ctx.getSearchOperationCallback(),
- ctx.getModificationCallback());
- }
-
- /*
- * = Inactive disk components lazy cleanup if any =
- * Prepare to cleanup inactive diskComponents which were
old merged components
- * and not anymore accessed.
- * This cleanup is done outside of optracker synchronized
block.
- */
- inactiveDiskComponents =
lsmIndex.getInactiveDiskComponents();
- if (!inactiveDiskComponents.isEmpty()) {
- for (ILSMDiskComponent inactiveComp :
inactiveDiskComponents) {
- if (inactiveComp.getFileReferenceCount() == 1) {
- if (inactiveDiskComponentsToBeDeleted == null)
{
- inactiveDiskComponentsToBeDeleted = new
LinkedList<>();
- }
-
inactiveDiskComponentsToBeDeleted.add(inactiveComp);
- }
- }
- if (inactiveDiskComponentsToBeDeleted != null) {
-
inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
+ /**
+ * [flow control]
+ * If merge operations are lagged according to the merge
policy,
+ * flushing in-memory components are hold until the merge
operation catches up.
+ * See PrefixMergePolicy.isMergeLagging() for more details.
+ */
+ if (opType == LSMOperationType.FLUSH) {
+ opTracker.notifyAll();
+ while (mergePolicy.isMergeLagging(lsmIndex)) {
+ try {
+ opTracker.wait();
+ } catch (InterruptedException e) {
+ //ignore
}
}
+ } else if (opType == LSMOperationType.MERGE) {
+ opTracker.notifyAll();
}
- }
- } finally {
- /*
- * cleanup inactive disk components if any
- */
- if (inactiveDiskComponentsToBeDeleted != null) {
- try {
- //schedule a replication job to delete these inactive disk
components from replicas
- if (replicationEnabled) {
- lsmIndex.scheduleReplication(null,
inactiveDiskComponentsToBeDeleted, false,
- ReplicationOperation.DELETE, opType);
- }
- for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
- ((AbstractLSMDiskComponent) c).destroy();
+ int i = 0;
+ // First check if there is any action that is needed to be
taken based on the state of each component.
+ for (ILSMComponent c : ctx.getComponentHolder()) {
+ boolean isMutableComponent = i == 0 && c.getType() ==
LSMComponentType.MEMORY ? true : false;
+ c.threadExit(opType, failedOperation, isMutableComponent);
+ if (c.getType() == LSMComponentType.MEMORY) {
+ switch (c.getState()) {
+ case READABLE_UNWRITABLE:
+ if (isMutableComponent && (opType ==
LSMOperationType.MODIFICATION
+ || opType ==
LSMOperationType.FORCE_MODIFICATION)) {
+
lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
+ }
+ break;
+ case INACTIVE:
+ ((AbstractLSMMemoryComponent) c).reset();
+ // Notify all waiting threads whenever the
mutable component's has change to inactive. This is important because
+ // even though we switched the mutable
components, it is possible that the component that we just switched
+ // to is still busy flushing its data to disk.
Thus, the notification that was issued upon scheduling the flush
+ // is not enough.
+ opTracker.notifyAll();
+ break;
+ default:
+ break;
+ }
+ } else {
+ switch (c.getState()) {
+ case INACTIVE:
+
lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c);
+ break;
+ default:
+ break;
+ }
}
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
+ i++;
+ }
+ ctx.setAccessingComponents(false);
+ // Then, perform any action that is needed to be taken based
on the operation type.
+ switch (opType) {
+ case FLUSH:
+ // newComponent is null if the flush op. was not
performed.
+ if (newComponent != null) {
+ lsmIndex.addDiskComponent(newComponent);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated,
false, opType);
+ }
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
+ break;
+ case MERGE:
+ // newComponent is null if the merge op. was not
performed.
+ if (newComponent != null) {
+ lsmIndex.subsumeMergedComponents(newComponent,
ctx.getComponentHolder());
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated,
false, opType);
+ }
+ mergePolicy.diskComponentAdded(lsmIndex,
fullMergeIsRequested.get());
+ }
+ break;
+ default:
+ break;
+ }
+ deleteInactiveDiskComponents(opType);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ if (failedOperation && (opType == LSMOperationType.MODIFICATION
+ || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ //When the operation failed, completeOperation() method
must be called
+ //in order to decrement active operation count which was
incremented in beforeOperation() method.
+ opTracker.completeOperation(lsmIndex, opType,
ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
+ } else {
+ opTracker.afterOperation(lsmIndex, opType,
ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
}
}
}
-
}
@Override
@@ -701,4 +656,36 @@
}
throw
HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL);
}
+
+ private void deleteInactiveDiskComponents(LSMOperationType opType) throws
HyracksDataException {
+ final List<ILSMDiskComponent> inactiveDiskComponents =
lsmIndex.getInactiveDiskComponents();
+ if (inactiveDiskComponents.isEmpty()) {
+ // nothing to delete
+ return;
+ }
+
+ /*
+ * Prepare to cleanup inactive diskComponents which were old merged
components
+ * and not anymore accessed.
+ */
+ List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = new
LinkedList<>();
+ for (ILSMDiskComponent inactiveComp : inactiveDiskComponents) {
+ if (inactiveComp.getFileReferenceCount() == 1) {
+ inactiveDiskComponentsToBeDeleted.add(inactiveComp);
+ }
+ }
+
+ // cleanup inactive disk components if any
+ if (!inactiveDiskComponentsToBeDeleted.isEmpty()) {
+ //schedule a replication job to delete these inactive disk
components from replicas
+ if (replicationEnabled) {
+ lsmIndex.scheduleReplication(null,
inactiveDiskComponentsToBeDeleted, false,
+ ReplicationOperation.DELETE, opType);
+ }
+ for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
+ ((AbstractLSMDiskComponent) c).destroy();
+ inactiveDiskComponents.remove(c);
+ }
+ }
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1926
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7cf7c76f0613467ab307eca42f5ac0834a60fa44
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>