Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1926

Change subject: [ASTERIXDB-1990][ASTERIXDB-1991][STO] Ensure inactive 
components deletion
......................................................................

[ASTERIXDB-1990][ASTERIXDB-1991][STO] Ensure inactive components deletion

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- The inactive disk components should be deleted as part of
  the operation identifying them. i.e. the operation shouldn't
  be declared complete until the deletion is complete.

Change-Id: I7cf7c76f0613467ab307eca42f5ac0834a60fa44
---
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
1 file changed, 127 insertions(+), 140 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/26/1926/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b1005bd..5374f1d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -208,155 +208,110 @@
         if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH 
&& opType != LSMOperationType.MERGE) {
             return;
         }
-        List<ILSMDiskComponent> inactiveDiskComponents = null;
-        List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = null;
-        try {
-            synchronized (opTracker) {
-                try {
+        synchronized (opTracker) {
+            try {
 
-                    /**
-                     * [flow control]
-                     * If merge operations are lagged according to the merge 
policy,
-                     * flushing in-memory components are hold until the merge 
operation catches up.
-                     * See PrefixMergePolicy.isMergeLagging() for more details.
-                     */
-                    if (opType == LSMOperationType.FLUSH) {
-                        opTracker.notifyAll();
-                        while (mergePolicy.isMergeLagging(lsmIndex)) {
-                            try {
-                                opTracker.wait();
-                            } catch (InterruptedException e) {
-                                //ignore
-                            }
-                        }
-                    } else if (opType == LSMOperationType.MERGE) {
-                        opTracker.notifyAll();
-                    }
-
-                    int i = 0;
-                    // First check if there is any action that is needed to be 
taken based on the state of each component.
-                    for (ILSMComponent c : ctx.getComponentHolder()) {
-                        boolean isMutableComponent = i == 0 && c.getType() == 
LSMComponentType.MEMORY ? true : false;
-                        c.threadExit(opType, failedOperation, 
isMutableComponent);
-                        if (c.getType() == LSMComponentType.MEMORY) {
-                            switch (c.getState()) {
-                                case READABLE_UNWRITABLE:
-                                    if (isMutableComponent && (opType == 
LSMOperationType.MODIFICATION
-                                            || opType == 
LSMOperationType.FORCE_MODIFICATION)) {
-                                        
lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
-                                    }
-                                    break;
-                                case INACTIVE:
-                                    ((AbstractLSMMemoryComponent) c).reset();
-                                    // Notify all waiting threads whenever the 
mutable component's has change to inactive. This is important because
-                                    // even though we switched the mutable 
components, it is possible that the component that we just switched
-                                    // to is still busy flushing its data to 
disk. Thus, the notification that was issued upon scheduling the flush
-                                    // is not enough.
-                                    opTracker.notifyAll();
-                                    break;
-                                default:
-                                    break;
-                            }
-                        } else {
-                            switch (c.getState()) {
-                                case INACTIVE:
-                                    
lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c);
-                                    break;
-                                default:
-                                    break;
-                            }
-                        }
-                        i++;
-                    }
-                    ctx.setAccessingComponents(false);
-                    // Then, perform any action that is needed to be taken 
based on the operation type.
-                    switch (opType) {
-                        case FLUSH:
-                            // newComponent is null if the flush op. was not 
performed.
-                            if (newComponent != null) {
-                                lsmIndex.addDiskComponent(newComponent);
-                                if (replicationEnabled) {
-                                    componentsToBeReplicated.clear();
-                                    componentsToBeReplicated.add(newComponent);
-                                    
triggerReplication(componentsToBeReplicated, false, opType);
-                                }
-                                mergePolicy.diskComponentAdded(lsmIndex, 
false);
-                            }
-                            break;
-                        case MERGE:
-                            // newComponent is null if the merge op. was not 
performed.
-                            if (newComponent != null) {
-                                lsmIndex.subsumeMergedComponents(newComponent, 
ctx.getComponentHolder());
-                                if (replicationEnabled) {
-                                    componentsToBeReplicated.clear();
-                                    componentsToBeReplicated.add(newComponent);
-                                    
triggerReplication(componentsToBeReplicated, false, opType);
-                                }
-                                mergePolicy.diskComponentAdded(lsmIndex, 
fullMergeIsRequested.get());
-                            }
-                            break;
-                        default:
-                            break;
-                    }
-                } catch (Throwable e) {
-                    e.printStackTrace();
-                    throw e;
-                } finally {
-                    if (failedOperation && (opType == 
LSMOperationType.MODIFICATION
-                            || opType == LSMOperationType.FORCE_MODIFICATION)) 
{
-                        //When the operation failed, completeOperation() 
method must be called
-                        //in order to decrement active operation count which 
was incremented in beforeOperation() method.
-                        opTracker.completeOperation(lsmIndex, opType, 
ctx.getSearchOperationCallback(),
-                                ctx.getModificationCallback());
-                    } else {
-                        opTracker.afterOperation(lsmIndex, opType, 
ctx.getSearchOperationCallback(),
-                                ctx.getModificationCallback());
-                    }
-
-                    /*
-                     * = Inactive disk components lazy cleanup if any =
-                     * Prepare to cleanup inactive diskComponents which were 
old merged components
-                     * and not anymore accessed.
-                     * This cleanup is done outside of optracker synchronized 
block.
-                     */
-                    inactiveDiskComponents = 
lsmIndex.getInactiveDiskComponents();
-                    if (!inactiveDiskComponents.isEmpty()) {
-                        for (ILSMDiskComponent inactiveComp : 
inactiveDiskComponents) {
-                            if (inactiveComp.getFileReferenceCount() == 1) {
-                                if (inactiveDiskComponentsToBeDeleted == null) 
{
-                                    inactiveDiskComponentsToBeDeleted = new 
LinkedList<>();
-                                }
-                                
inactiveDiskComponentsToBeDeleted.add(inactiveComp);
-                            }
-                        }
-                        if (inactiveDiskComponentsToBeDeleted != null) {
-                            
inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
+                /**
+                 * [flow control]
+                 * If merge operations are lagged according to the merge 
policy,
+                 * flushing in-memory components are hold until the merge 
operation catches up.
+                 * See PrefixMergePolicy.isMergeLagging() for more details.
+                 */
+                if (opType == LSMOperationType.FLUSH) {
+                    opTracker.notifyAll();
+                    while (mergePolicy.isMergeLagging(lsmIndex)) {
+                        try {
+                            opTracker.wait();
+                        } catch (InterruptedException e) {
+                            //ignore
                         }
                     }
+                } else if (opType == LSMOperationType.MERGE) {
+                    opTracker.notifyAll();
                 }
-            }
-        } finally {
-            /*
-             * cleanup inactive disk components if any
-             */
-            if (inactiveDiskComponentsToBeDeleted != null) {
-                try {
-                    //schedule a replication job to delete these inactive disk 
components from replicas
-                    if (replicationEnabled) {
-                        lsmIndex.scheduleReplication(null, 
inactiveDiskComponentsToBeDeleted, false,
-                                ReplicationOperation.DELETE, opType);
-                    }
 
-                    for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
-                        ((AbstractLSMDiskComponent) c).destroy();
+                int i = 0;
+                // First check if there is any action that is needed to be 
taken based on the state of each component.
+                for (ILSMComponent c : ctx.getComponentHolder()) {
+                    boolean isMutableComponent = i == 0 && c.getType() == 
LSMComponentType.MEMORY ? true : false;
+                    c.threadExit(opType, failedOperation, isMutableComponent);
+                    if (c.getType() == LSMComponentType.MEMORY) {
+                        switch (c.getState()) {
+                            case READABLE_UNWRITABLE:
+                                if (isMutableComponent && (opType == 
LSMOperationType.MODIFICATION
+                                        || opType == 
LSMOperationType.FORCE_MODIFICATION)) {
+                                    
lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
+                                }
+                                break;
+                            case INACTIVE:
+                                ((AbstractLSMMemoryComponent) c).reset();
+                                // Notify all waiting threads whenever the 
mutable component's has change to inactive. This is important because
+                                // even though we switched the mutable 
components, it is possible that the component that we just switched
+                                // to is still busy flushing its data to disk. 
Thus, the notification that was issued upon scheduling the flush
+                                // is not enough.
+                                opTracker.notifyAll();
+                                break;
+                            default:
+                                break;
+                        }
+                    } else {
+                        switch (c.getState()) {
+                            case INACTIVE:
+                                
lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c);
+                                break;
+                            default:
+                                break;
+                        }
                     }
-                } catch (Throwable e) {
-                    e.printStackTrace();
-                    throw e;
+                    i++;
+                }
+                ctx.setAccessingComponents(false);
+                // Then, perform any action that is needed to be taken based 
on the operation type.
+                switch (opType) {
+                    case FLUSH:
+                        // newComponent is null if the flush op. was not 
performed.
+                        if (newComponent != null) {
+                            lsmIndex.addDiskComponent(newComponent);
+                            if (replicationEnabled) {
+                                componentsToBeReplicated.clear();
+                                componentsToBeReplicated.add(newComponent);
+                                triggerReplication(componentsToBeReplicated, 
false, opType);
+                            }
+                            mergePolicy.diskComponentAdded(lsmIndex, false);
+                        }
+                        break;
+                    case MERGE:
+                        // newComponent is null if the merge op. was not 
performed.
+                        if (newComponent != null) {
+                            lsmIndex.subsumeMergedComponents(newComponent, 
ctx.getComponentHolder());
+                            if (replicationEnabled) {
+                                componentsToBeReplicated.clear();
+                                componentsToBeReplicated.add(newComponent);
+                                triggerReplication(componentsToBeReplicated, 
false, opType);
+                            }
+                            mergePolicy.diskComponentAdded(lsmIndex, 
fullMergeIsRequested.get());
+                        }
+                        break;
+                    default:
+                        break;
+                }
+                deleteInactiveDiskComponents(opType);
+            } catch (Throwable e) {
+                e.printStackTrace();
+                throw e;
+            } finally {
+                if (failedOperation && (opType == LSMOperationType.MODIFICATION
+                        || opType == LSMOperationType.FORCE_MODIFICATION)) {
+                    //When the operation failed, completeOperation() method 
must be called
+                    //in order to decrement active operation count which was 
incremented in beforeOperation() method.
+                    opTracker.completeOperation(lsmIndex, opType, 
ctx.getSearchOperationCallback(),
+                            ctx.getModificationCallback());
+                } else {
+                    opTracker.afterOperation(lsmIndex, opType, 
ctx.getSearchOperationCallback(),
+                            ctx.getModificationCallback());
                 }
             }
         }
-
     }
 
     @Override
@@ -701,4 +656,36 @@
         }
         throw 
HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL);
     }
