Luo Chen has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1761

Change subject: Implemented Disk Components Alignment Based on IDs
......................................................................

Implemented Disk Components Alignment Based on IDs

- Added IDs (based on LSNs) for disk components. When a disk component is
flushed, its initial ID is set to its LSN. When components are merged, the
resulting ID is the [min, max] range covering the IDs of all merged components.
- Changed the correlated merge policy to correlate disk components of
the primary and secondaries using IDs.

Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
A 
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
M 
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
A 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
A 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
16 files changed, 738 insertions(+), 81 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/61/1761/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index cd1d95e..d4a80ea 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -31,9 +31,11 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
 
 public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
 
@@ -57,66 +59,30 @@
         // all such components for which the sum of their sizes exceeds 
MaxMrgCompSz.  Schedule a merge of those components into a new component.
         // 2.  If a merge from 1 doesn't happen, see if the set of candidate 
components for merging exceeds MaxTolCompCnt.  If so, schedule
         // a merge all of the current candidates into a new single component.
-        List<ILSMDiskComponent> immutableComponents = new 
ArrayList<>(index.getImmutableComponents());
-        // Reverse the components order so that we look at components from 
oldest to newest.
-        Collections.reverse(immutableComponents);
 
-        for (ILSMDiskComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+        if (fullMergeIsRequested) {
+            //full merge request is handled by each index separately, since it 
is possible that
+            //when a primary index wants to send full merge requests for all 
secondaries,
+            //one secondary index is being merged and the request cannot be 
scheduled
+            ArrayList<ILSMDiskComponent> immutableComponents = new 
ArrayList<>(index.getImmutableComponents());
+            if (!areComponentsReadableUnwritableState(immutableComponents)) {
                 return;
             }
-        }
-        if (fullMergeIsRequested) {
+
             ILSMIndexAccessor accessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
             accessor.scheduleFullMerge(index.getIOOperationCallback());
             return;
         }
+
         if (!index.isPrimaryIndex()) {
             return;
         }
-        long totalSize = 0;
-        int startIndex = -1;
-        int minNumComponents = Integer.MAX_VALUE;
-
-        Set<ILSMIndex> indexes = 
datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes();
-        for (ILSMIndex lsmIndex : indexes) {
-            minNumComponents = Math.min(minNumComponents, 
lsmIndex.getImmutableComponents().size());
+        ArrayList<ILSMDiskComponent> immutableComponents = new 
ArrayList<>(index.getImmutableComponents());
+        if (!areComponentsReadableUnwritableState(immutableComponents)) {
+            return;
         }
-
-        for (int i = 0; i < minNumComponents; i++) {
-            ILSMComponent c = immutableComponents.get(i);
-            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
-            if (componentSize > maxMergableComponentSize) {
-                startIndex = i;
-                totalSize = 0;
-                continue;
-            }
-            totalSize += componentSize;
-            boolean isLastComponent = i + 1 == minNumComponents ? true : false;
-            if (totalSize > maxMergableComponentSize
-                    || (isLastComponent && i - startIndex >= 
maxToleranceComponentCount)) {
-
-                for (ILSMIndex lsmIndex : indexes) {
-                    List<ILSMDiskComponent> reversedImmutableComponents =
-                            new ArrayList<>(lsmIndex.getImmutableComponents());
-                    // Reverse the components order so that we look at 
components from oldest to newest.
-                    Collections.reverse(reversedImmutableComponents);
-
-                    List<ILSMDiskComponent> mergableComponents = new 
ArrayList<>();
-                    for (int j = startIndex + 1; j <= i; j++) {
-                        
mergableComponents.add(reversedImmutableComponents.get(j));
-                    }
-                    // Reverse the components order back to its original order
-                    Collections.reverse(mergableComponents);
-
-                    ILSMIndexAccessor accessor =
-                            
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
-                    accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), 
mergableComponents);
-                }
-                break;
-            }
-        }
+        scheduleMerge(index);
     }
 
     @Override
@@ -127,9 +93,184 @@
                 
Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT));
     }
 
+    /**
+     * Decides whether merging is lagging, using logic similar to {@link 
PrefixMergePolicy}.
+     *
+     * @throws HyracksDataException
+     */
     @Override
