From Ritik Raj <[email protected]>:

Ritik Raj has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18367 )


Change subject: WIP: Merging page ranges to reduce the network calls
......................................................................

WIP: Merging page ranges to reduce the network calls

Details:
Merging page ranges to reduce the number of
network calls to object storage, while keeping
the fetching of unwanted pages to a minimum.

Change-Id: I343abcc5d2c2f1e65e8566e2a346e0a66da11d17
---
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
A 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
A 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
4 files changed, 260 insertions(+), 4 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/67/18367/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fb3019f..9ea47c1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -38,6 +38,7 @@
 import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -49,9 +50,12 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import it.unimi.dsi.fastutil.Pair;
+
 @NotThreadSafe
 public final class CloudColumnReadContext implements IColumnReadContext {
     private static final Logger LOGGER = LogManager.getLogger();
+    public static final Integer MAXIMUM_RESULTANT_RANGES = 2;
     private final ColumnProjectorType operation;
     private final IPhysicalDrive drive;
     private final BitSet plan;
@@ -60,6 +64,7 @@
     private final CloudMegaPageReadContext columnCtx;
     private final List<ICachedPage> pinnedPages;
     private final BitSet projectedColumns;
+    private final MergedPageRanges mergedPageRanges;

     public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, 
IPhysicalDrive drive, BitSet plan) {
         this.operation = projectionInfo.getProjectorType();
@@ -70,6 +75,7 @@
         columnCtx = new CloudMegaPageReadContext(operation, columnRanges, 
drive);
         pinnedPages = new ArrayList<>();
         projectedColumns = new BitSet();
+        mergedPageRanges = new MergedPageRanges(MAXIMUM_RESULTANT_RANGES);
         if (operation == QUERY || operation == MODIFY) {
             for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); 
i++) {
                 int columnIndex = projectionInfo.getColumnIndex(i);
@@ -157,6 +163,10 @@
         int[] columnsOrder = columnRanges.getColumnsOrder();
         int i = 0;
         int columnIndex = columnsOrder[i];
+
+        IntArrayList pageRanges = new IntArrayList(40, 40);
+        int rangeIndex = 0;
+        int lastPageIdx = -1;
         while (columnIndex > -1) {
             if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
                 columnIndex = columnsOrder[++i];
@@ -164,8 +174,12 @@
             }

             int firstPageIdx = 
columnRanges.getColumnStartPageIndex(columnIndex);
+            // not the first range
+            if (lastPageIdx != -1) {
+                mergedPageRanges.addGap(firstPageIdx - lastPageIdx, 
rangeIndex);
+            }
             // last page of the column
-            int lastPageIdx = firstPageIdx + 
columnRanges.getColumnNumberOfPages(columnIndex) - 1;
+            lastPageIdx = firstPageIdx + 
columnRanges.getColumnNumberOfPages(columnIndex) - 1;

             // Advance to the next column to check if it has contiguous pages
             columnIndex = columnsOrder[++i];
@@ -191,9 +205,34 @@
                         + columnRanges.getTotalNumberOfPages());
             }

-            int numberOfPages = lastPageIdx - firstPageIdx + 1;
-            columnCtx.prepare(numberOfPages);
-            pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
+            pageRanges.add(firstPageIdx);
+            pageRanges.add(lastPageIdx);
+            rangeIndex += 1;
+        }
+
+        mergedPageRanges.mergeRanges();
+        int totalRanges = rangeIndex + 1;
+        if (totalRanges < MAXIMUM_RESULTANT_RANGES) {
+            // no merge required.
+            for (int pageIndex = 1; pageIndex < pageRanges.size(); pageIndex 
+= 2) {
+                int lastPageIndex = pageRanges.get(pageIndex);
+                int firstPageIndex = pageRanges.get(pageIndex - 1);
+                int numberOfPages = lastPageIndex - firstPageIndex + 1;
+                columnCtx.prepare(numberOfPages);
+                pin(bufferCache, fileId, pageZeroId, firstPageIndex, 
numberOfPages);
+            }
+        } else {
+            // pull out the resultant merged Index
+            int rangeCnt = 0;
+            while (rangeCnt < MAXIMUM_RESULTANT_RANGES) {
+                Pair<Integer, Integer> mergedRange = 
mergedPageRanges.getNextRange();
+                int firstPageIndex = mergedRange.first();
+                int lastPageIndex = mergedRange.second();
+                int numberOfPages = lastPageIndex - firstPageIndex + 1;
+                columnCtx.prepare(numberOfPages);
+                pin(bufferCache, fileId, pageZeroId, firstPageIndex, 
numberOfPages);
+                rangeCnt++;
+            }
         }
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 2679d33..6ced726 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -160,6 +160,12 @@
         ByteBuffer buffer = header.getBuffer();
         buffer.position(0);