+
+    private void deleteInactiveDiskComponents(LSMOperationType opType) throws 
HyracksDataException {
+        final List<ILSMDiskComponent> inactiveDiskComponents = 
lsmIndex.getInactiveDiskComponents();
+        if (inactiveDiskComponents.isEmpty()) {
+            // nothing to delete
+            return;
+        }
+
+        /*
+         * Prepare to cleanup inactive diskComponents which were old merged 
components
+         * and not anymore accessed.
+         */
+        List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = new 
LinkedList<>();
+        for (ILSMDiskComponent inactiveComp : inactiveDiskComponents) {
+            if (inactiveComp.getFileReferenceCount() == 1) {
+                inactiveDiskComponentsToBeDeleted.add(inactiveComp);
+            }
+        }
+
+        // cleanup inactive disk components if any
+        if (!inactiveDiskComponentsToBeDeleted.isEmpty()) {
+            //schedule a replication job to delete these inactive disk 
components from replicas
+            if (replicationEnabled) {
+                lsmIndex.scheduleReplication(null, 
inactiveDiskComponentsToBeDeleted, false,
+                        ReplicationOperation.DELETE, opType);
+            }
+            for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
+                ((AbstractLSMDiskComponent) c).destroy();
+                inactiveDiskComponents.remove(c);
+            }
+        }
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1926
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7cf7c76f0613467ab307eca42f5ac0834a60fa44
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to