-    public boolean isMergeLagging(ILSMIndex index) {
-        //TODO implement properly according to the merge policy
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException 
{
+        List<ILSMDiskComponent> immutableComponents = 
index.getImmutableComponents();
+        int mergableImmutableComponentCount = 
getMergableImmutableComponentCount(immutableComponents);
+
+        // [case 1]
+        if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+            return false;
+        }
+
+        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+        if (isMergeOngoing) {
+            // [case 2]
+            return true;
+        }
+
+        if (index.isPrimaryIndex()) {
+            // [case 3]
+            // make sure that all components are of READABLE_UNWRITABLE state.
+            if (!areComponentsReadableUnwritableState(immutableComponents)) {
+                throw new IllegalStateException();
+            }
+            // schedule a merge operation
+            boolean isMergeTriggered = scheduleMerge(index);
+            if (!isMergeTriggered) {
+                throw new IllegalStateException();
+            }
+            return true;
+        } else {
+            //[case 4]
+            //if the index is secondary, then ignore the merge request (since 
merges should be coordinated by the primary),
+            //and here we simply treat it as not lagging.
+            return false;
+        }
+    }
+
+    private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException 
{
+        List<ILSMDiskComponent> immutableComponents = new 
ArrayList<>(index.getImmutableComponents());
+        Collections.reverse(immutableComponents);
+
+        long totalSize = 0;
+        int startIndex = -1;
+
+        int numComponents = immutableComponents.size();
+
+        for (int i = 0; i < numComponents; i++) {
+            ILSMComponent c = immutableComponents.get(i);
+            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+            if (componentSize > maxMergableComponentSize || 
((ILSMDiskComponent) c).getComponentID().notFound()) {
+                startIndex = i;
+                totalSize = 0;
+                continue;
+            }
+            totalSize += componentSize;
+            boolean isLastComponent = i + 1 == numComponents ? true : false;
+            if (totalSize > maxMergableComponentSize
+                    || (isLastComponent && i - startIndex >= 
maxToleranceComponentCount)) {
+                //merge disk components from startIndex+1 to i
+                long minID = immutableComponents.get(startIndex + 
1).getComponentID().getMinID();
+                long maxID = 
immutableComponents.get(i).getComponentID().getMaxID();
+                Set<IndexInfo> indexInfos = 
datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
+                triggerScheduledMerge(minID, maxID, getIndexPartition(index, 
indexInfos), indexInfos);
+                return true;
+            }
+        }
         return false;
     }
