>From Ritik Raj <[email protected]>:
Ritik Raj has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18367 )
Change subject: WIP: Merging pages ranges to reduce the network call
......................................................................
WIP: Merging pages ranges to reduce the network call
Details:
Merging page ranges to reduce the number of
network calls to object storage, with an aim
to reduce the fetching of unwanted pages.
Change-Id: I343abcc5d2c2f1e65e8566e2a346e0a66da11d17
---
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
A
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
4 files changed, 260 insertions(+), 4 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/67/18367/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fb3019f..9ea47c1 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -38,6 +38,7 @@
import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -49,9 +50,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.Pair;
+
@NotThreadSafe
public final class CloudColumnReadContext implements IColumnReadContext {
private static final Logger LOGGER = LogManager.getLogger();
+ public static final Integer MAXIMUM_RESULTANT_RANGES = 2;
private final ColumnProjectorType operation;
private final IPhysicalDrive drive;
private final BitSet plan;
@@ -60,6 +64,7 @@
private final CloudMegaPageReadContext columnCtx;
private final List<ICachedPage> pinnedPages;
private final BitSet projectedColumns;
+ private final MergedPageRanges mergedPageRanges;
public CloudColumnReadContext(IColumnProjectionInfo projectionInfo,
IPhysicalDrive drive, BitSet plan) {
this.operation = projectionInfo.getProjectorType();
@@ -70,6 +75,7 @@
columnCtx = new CloudMegaPageReadContext(operation, columnRanges,
drive);
pinnedPages = new ArrayList<>();
projectedColumns = new BitSet();
+ mergedPageRanges = new MergedPageRanges(MAXIMUM_RESULTANT_RANGES);
if (operation == QUERY || operation == MODIFY) {
for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns();
i++) {
int columnIndex = projectionInfo.getColumnIndex(i);
@@ -157,6 +163,10 @@
int[] columnsOrder = columnRanges.getColumnsOrder();
int i = 0;
int columnIndex = columnsOrder[i];
+
+ IntArrayList pageRanges = new IntArrayList(40, 40);
+ int rangeIndex = 0;
+ int lastPageIdx = -1;
while (columnIndex > -1) {
if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
columnIndex = columnsOrder[++i];
@@ -164,8 +174,12 @@
}
int firstPageIdx =
columnRanges.getColumnStartPageIndex(columnIndex);
+ // not the first range
+ if (lastPageIdx != -1) {
+ mergedPageRanges.addGap(firstPageIdx - lastPageIdx,
rangeIndex);
+ }
// last page of the column
- int lastPageIdx = firstPageIdx +
columnRanges.getColumnNumberOfPages(columnIndex) - 1;
+ lastPageIdx = firstPageIdx +
columnRanges.getColumnNumberOfPages(columnIndex) - 1;
// Advance to the next column to check if it has contiguous pages
columnIndex = columnsOrder[++i];
@@ -191,9 +205,34 @@
+ columnRanges.getTotalNumberOfPages());
}
- int numberOfPages = lastPageIdx - firstPageIdx + 1;
- columnCtx.prepare(numberOfPages);
- pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages);
+ pageRanges.add(firstPageIdx);
+ pageRanges.add(lastPageIdx);
+ rangeIndex += 1;
+ }
+
+ mergedPageRanges.mergeRanges();
+ int totalRanges = rangeIndex + 1;
+ if (totalRanges < MAXIMUM_RESULTANT_RANGES) {
+ // no merge required.
+ for (int pageIndex = 1; pageIndex < pageRanges.size(); pageIndex
+= 2) {
+ int lastPageIndex = pageRanges.get(pageIndex);
+ int firstPageIndex = pageRanges.get(pageIndex - 1);
+ int numberOfPages = lastPageIndex - firstPageIndex + 1;
+ columnCtx.prepare(numberOfPages);
+ pin(bufferCache, fileId, pageZeroId, firstPageIndex,
numberOfPages);
+ }
+ } else {
+ // pull out the resultant merged Index
+ int rangeCnt = 0;
+ while (rangeCnt < MAXIMUM_RESULTANT_RANGES) {
+ Pair<Integer, Integer> mergedRange =
mergedPageRanges.getNextRange();
+ int firstPageIndex = mergedRange.first();
+ int lastPageIndex = mergedRange.second();
+ int numberOfPages = lastPageIndex - firstPageIndex + 1;
+ columnCtx.prepare(numberOfPages);
+ pin(bufferCache, fileId, pageZeroId, firstPageIndex,
numberOfPages);
+ rangeCnt++;
+ }
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 2679d33..6ced726 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -160,6 +160,12 @@
ByteBuffer buffer = header.getBuffer();
buffer.position(0);
+ if (cPage.getCompressedPageOffset() != streamOffset) {
+ streamOffset += cPage.getCompressedPageOffset();
+ remainingStreamBytes -= cPage.getCompressedPageSize();
+ return;
+ }
+
try {
while (buffer.remaining() > 0) {
int length = stream.read(buffer.array(), buffer.position(),
buffer.remaining());
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
new file mode 100644
index 0000000..f799e7f
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.BitSet;
+
+import it.unimi.dsi.fastutil.Pair;
+import it.unimi.dsi.fastutil.PriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectArrayPriorityQueue;
+
+/*
+Statement:
+ Merge the given ranges so that at most N ranges remain.
+ Merging is greedy: ranges separated by smaller gaps are merged first.
+Solution:
+ Since the smallest gaps must be chosen first, the gaps are kept
+ in sorted order and the K smallest gaps are picked,
+ where K = T - N,
+ T = total number of ranges,
+ N = maximum number of resultant ranges.
+
+ --> while generating the ranges, compute each gap and add it to a min-heap
+ --> once all gaps are added, pull K elements out of the heap
+ --> the ranges around each pulled gap are merged by setting their bits in a bitset
+ --> getNextRange() walks the set bits
+ --> and returns the next range of indices.
+ --> Time complexity: O(T * log(T))
+ --> Number of extra passes required = 1 (for merging)
+ */
+public class MergedPageRanges {
+ private final PriorityQueue<Pair<Integer, Integer>> gapRanges;
+ private final int numRequiredRanges;
+ private BitSet mergedIndex;
+ private int currentIndex = 0;
+ private int numRanges;
+
+ MergedPageRanges(int numRequiredRanges) {
+ this.numRequiredRanges = numRequiredRanges;
+ gapRanges = new ObjectArrayPriorityQueue<>(Pair.lexComparator());
+ }
+
+ private void setNumRanges(int numRanges) {
+ this.numRanges = numRanges;
+ mergedIndex = new BitSet(numRanges);
+ }
+
+ public void addGap(int gap, int rangeIndex) {
+ gapRanges.enqueue(Pair.of(gap, rangeIndex));
+ }
+
+ public void mergeRanges() {
+ setNumRanges(gapRanges.size() + 1);
+ // totalMerges = totalRanges - MAXIMUM_RESULTANT_RANGES
+ int merges = gapRanges.size() + 1 - numRequiredRanges;
+ int count = 0;
+ while (count < merges) {
+ int index = gapRanges.dequeue().second();
+ mergedIndex.set(index - 1, index + 1);
+ count++;
+ }
+ }
+
+ public Pair<Integer, Integer> getNextRange() {
+ int startIndex = currentIndex;
+ int endIndex = currentIndex;
+
+ while (endIndex < numRanges && mergedIndex.get(endIndex)) {
+ endIndex++;
+ }
+
+ if (startIndex == endIndex) {
+ currentIndex = endIndex + 1;
+ return Pair.of(startIndex, endIndex);
+ } else {
+ currentIndex = endIndex;
+ return Pair.of(startIndex, endIndex - 1);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
new file mode 100644
index 0000000..1e17e29
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.Pair;
+
+public class MergedPageRagesTest {
+ private MergedPageRanges mergedPageRanges;
+
+ @Test
+ public void mergePageRanges1() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ int requiredRangeCount = 3;
+ mergedPageRanges = new MergedPageRanges(requiredRangeCount);
+ for (int i = 2; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addGap(pageRanges[i] - pageRanges[i - 1], i / 2);
+ }
+
+ mergedPageRanges.mergeRanges();
+ // the gaps between consecutive ranges are, in order,
+ // ( 2, 1, 4, 4 )
+ // since we need 3 ranges, 5 - 3 = 2 merges should be done.
+ // hence the resultant ranges := ( 0, 2 ), ( 3, 3 ), ( 4, 4 )
+ List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 2),
Pair.of(3, 3), Pair.of(4, 4));
+
+ for (int i = 0; i < requiredRangeCount; i++) {
+ Pair<Integer, Integer> nextRange = mergedPageRanges.getNextRange();
+ Assert.assertEquals(ranges.get(i).first(), nextRange.first());
+ Assert.assertEquals(ranges.get(i).second(), nextRange.second());
+ }
+ }
+
+ @Test
+ public void allMerges() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ int requiredRangeCount = 1;
+ mergedPageRanges = new MergedPageRanges(requiredRangeCount);
+ for (int i = 2; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addGap(pageRanges[i] - pageRanges[i - 1], i / 2);
+ }
+
+ mergedPageRanges.mergeRanges();
+ // the gaps between consecutive ranges are, in order,
+ // ( 2, 1, 4, 4 )
+ // since we need 1 range, 5 - 1 = 4 merges should be done.
+ // hence the resultant range := ( 0, 4 )
+ List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 4));
+
+ for (int i = 0; i < requiredRangeCount; i++) {
+ Pair<Integer, Integer> nextRange = mergedPageRanges.getNextRange();
+ Assert.assertEquals(ranges.get(i).first(), nextRange.first());
+ Assert.assertEquals(ranges.get(i).second(), nextRange.second());
+ }
+ }
+
+ @Test
+ public void noMerges() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ int requiredRangeCount = 8;
+ mergedPageRanges = new MergedPageRanges(requiredRangeCount);
+ for (int i = 2; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addGap(pageRanges[i] - pageRanges[i - 1], i / 2);
+ }
+
+ mergedPageRanges.mergeRanges();
+ // the gaps between consecutive ranges are, in order,
+ // ( 2, 1, 4, 4 )
+ // since up to 8 ranges are allowed and only 5 exist, no merge should be done.
+ List<Pair<Integer, Integer>> ranges =
+ List.of(Pair.of(0, 0), Pair.of(1, 1), Pair.of(2, 2),
Pair.of(3, 3), Pair.of(4, 4));
+
+ for (int i = 0; i < ranges.size(); i++) {
+ Pair<Integer, Integer> nextRange = mergedPageRanges.getNextRange();
+ Assert.assertEquals(ranges.get(i).first(), nextRange.first());
+ Assert.assertEquals(ranges.get(i).second(), nextRange.second());
+ }
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18367
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I343abcc5d2c2f1e65e8566e2a346e0a66da11d17
Gerrit-Change-Number: 18367
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>
Gerrit-MessageType: newchange