+        if (cPage.getCompressedPageOffset() != streamOffset) {
+            streamOffset += cPage.getCompressedPageOffset();
+            remainingStreamBytes -= cPage.getCompressedPageSize();
+            return;
+        }
+
         try {
             while (buffer.remaining() > 0) {
                 int length = stream.read(buffer.array(), buffer.position(), 
buffer.remaining());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
new file mode 100644
index 0000000..f799e7f
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.BitSet;
+
+import it.unimi.dsi.fastutil.Pair;
+import it.unimi.dsi.fastutil.PriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectArrayPriorityQueue;
+
+/*
+Statement:
+    Merge the given ranges such that the maximum number of ranges <= N.
+    Merge should be greedy: ranges separated by smaller gaps are merged first.
+Solution:
+    Since the smaller gaps should be chosen first, we have to
+    sort the gaps and pick the K smallest gaps,
+    where K = T - N,
+    T = total number of ranges,
+    N = maximum number of resultant ranges.
+
+    --> while generating ranges, calculate the gap and put in min-heap
+    --> once that has been done, pull K elements out of the heap
+    --> those ranges should be merged, set the range in bitset.
+    --> have a getNextRange API which, based on the set bits,
+    --> returns the next index range.
+    --> TimeComplexity: O(T * log(T))
+    --> Number of extra passes required = 1 (for merging)
+ */
+public class MergedPageRanges {
+    private final PriorityQueue<Pair<Integer, Integer>> gapRanges;
+    private final int numRequiredRanges;
+    private BitSet mergedIndex;
+    private int currentIndex = 0;
+    private int numRanges;
+
+    MergedPageRanges(int numRequiredRanges) {
+        this.numRequiredRanges = numRequiredRanges;
+        gapRanges = new ObjectArrayPriorityQueue<>(Pair.lexComparator());
+    }
+
+    private void setNumRanges(int numRanges) {
+        this.numRanges = numRanges;
+        mergedIndex = new BitSet(numRanges);
+    }
+
+    public void addGap(int gap, int rangeIndex) {
+        gapRanges.enqueue(Pair.of(gap, rangeIndex));
+    }
+
+    public void mergeRanges() {
+        setNumRanges(gapRanges.size() + 1);
+        // totalMerges = totalRanges - numRequiredRanges
+        int merges = gapRanges.size() + 1 - numRequiredRanges;
+        int count = 0;
+        while (count < merges) {
+            int index = gapRanges.dequeue().second();
+            mergedIndex.set(index - 1, index + 1);
+            count++;
+        }
+    }
+
+    public Pair<Integer, Integer> getNextRange() {
+        int startIndex = currentIndex;
+        int endIndex = currentIndex;
+
+        while (endIndex < numRanges && mergedIndex.get(endIndex)) {
+            endIndex++;
+        }
+
+        if (startIndex == endIndex) {
+            currentIndex = endIndex + 1;
+            return Pair.of(startIndex, endIndex);
+        } else {
+            currentIndex = endIndex;
+            return Pair.of(startIndex, endIndex - 1);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
new file mode 100644
index 0000000..1e17e29
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.Pair;
+
+public class MergedPageRagesTest {
+    private MergedPageRanges mergedPageRanges;
+
+    @Test
+    public void mergePageRanges1() {
+        int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+        int requiredRangeCount = 3;
+        mergedPageRanges = new MergedPageRanges(requiredRangeCount);
+        for (int i = 2; i < pageRanges.length; i += 2) {
+            mergedPageRanges.addGap(pageRanges[i] - pageRanges[i - 1], i / 2);
+        }
+
+        mergedPageRanges.mergeRanges();
+        // since the gaps are in the following order
+        // ( 2, 1, 4, 4 )
+        // since we need 3 ranges, 5 - 3 = 2 merges should be done.
+        // hence the resultant ranges := ( 0, 2 ), ( 3, 3 ), ( 4, 4 )
+        List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 2), 
Pair.of(3, 3), Pair.of(4, 4));
+
+        for (int i = 0; i < requiredRangeCount; i++) {
+            Pair<Integer, Integer> nextRange = mergedPageRanges.getNextRange();
+            Assert.assertEquals(ranges.get(i).first(), nextRange.first());
+            Assert.assertEquals(ranges.get(i).second(), nextRange.second());
+        }
+    }
+
+    @Test
+    public void allMerges() {
+        int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+        int requiredRangeCount = 1;
+        mergedPageRanges = new MergedPageRanges(requiredRangeCount);
+        for (int i = 2; i < pageRanges.length; i += 2) {
+            mergedPageRanges.addGap(pageRanges[i] - pageRanges[i - 1], i / 2);
+        }
+
+        mergedPageRanges.mergeRanges();
+        // since the gaps are in the following order
+        // ( 2, 1, 4, 4 )
+        // since we need 1 range, 5 - 1 = 4 merges should be done.
+        // hence the resultant ranges := ( 0, 4)
+        List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 4));
+
+        for (int i = 0; i < requiredRangeCount; i++) {
+            Pair<Integer, Integer> nextRange = mergedPageRanges.getNextRange();
+            Assert.assertEquals(ranges.get(i).first(), nextRange.first());
+            Assert.assertEquals(ranges.get(i).second(), nextRange.second());
+        }
+    }
+
+    @Test
+    public void noMerges() {
+        int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+        int requiredRangeCount = 8;
+        mergedPageRanges = new MergedPageRanges(requiredRangeCount);
+        for (int i = 2; i < pageRanges.length; i += 2) {
+            mergedPageRanges.addGap(pageRanges[i] - pageRanges[i - 1], i / 2);
+        }
+
+        mergedPageRanges.mergeRanges();
+        // since the gaps are in the following order
+        // ( 2, 1, 4, 4 )
+        // since we need 8 ranges, no merge should be done.
+        List<Pair<Integer, Integer>> ranges =
+                List.of(Pair.of(0, 0), Pair.of(1, 1), Pair.of(2, 2), 
Pair.of(3, 3), Pair.of(4, 4));
+
+        for (int i = 0; i < ranges.size(); i++) {
+            Pair<Integer, Integer> nextRange = mergedPageRanges.getNextRange();
+            Assert.assertEquals(ranges.get(i).first(), nextRange.first());
+            Assert.assertEquals(ranges.get(i).second(), nextRange.second());
+        }
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18367
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I343abcc5d2c2f1e65e8566e2a346e0a66da11d17
Gerrit-Change-Number: 18367
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>
Gerrit-MessageType: newchange

Reply via email to