+
+    /**
+     * Submit merge requests for all disk components within [minID, maxID] of 
all indexes of a given dataset in the given partition
+     * @param minID
+     * @param maxID
+     * @param partition
+     * @param indexInfos
+     * @throws HyracksDataException
+     */
+    private void triggerScheduledMerge(long minID, long maxID, int partition, 
Set<IndexInfo> indexInfos)
+            throws HyracksDataException {
+        for (IndexInfo info : indexInfos) {
+            if (info.getPartition() == partition) {
+                ILSMIndex lsmIndex = info.getIndex();
+
+                List<ILSMDiskComponent> immutableComponents = new 
ArrayList<>(lsmIndex.getImmutableComponents());
+                if (isMergeOngoing(immutableComponents)) {
+                    continue;
+                }
+                List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
+
+                for (ILSMDiskComponent component : immutableComponents) {
+                    ILSMDiskComponentID id = component.getComponentID();
+                    if (id.notFound()) {
+                        //ignore invalid IDs
+                        continue;
+                    }
+                    if (id.getMinID() >= minID && id.getMaxID() <= maxID) {
+                        mergableComponents.add(component);
+                    }
+                    if (id.getMaxID() < minID) {
+                        //disk components are ordered from latest (with 
largest IDs) to oldest (with smallest IDs)
+                        //if the component.maxID < minID, then we can safely 
skip the rest of the disk components in the list
+                        break;
+                    }
+                }
+                ILSMIndexAccessor accessor =
+                        
lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, 
NoOpOperationCallback.INSTANCE);
+                accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), 
mergableComponents);
+            }
+        }
+    }
+
+    /**
+     * This method returns the number of mergable components among the given 
list
+     * of immutable components that are ordered from the latest component to 
older ones. The caller
+     * must ensure that the list is in this order.
+     *
+     * @param immutableComponents
+     * @return the number of mergable component
+     * @throws HyracksDataException
+     */
+    private int getMergableImmutableComponentCount(List<ILSMDiskComponent> 
immutableComponents)
+            throws HyracksDataException {
+        int count = 0;
+        for (ILSMComponent c : immutableComponents) {
+            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+            //stop when the first non-mergable component is found.
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE || 
componentSize > maxMergableComponentSize
+                    || ((ILSMDiskComponent) c).getComponentID().notFound()) {
+                break;
+            }
+            ++count;
+        }
+        return count;
+    }
+
+    /**
+     * This method returns whether there is an ongoing merge operation or not 
by checking
+     * each component state of given components.
+     *
+     * @param immutableComponents
+     * @return true if there is an ongoing merge operation, false otherwise.
+     */
+    private boolean isMergeOngoing(List<ILSMDiskComponent> 
immutableComponents) {
+        int size = immutableComponents.size();
+        for (int i = 0; i < size; i++) {
+            if (immutableComponents.get(i).getState() == 
ComponentState.READABLE_MERGING) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * checks whether all given components are of READABLE_UNWRITABLE state
+     *
+     * @param immutableComponents
+     * @return true if all components are of READABLE_UNWRITABLE state, false 
otherwise.
+     */
+    private boolean 
areComponentsReadableUnwritableState(List<ILSMDiskComponent> 
immutableComponents) {
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
+        for (IndexInfo info : indexInfos) {
+            if (info.getIndex() == index) {
+                return info.getPartition();
+            }
+        }
+        return -1;
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 6a2cc56..71d4a96 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -76,6 +76,17 @@
         return datasetIndexes;
     }
 
+    public synchronized Set<IndexInfo> getDatsetIndexInfos() {
+        Set<IndexInfo> infos = new HashSet<>();
+        for (IndexInfo iInfo : getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                infos.add(iInfo);
+            }
+        }
+
+        return infos;
+    }
+
     @Override
     public int compareTo(DatasetInfo i) {
         // sort by (isOpen, referenceCount, lastAccess) ascending, where true 
< false
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 9529366..51a535a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -93,12 +93,12 @@
     public synchronized void register(String resourcePath, IIndex index) 
throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
+        LocalResource resource = resourceRepository.get(resourcePath);
         DatasetResource datasetResource = datasets.get(did);
         if (datasetResource == null) {
             datasetResource = getDatasetLifecycle(did);
         }
-        datasetResource.register(resourceID, index);
+        datasetResource.register(resource, index);
     }
 
     public int getDIDfromResourcePath(String resourcePath) throws 
HyracksDataException {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index a880ce2..f2f3b93 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -20,9 +20,11 @@
 
 import java.util.Map;
 
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
 
 /**
  * A dataset can be in one of two states { EVICTED , LOADED }.
@@ -41,8 +43,7 @@
     private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
     private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
 
-    public DatasetResource(DatasetInfo datasetInfo,
-            PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+    public DatasetResource(DatasetInfo datasetInfo, 
PrimaryIndexOperationTracker datasetPrimaryOpTracker,
             DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
         this.datasetInfo = datasetInfo;
         this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
@@ -86,7 +87,8 @@
         return (iInfo == null) ? null : iInfo.getIndex();
     }
 
-    public void register(long resourceID, IIndex index) throws 
HyracksDataException {
+    public void register(LocalResource resource, IIndex index) throws 
HyracksDataException {
+        long resourceID = resource.getId();
         if (!datasetInfo.isRegistered()) {
             synchronized (datasetInfo) {
                 if (!datasetInfo.isRegistered()) {
@@ -102,8 +104,8 @@
         if (index == null) {
             throw new HyracksDataException("Attempt to register a null index");
         }
-        datasetInfo.getIndexes().put(resourceID,
-                new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), 
resourceID));
+        datasetInfo.getIndexes().put(resourceID, new IndexInfo((ILSMIndex) 
index, datasetInfo.getDatasetID(),
+                resourceID, ((DatasetLocalResource) 
resource.getResource()).getPartition()));
     }
 
     public DatasetInfo getDatasetInfo() {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 34ccce0..56f15e8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -22,13 +22,15 @@
 
 public class IndexInfo extends Info {
     private final ILSMIndex index;
-    private final long resourceId;
     private final int datasetId;
+    private final long resourceID;
+    private final int partition;
 
-    public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+    public IndexInfo(ILSMIndex index, int datasetId, long resourceID, int 
partition) {
         this.index = index;
         this.datasetId = datasetId;
-        this.resourceId = resourceId;
+        this.resourceID = resourceID;
+        this.partition = partition;
     }
 
     public ILSMIndex getIndex() {
@@ -36,7 +38,11 @@
     }
 
     public long getResourceId() {
-        return resourceId;
+        return resourceID;
+    }
+
+    public int getPartition() {
+        return partition;
     }
 
     public int getDatasetId() {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index f903b65..8f5bad5 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -20,6 +20,8 @@
 package org.apache.asterix.common.ioopcallbacks;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
@@ -28,11 +30,16 @@
 import 
org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentID;
 
 // A single LSMIOOperationCallback per LSM index used to perform actions 
around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements 
ILSMIOOperationCallback {
+    private static final Logger logger = 
Logger.getLogger(AbstractLSMIOOperationCallback.class.getName());
+
     public static final MutableArrayValueReference LSN_KEY = new 
MutableArrayValueReference("LSN".getBytes());
     public static final long INVALID = -1L;
 
@@ -100,6 +107,44 @@
         index.getMetadata().put(LSN_KEY, 
LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
     }
 
+    public ILSMDiskComponentID getComponentID(ILSMDiskComponent component, 
List<ILSMComponent> oldComponents)
+            throws HyracksDataException {
+        if (oldComponents == null) {
+            //a flush operation
+            synchronized (this) {
+                long id = getComponentLSN(oldComponents);
+                if (id == 0) {
+                    logger.log(Level.WARNING, "Flushing a memory component 
without setting the LSN");
+                    id = ILSMDiskComponentID.NOT_FOUND;
+                }
+                return new LSMDiskComponentID(id, id);
+            }
+        } else {
+            long minID = Long.MAX_VALUE;
+            long maxID = Long.MIN_VALUE;
+            for (ILSMComponent oldComponent : oldComponents) {
+                ILSMDiskComponentID oldComponentID = ((ILSMDiskComponent) 
oldComponent).getComponentID();
+                if (oldComponentID.getMinID() < minID) {
+                    minID = oldComponentID.getMinID();
+                }
+                if (oldComponentID.getMaxID() > maxID) {
+                    maxID = oldComponentID.getMaxID();
+                }
+            }
+            return new LSMDiskComponentID(minID, maxID);
+        }
+    }
+
+    public void putComponentIDIntoMetadata(ILSMDiskComponent component, 
List<ILSMComponent> oldComponents)
+            throws HyracksDataException {
+        DiskComponentMetadata metadata = component.getMetadata();
+        ILSMDiskComponentID componentID = getComponentID(component, 
oldComponents);
+        metadata.put(ILSMDiskComponentID.COMPONENT_ID_MIN_KEY,
+                LongPointable.FACTORY.createPointable(componentID.getMinID()));
+        metadata.put(ILSMDiskComponentID.COMPONENT_ID_MAX_KEY,
+                LongPointable.FACTORY.createPointable(componentID.getMaxID()));
+    }
+
     public static long getTreeIndexLSN(ITreeIndex treeIndex) throws 
HyracksDataException {
         LongPointable pointable = new LongPointable();
         IMetadataPageManager metadataPageManager = (IMetadataPageManager) 
treeIndex.getPageManager();
@@ -111,6 +156,7 @@
         mutableLastLSNs[writeIndex] = lastLSN;
     }
 
+
     public void setFirstLSN(long firstLSN) {
         // We make sure that this method is only called on an empty component 
so the first LSN is not set incorrectly
         firstLSNs[writeIndex] = firstLSN;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 173c962..e8934fd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -45,6 +45,7 @@
         if (newComponent != null) {
             LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) 
newComponent;
             putLSNIntoMetadata(btreeComponent, oldComponents);
+            putComponentIDIntoMetadata(newComponent, oldComponents);
             if (opType == LSMOperationType.MERGE) {
                 LongPointable markerLsn = LongPointable.FACTORY
                         
.createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(),
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 6c987d6..e186569 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -36,6 +36,7 @@
         if (newComponent != null) {
             LSMBTreeWithBuddyDiskComponent btreeComponent = 
(LSMBTreeWithBuddyDiskComponent) newComponent;
             putLSNIntoMetadata(btreeComponent, oldComponents);
+            putComponentIDIntoMetadata(newComponent, oldComponents);
         }
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 657d908..1428fe0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -41,6 +41,7 @@
         if (newComponent != null) {
             LSMInvertedIndexDiskComponent invIndexComponent = 
(LSMInvertedIndexDiskComponent) newComponent;
             putLSNIntoMetadata(invIndexComponent, oldComponents);
+            putComponentIDIntoMetadata(invIndexComponent, oldComponents);
         }
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 2dc06f7..55f1c60 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -41,6 +41,7 @@
         if (newComponent != null) {
             LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) 
newComponent;
             putLSNIntoMetadata(rtreeComponent, oldComponents);
+            putComponentIDIntoMetadata(rtreeComponent, oldComponents);
         }
     }
 
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
new file mode 100644
index 0000000..bf8e8f4
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.context;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentID;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import junit.framework.TestCase;
+
+public class CorrelatedPrefixMergePolicyTest extends TestCase {
+
+    // Size reported by every mocked disk component.
+    private static final long DEFAULT_COMPONENT_SIZE = 1L;
+
+    // Value configured for CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE.
+    private static final int MAX_COMPONENT_SIZE = 3;
+
+    // Value configured for CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT.
+    private static final int MAX_COMPONENT_COUNT = 3;
+
+    // Dataset ID shared by the mocked lifecycle manager, the policy and every IndexInfo.
+    private static final int DATASET_ID = 1;
+
+    /**
+     * Primary and secondary index both hold five components with single-point IDs 5..1.
+     * Notifying the policy for the secondary must schedule nothing; notifying it for the
+     * primary must schedule a merge of components 4..1 on both indexes.
+     *
+     * @throws HyracksDataException propagated to the runner so a failure keeps its stack trace
+     */
+    @Test
+    public void testBasic() throws HyracksDataException {
+        List<ILSMDiskComponentID> componentIDs =
+                Arrays.asList(new LSMDiskComponentID(5, 5), new LSMDiskComponentID(4, 4),
+                        new LSMDiskComponentID(3, 3), new LSMDiskComponentID(2, 2), new LSMDiskComponentID(1, 1));
+
+        List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+        IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+        List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+        IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+        ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+        policy.diskComponentAdded(secondary.getIndex(), false);
+        Assert.assertTrue(resultPrimaryIDs.isEmpty());
+        Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+        policy.diskComponentAdded(primary.getIndex(), false);
+
+        List<ILSMDiskComponentID> expectedMergedIDs = Arrays.asList(new LSMDiskComponentID(4, 4),
+                new LSMDiskComponentID(3, 3), new LSMDiskComponentID(2, 2), new LSMDiskComponentID(1, 1));
+        Assert.assertEquals(expectedMergedIDs, resultPrimaryIDs);
+        Assert.assertEquals(expectedMergedIDs, resultSecondaryIDs);
+    }
+
+    /**
+     * Same scenario as the single-point case, but every component ID is an interval
+     * ([40,50], [30,35], [25,29], [20,24], [10,19]). After the primary is notified, both
+     * indexes must have scheduled a merge of the four components [30,35]..[10,19].
+     *
+     * @throws HyracksDataException propagated to the runner so a failure keeps its stack trace
+     */
+    @Test
+    public void testIDIntervals() throws HyracksDataException {
+        List<ILSMDiskComponentID> componentIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                new LSMDiskComponentID(10, 19));
+
+        List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+        IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+        List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+        IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+        ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+        policy.diskComponentAdded(secondary.getIndex(), false);
+        Assert.assertTrue(resultPrimaryIDs.isEmpty());
+        Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+        policy.diskComponentAdded(primary.getIndex(), false);
+
+        List<ILSMDiskComponentID> expectedMergedIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19));
+        Assert.assertEquals(expectedMergedIDs, resultPrimaryIDs);
+        Assert.assertEquals(expectedMergedIDs, resultSecondaryIDs);
+    }
+
+    /**
+     * The secondary index holds only three of the primary's five components
+     * ([30,35], [25,29], [20,24]). After the primary is notified, the primary must merge
+     * [30,35]..[10,19] while the secondary merges just the three components it has.
+     *
+     * @throws HyracksDataException propagated to the runner so a failure keeps its stack trace
+     */
+    @Test
+    public void testSecondaryMissing() throws HyracksDataException {
+        List<ILSMDiskComponentID> primaryComponentIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                new LSMDiskComponentID(10, 19));
+        List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+        IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+
+        List<ILSMDiskComponentID> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24));
+        List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+        IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+
+        ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+
+        policy.diskComponentAdded(secondary.getIndex(), false);
+        Assert.assertTrue(resultPrimaryIDs.isEmpty());
+        Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+        policy.diskComponentAdded(primary.getIndex(), false);
+        Assert.assertEquals(Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19)), resultPrimaryIDs);
+        Assert.assertEquals(Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                new LSMDiskComponentID(20, 24)), resultSecondaryIDs);
+    }
+
+    /**
+     * One primary component (the fourth in the list) carries an ID whose bounds are both
+     * {@link ILSMDiskComponentID#NOT_FOUND}. After the primary is notified, the primary must
+     * merge [40,50], [30,35], [25,29] and the secondary must merge [30,35], [25,29].
+     *
+     * @throws HyracksDataException propagated to the runner so a failure keeps its stack trace
+     */
+    @Test
+    public void testPrimaryNotFound() throws HyracksDataException {
+        List<ILSMDiskComponentID> primaryComponentIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                new LSMDiskComponentID(ILSMDiskComponentID.NOT_FOUND, ILSMDiskComponentID.NOT_FOUND),
+                new LSMDiskComponentID(10, 19));
+        List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+        IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+
+        List<ILSMDiskComponentID> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24));
+        List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+        IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+
+        ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+
+        policy.diskComponentAdded(secondary.getIndex(), false);
+        Assert.assertTrue(resultPrimaryIDs.isEmpty());
+        Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+        policy.diskComponentAdded(primary.getIndex(), false);
+        Assert.assertEquals(Arrays.asList(new LSMDiskComponentID(40, 50), new LSMDiskComponentID(30, 35),
+                new LSMDiskComponentID(25, 29)), resultPrimaryIDs);
+        Assert.assertEquals(Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29)),
+                resultSecondaryIDs);
+    }
+
+    /**
+     * The secondary's middle component carries an ID whose bounds are both
+     * {@link ILSMDiskComponentID#NOT_FOUND}. After the primary is notified, the primary must
+     * merge [30,35]..[10,19] and the secondary must merge only [30,35] and [20,24].
+     *
+     * @throws HyracksDataException propagated to the runner so a failure keeps its stack trace
+     */
+    @Test
+    public void testSecondaryNotFound() throws HyracksDataException {
+        List<ILSMDiskComponentID> primaryComponentIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                new LSMDiskComponentID(10, 19));
+        List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+        IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+
+        List<ILSMDiskComponentID> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                new LSMDiskComponentID(ILSMDiskComponentID.NOT_FOUND, ILSMDiskComponentID.NOT_FOUND),
+                new LSMDiskComponentID(20, 24));
+        List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+        IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+
+        ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+
+        policy.diskComponentAdded(secondary.getIndex(), false);
+        Assert.assertTrue(resultPrimaryIDs.isEmpty());
+        Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+        policy.diskComponentAdded(primary.getIndex(), false);
+        Assert.assertEquals(Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29),
+                new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19)), resultPrimaryIDs);
+        Assert.assertEquals(Arrays.asList(new LSMDiskComponentID(30, 35), new LSMDiskComponentID(20, 24)),
+                resultSecondaryIDs);
+    }
+
+    /**
+     * Adds a second secondary index that lives in partition 1 while the primary and the first
+     * secondary live in partition 0. After the primary is notified, the two partition-0 indexes
+     * must merge [30,35]..[10,19] and the partition-1 secondary must not have been merged.
+     *
+     * @throws HyracksDataException propagated to the runner so a failure keeps its stack trace
+     */
+    @Test
+    public void testMultiPartition() throws HyracksDataException {
+        List<ILSMDiskComponentID> componentIDs = Arrays.asList(new LSMDiskComponentID(40, 50),
+                new LSMDiskComponentID(30, 35), new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24),
+                new LSMDiskComponentID(10, 19));
+
+        List<ILSMDiskComponentID> resultPrimaryIDs = new ArrayList<>();
+        IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+        List<ILSMDiskComponentID> resultSecondaryIDs = new ArrayList<>();
+        IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+        // The partition-1 index must record into its own list; sharing resultSecondaryIDs
+        // would make the final isEmpty() check below pass no matter what the policy does.
+        List<ILSMDiskComponentID> resultSecondaryIDs1 = new ArrayList<>();
+        IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs1, 1);
+
+        ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1);
+        policy.diskComponentAdded(secondary.getIndex(), false);
+        Assert.assertTrue(resultPrimaryIDs.isEmpty());
+        Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+        policy.diskComponentAdded(primary.getIndex(), false);
+
+        List<ILSMDiskComponentID> expectedMergedIDs = Arrays.asList(new LSMDiskComponentID(30, 35),
+                new LSMDiskComponentID(25, 29), new LSMDiskComponentID(20, 24), new LSMDiskComponentID(10, 19));
+        Assert.assertEquals(expectedMergedIDs, resultPrimaryIDs);
+        Assert.assertEquals(expectedMergedIDs, resultSecondaryIDs);
+        Assert.assertTrue(resultSecondaryIDs1.isEmpty());
+    }
+
+    /**
+     * Builds a {@link CorrelatedPrefixMergePolicy} for {@code DATASET_ID} whose lifecycle manager
+     * is mocked to expose exactly the given indexes as the dataset's index infos.
+     *
+     * @param indexes index infos the mocked {@link DatasetInfo} reports
+     * @return the policy, configured with the test's max component count and size
+     */
+    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+        Set<IndexInfo> datasetIndexes = new HashSet<>(Arrays.asList(indexes));
+
+        DatasetInfo datasetInfo = Mockito.mock(DatasetInfo.class);
+        Mockito.when(datasetInfo.getDatsetIndexInfos()).thenReturn(datasetIndexes);
+
+        IDatasetLifecycleManager lifecycleManager = Mockito.mock(IDatasetLifecycleManager.class);
+        Mockito.when(lifecycleManager.getDatasetInfo(DATASET_ID)).thenReturn(datasetInfo);
+
+        Map<String, String> config = new HashMap<>();
+        config.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT));
+        config.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE));
+
+        ILSMMergePolicy mergePolicy = new CorrelatedPrefixMergePolicy(lifecycleManager, DATASET_ID);
+        mergePolicy.configure(config);
+        return mergePolicy;
+    }
+
+    /**
+     * Creates an {@link IndexInfo} around a mocked {@link ILSMIndex} whose immutable components
+     * are mocks reporting the given IDs, {@code DEFAULT_COMPONENT_SIZE} and the
+     * {@code READABLE_UNWRITABLE} state. The mocked accessor records the ID of every component
+     * handed to {@code scheduleMerge} into {@code resultComponentIDs}.
+     *
+     * @param isPrimary value returned by the mocked {@code isPrimaryIndex()}
+     * @param componentIDs IDs of the mocked disk components, in list order
+     * @param resultComponentIDs receives the IDs of the components passed to {@code scheduleMerge}
+     * @param partition partition passed to the {@link IndexInfo} constructor
+     * @return the index info wrapping the mocked index
+     * @throws HyracksDataException declared by the mocked index and component calls
+     */
+    private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentID> componentIDs,
+            List<ILSMDiskComponentID> resultComponentIDs, int partition) throws HyracksDataException {
+        List<ILSMDiskComponent> components = new ArrayList<>();
+        for (ILSMDiskComponentID id : componentIDs) {
+            ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+            Mockito.when(component.getComponentID()).thenReturn(id);
+            Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE);
+            Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
+            components.add(component);
+        }
+
+        ILSMIndex index = Mockito.mock(ILSMIndex.class);
+        Mockito.when(index.getImmutableComponents()).thenReturn(components);
+
+        ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
+        Mockito.doAnswer(new Answer<Void>() {
+
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
+                // A plain loop lets a failing getComponentID() propagate out of the answer
+                // instead of being printed and ignored, which would leave a short result list.
+                for (ILSMDiskComponent component : mergedComponents) {
+                    resultComponentIDs.add(component.getComponentID());
+                }
+                return null;
+            }
+        }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
+                Mockito.anyListOf(ILSMDiskComponent.class));
+
+        Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
+                Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
+        Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+
+        return new IndexInfo(index, DATASET_ID, 0, partition);
+    }
+
+}
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
index 9be5837..0243a63 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
@@ -60,8 +60,8 @@
     public void testMemoryManager() {
         ActiveProperties afp = Mockito.mock(ActiveProperties.class);
         
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                DEFAULT_FRAME_SIZE);
+        ConcurrentFramePool fmm =
+                new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
         int i = 0;
         while (fmm.get() != null) {
             i++;
@@ -75,8 +75,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
             Arrays.parallelSetAll(runners, (int i) -> new 
FixedSizeAllocator(fmm));
@@ -106,8 +106,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             Random random = new Random();
             int i = 0;
             int req;
@@ -141,8 +141,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
 
             VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
@@ -180,8 +180,8 @@
     public void testAcquireReleaseMemoryManager() throws HyracksDataException {
         ActiveProperties afp = Mockito.mock(ActiveProperties.class);
         
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                DEFAULT_FRAME_SIZE);
+        ConcurrentFramePool fmm =
+                new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
         Random random = new Random();
         ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
         while (true) {
@@ -213,8 +213,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             FixedSizeGoodAllocator[] runners = new 
FixedSizeGoodAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
             Arrays.parallelSetAll(runners, (int i) -> new 
FixedSizeGoodAllocator(fmm));
@@ -244,8 +244,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             Random random = new Random();
             ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
             int i = 0;
@@ -297,8 +297,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             VarSizeGoodAllocator[] runners = new 
VarSizeGoodAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
             Arrays.parallelSetAll(runners, (int i) -> new 
VarSizeGoodAllocator(fmm));
@@ -333,8 +333,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             int i = 0;
             ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
             LinkedBlockingDeque<ByteBuffer> buffers = new 
LinkedBlockingDeque<>();
@@ -399,8 +399,8 @@
         try {
             ActiveProperties afp = Mockito.mock(ActiveProperties.class);
             
Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(),
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", 
afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
             int i = 0;
             ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
             LinkedBlockingDeque<ByteBuffer> buffers = new 
LinkedBlockingDeque<>();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 335e84e..c863e5a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -46,4 +46,7 @@
      * @throws HyracksDataException
      */
     void destroy() throws HyracksDataException;
+
+    ILSMDiskComponentID getComponentID() throws HyracksDataException;
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
new file mode 100644
index 0000000..3f6b516
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentID.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+
+/**
+ * Identifier of an LSM disk component, expressed as a minimum and a maximum ID.
+ * Also defines the two metadata keys under which the bounds are stored.
+ */
+public interface ILSMDiskComponentID {
+
+    /** Value used for a bound that is not available. */
+    long NOT_FOUND = -1;
+
+    // The key bytes are encoded with an explicit charset so they do not depend on the
+    // platform default encoding of the JVM that writes or reads the component metadata.
+    MutableArrayValueReference COMPONENT_ID_MIN_KEY =
+            new MutableArrayValueReference("Component_ID_Min".getBytes(StandardCharsets.UTF_8));
+
+    MutableArrayValueReference COMPONENT_ID_MAX_KEY =
+            new MutableArrayValueReference("Component_ID_Max".getBytes(StandardCharsets.UTF_8));
+
+    /**
+     * @return the minimum ID of this component
+     */
+    long getMinID();
+
+    /**
+     * @return the maximum ID of this component
+     */
+    long getMaxID();
+
+    /**
+     * @return {@code true} if either bound equals {@link #NOT_FOUND}
+     */
+    default boolean notFound() {
+        return getMinID() == NOT_FOUND || getMaxID() == NOT_FOUND;
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 508a6cc..9371437 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -18,13 +18,19 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
 
 public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent 
implements ILSMDiskComponent {
+
+    private static final Logger LOGGER = 
Logger.getLogger(AbstractLSMDiskComponent.class.getName());
 
     private final DiskComponentMetadata metadata;
 
@@ -96,4 +102,14 @@
     public DiskComponentMetadata getMetadata() {
         return metadata;
     }
+
+    /**
+     * Builds this component's ID from the minimum and maximum ID entries of its metadata.
+     * {@link ILSMDiskComponentID#NOT_FOUND} is handed to {@code ComponentMetadataUtil.getLong}
+     * as the third argument for both lookups (presumably the fallback for a missing key;
+     * confirm against that helper).
+     *
+     * @return a new {@link LSMDiskComponentID} holding the two values read
+     * @throws HyracksDataException if reading the metadata fails
+     */
+    @Override
+    public ILSMDiskComponentID getComponentID() throws HyracksDataException {
+        final long lowerBound = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentID.COMPONENT_ID_MIN_KEY,
+                ILSMDiskComponentID.NOT_FOUND);
+        final long upperBound = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentID.COMPONENT_ID_MAX_KEY,
+                ILSMDiskComponentID.NOT_FOUND);
+        //TODO: do we need to throw an exception when ID is not found?
+        return new LSMDiskComponentID(lowerBound, upperBound);
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
new file mode 100644
index 0000000..dedff15
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentID.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentID;
+
+/**
+ * Immutable {@link ILSMDiskComponentID} holding a minimum and a maximum ID.
+ * Two instances are equal when they are of the same class and both bounds match.
+ */
+public class LSMDiskComponentID implements ILSMDiskComponentID {
+
+    private final long minID;
+
+    private final long maxID;
+
+    /**
+     * @param minID minimum ID of the component
+     * @param maxID maximum ID of the component
+     */
+    public LSMDiskComponentID(long minID, long maxID) {
+        this.minID = minID;
+        this.maxID = maxID;
+    }
+
+    @Override
+    public long getMinID() {
+        return minID;
+    }
+
+    @Override
+    public long getMaxID() {
+        return maxID;
+    }
+
+    /**
+     * @return the bounds rendered as {@code [min,max]}
+     */
+    @Override
+    public String toString() {
+        return "[" + minID + "," + maxID + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        // Long.hashCode(v) is (int) (v ^ (v >>> 32)); maxID is folded in before minID.
+        int hash = 31 + Long.hashCode(maxID);
+        return 31 * hash + Long.hashCode(minID);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        LSMDiskComponentID that = (LSMDiskComponentID) obj;
+        return maxID == that.maxID && minID == that.minID;
+    }
+
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1761
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <[email protected]>

Reply via email to