>From Wail Alkowaileet <[email protected]>: Wail Alkowaileet has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18352 )
Change subject: [ASTERIXDB-3421][STO] Multiple fixes in cloud disk caching ...................................................................... [ASTERIXDB-3421][STO] Multiple fixes in cloud disk caching - user model changes: no - storage format changes: no - interface changes: yes Details: - Metadata partition should not be evicted - Buffer cache read context should call onPin(ICachedPage) in a synchronized block on the pinning page - Written pages should contain the compressed page size and offset - Multiple issues when calculating page IDs of the requested columns Change-Id: Ic94cc1e63ee4618b18c6d4c6e3e74101a7753400 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18352 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java D hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java R hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java 21 files changed, 205 insertions(+), 127 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Wail Alkowaileet: Looks good to me, but someone else must approve Jenkins: Verified; Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java index 0f9a4de..3c0f1df 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java @@ -28,6 +28,7 @@ import org.apache.asterix.common.cloud.IPartitionBootstrapper; import org.apache.asterix.common.config.CloudProperties; import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,6 +38,7 @@ import org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService; import org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier; import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread; +import org.apache.hyracks.cloud.cache.service.IEvictableLocalResourceFilter; import org.apache.hyracks.cloud.filesystem.PhysicalDrive; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.io.IOManager; @@ -55,6 +57,8 @@ import org.apache.hyracks.storage.common.file.BufferedFileHandle; public final class CloudConfigurator { + private static final IEvictableLocalResourceFilter FILTER = + (x -> StoragePathUtil.getPartitionNumFromRelativePath(x.getPath()) != StorageConstants.METADATA_PARTITION); private final CloudProperties cloudProperties; private final IOManager localIoManager; private final AbstractCloudIOManager cloudIOManager; @@ -157,7 +161,7 @@ private static IDiskResourceCacheLockNotifier createLockNotifier(boolean diskCacheManagerRequired) { if (diskCacheManagerRequired) { - return new CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION); + return new CloudDiskResourceCacheLockNotifier(FILTER); } return NoOpDiskResourceCacheLockNotifier.INSTANCE; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index f395362..97169db 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -120,7 +120,7 @@ public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profiler.objectGet(); - long readTo = offset + buffer.remaining(); + long readTo = offset + buffer.remaining() - 1; GetObjectRequest rangeGetObjectRequest = GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build(); @@ -163,7 +163,7 @@ public InputStream getObjectStream(String bucket, String path, long offset, long length) { guardian.checkReadAccess(bucket, path); profiler.objectGet(); - long readTo = offset + length; + long readTo = offset + length - 1; GetObjectRequest getReq = GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build(); try { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java index 218015d..d8aecc1 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java @@ -48,6 +48,10 @@ @Override public void doEvict(FileReference directory) throws HyracksDataException { + if (!localIoManager.exists(directory)) { + return; + } + if (!directory.getFile().isDirectory()) { throw new IllegalStateException(directory + " is not a directory"); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 3e0fbd4..43b5d1b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -123,11 +123,11 @@ int did = getDIDfromResourcePath(resourcePath); LocalResource resource = resourceRepository.get(resourcePath); DatasetResource datasetResource = datasets.get(did); + lockNotifier.onRegister(resource, index); if (datasetResource == null) { datasetResource = getDatasetLifecycle(did); } datasetResource.register(resource, (ILSMIndex) index); - lockNotifier.onRegister(resource, index, datasetResource.getIndexInfo(resource.getId()).getPartition()); } private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java index 036a812..fc55c7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java @@ -38,14 +38,14 @@ public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier { private static final Logger LOGGER = LogManager.getLogger(); - private final int metadataPartition; + private final IEvictableLocalResourceFilter filter; private final Long2ObjectMap<LocalResource> inactiveResources; private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes; private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes; private final ReentrantReadWriteLock evictionLock; - public CloudDiskResourceCacheLockNotifier(int metadataPartition) { - this.metadataPartition = metadataPartition; + public CloudDiskResourceCacheLockNotifier(IEvictableLocalResourceFilter filter) { + this.filter = filter; inactiveResources = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>()); unsweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>()); sweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>()); @@ -53,19 +53,19 @@ } @Override - public void onRegister(LocalResource localResource, IIndex index, int partition) { + public void onRegister(LocalResource localResource, IIndex index) { ILSMIndex lsmIndex = (ILSMIndex) index; evictionLock.readLock().lock(); try { - if (partition != metadataPartition) { + if (filter.accept(localResource)) { long resourceId = localResource.getId(); if (lsmIndex.getDiskCacheManager().isSweepable()) { sweepableIndexes.put(resourceId, new SweepableIndexUnit(localResource, lsmIndex)); } else { unsweepableIndexes.put(resourceId, new UnsweepableIndexUnit(localResource)); } + inactiveResources.remove(localResource.getId()); } - inactiveResources.remove(localResource.getId()); } finally { evictionLock.readLock().unlock(); } @@ -75,7 +75,7 @@ public void onUnregister(long resourceId) { evictionLock.readLock().lock(); try { - AbstractIndexUnit indexUnit = getUnit(resourceId); + AbstractIndexUnit indexUnit = removeUnit(resourceId); if (indexUnit != null) { indexUnit.drop(); } else { @@ -86,14 +86,6 @@ } } - private AbstractIndexUnit getUnit(long resourceId) { - AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId); - if (indexUnit == null) { - indexUnit = unsweepableIndexes.get(resourceId); - } - return indexUnit; - } - @Override public void onOpen(long resourceId) { evictionLock.readLock().lock(); @@ -122,7 +114,22 @@ } finally { evictionLock.readLock().unlock(); } + } + private AbstractIndexUnit getUnit(long resourceId) { + AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId); + if (indexUnit == null) { + indexUnit = unsweepableIndexes.get(resourceId); + } + return indexUnit; + } + + private AbstractIndexUnit removeUnit(long resourceId) { + AbstractIndexUnit indexUnit = sweepableIndexes.remove(resourceId); + if (indexUnit == null) { + indexUnit = unsweepableIndexes.remove(resourceId); + } + return indexUnit; } ReentrantReadWriteLock getEvictionLock() { @@ -133,7 +140,8 @@ inactiveResources.clear(); // First check whatever we had already for (LocalResource lr : localResources.values()) { - if (unsweepableIndexes.containsKey(lr.getId()) || sweepableIndexes.containsKey(lr.getId())) { + if (!filter.accept(lr) || unsweepableIndexes.containsKey(lr.getId()) + || sweepableIndexes.containsKey(lr.getId())) { // We already have this resource continue; } diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java index f594e06..e466758 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java @@ -31,7 +31,6 @@ import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit; import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit; import org.apache.hyracks.cloud.io.ICloudIOManager; -import org.apache.hyracks.cloud.sweeper.ISweeper; import org.apache.hyracks.cloud.sweeper.Sweeper; import org.apache.hyracks.storage.common.LocalResource; import org.apache.hyracks.storage.common.buffercache.BufferCache; @@ -48,7 +47,7 @@ private final IPhysicalDrive physicalDrive; private final List<SweepableIndexUnit> indexes; private final ICloudIOManager cloudIOManager; - private final ISweeper sweeper; + private final Sweeper sweeper; private final long inactiveTimeThreshold; public DiskCacheSweeperThread(ExecutorService executorService, long waitTime, @@ -161,7 +160,7 @@ } @CriticalPath - private static void sweepIndexes(ISweeper sweeper, List<SweepableIndexUnit> indexes) { + private static void sweepIndexes(Sweeper sweeper, List<SweepableIndexUnit> indexes) { for (int i = 0; i < indexes.size(); i++) { SweepableIndexUnit index = indexes.get(i); if (!index.isSweeping()) { diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java similarity index 67% rename from hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java rename to hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java index 9067a3f..3712918 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/IEvictableLocalResourceFilter.java @@ -16,20 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.cloud.sweeper; +package org.apache.hyracks.cloud.cache.service; -import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit; -import org.apache.hyracks.storage.common.disk.IPhysicalDrive; +import org.apache.hyracks.storage.common.LocalResource; -/** - * Sweeps an index to relieve the pressure on a local {@link IPhysicalDrive} - */ @FunctionalInterface -public interface ISweeper { +public interface IEvictableLocalResourceFilter { /** - * Sweep an index + * Whether a local resource is evictable * - * @param indexUnit to sweep + * @param resource resource to test + * @return true if it is cacheable, false otherwise */ - void sweep(SweepableIndexUnit indexUnit) throws InterruptedException; + boolean accept(LocalResource resource); } diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java deleted file mode 100644 index ca103ab..0000000 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.cloud.sweeper; - -import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit; - -public final class NoOpSweeper implements ISweeper { - public static final ISweeper INSTANCE = new NoOpSweeper(); - - private NoOpSweeper() { - } - - @Override - public void sweep(SweepableIndexUnit indexUnit) { - // NoOp - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java index 245c957..6e12b79 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java @@ -35,7 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public final class Sweeper implements ISweeper { +public final class Sweeper { private static final Logger LOGGER = LogManager.getLogger(); private static final SweepRequest POISON = new SweepRequest(); private final BlockingQueue<SweepRequest> requests; @@ -55,7 +55,6 @@ } } - @Override public void sweep(SweepableIndexUnit indexUnit) throws InterruptedException { SweepRequest request = freeRequests.take(); request.reset(indexUnit); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java index 9fc60fa..5c6fe09 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java @@ -20,7 +20,8 @@ import static org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY; import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex; -import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages; +import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset; +import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages; import java.util.BitSet; @@ -171,15 +172,18 @@ } /** - * Length of a column in pages + * The number of pages the column occupies * * @param columnIndex column index * @return number of pages */ public int getColumnNumberOfPages(int columnIndex) { int pageSize = leafFrame.getBuffer().capacity(); - int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize); - return numberOfPages == 0 ? 1 : numberOfPages; + int offset = getColumnStartOffset(leafFrame.getColumnOffset(columnIndex), pageSize); + int firstBufferLength = pageSize - offset; + int remainingLength = getColumnLength(columnIndex) - firstBufferLength; + // 1 for the first page + the number of remaining pages + return 1 + getNumberOfRemainingPages(remainingLength, pageSize); } /** @@ -231,6 +235,10 @@ return columnsOrder; } + public int getTotalNumberOfPages() { + return leafFrame.getMegaLeafNodeNumberOfPages(); + } + private void init() { int numberOfColumns = leafFrame.getNumberOfColumns(); offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0); @@ -312,5 +320,4 @@ } builder.append('\n'); } - } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java index 2d6b2fd..7623698 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read; import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage; +import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE; import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY; import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY; @@ -45,9 +46,12 @@ import org.apache.hyracks.storage.common.disk.IPhysicalDrive; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.util.annotations.NotThreadSafe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; @NotThreadSafe public final class CloudColumnReadContext implements IColumnReadContext { + private static final Logger LOGGER = LogManager.getLogger(); private final ColumnProjectorType operation; private final IPhysicalDrive drive; private final BitSet plan; @@ -130,10 +134,26 @@ return; } - // TODO What if every other page is requested. That would do N/2 request, where N is the number of pages. - // TODO This should be optimized in a way that minimizes the number of requests columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns); int pageZeroId = leafFrame.getPageId(); + + if (operation == MERGE) { + pinAll(fileId, pageZeroId, leafFrame.getMegaLeafNodeNumberOfPages() - 1, bufferCache); + } else { + pinProjected(fileId, pageZeroId, bufferCache); + } + } + + private void pinAll(int fileId, int pageZeroId, int numberOfPages, IBufferCache bufferCache) + throws HyracksDataException { + columnCtx.prepare(numberOfPages); + pin(bufferCache, fileId, pageZeroId, 1, numberOfPages); + } + + private void pinProjected(int fileId, int pageZeroId, IBufferCache bufferCache) throws HyracksDataException { + // TODO What if every other page is requested. That would do N/2 request, where N is the number of pages. + // TODO This should be optimized in a way that minimizes the number of requests + int[] columnsOrders = columnRanges.getColumnsOrder(); int i = 0; int columnIndex = columnsOrders[i]; @@ -143,38 +163,52 @@ continue; } - int startPageId = columnRanges.getColumnStartPageIndex(columnIndex); - // Will increment the number pages if the next column's pages are contiguous to this column's pages - int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex); + int firstPageIdx = columnRanges.getColumnStartPageIndex(columnIndex); + // last page of the column + int lastPageIdx = firstPageIdx + columnRanges.getColumnNumberOfPages(columnIndex) - 1; // Advance to the next column to check if it has contiguous pages columnIndex = columnsOrders[++i]; while (columnIndex > -1) { + int sharedPageCount = 0; // Get the next column's start page ID - int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex); - if (nextStartPageId > startPageId + numberOfPages + 1) { - // The next startPageId is not contiguous, stop. + int nextStartPageIdx = columnRanges.getColumnStartPageIndex(columnIndex); + if (nextStartPageIdx > lastPageIdx + 1) { + // The nextStartPageIdx is not contiguous, stop. break; + } else if (nextStartPageIdx == lastPageIdx) { + // A shared page + sharedPageCount = 1; } - // Last page of this column - int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex); - // The next column's pages are contiguous. Combine its ranges with the previous one. - numberOfPages = nextLastPage - startPageId; + lastPageIdx += columnRanges.getColumnNumberOfPages(columnIndex) - sharedPageCount; // Advance to the next column columnIndex = columnsOrders[++i]; } + if (lastPageIdx >= columnRanges.getTotalNumberOfPages()) { + throw new IndexOutOfBoundsException("lastPageIdx=" + lastPageIdx + ">=" + "megaLeafNodePages=" + + columnRanges.getTotalNumberOfPages()); + } + + int numberOfPages = lastPageIdx - firstPageIdx + 1; columnCtx.prepare(numberOfPages); - pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages); + pin(bufferCache, fileId, pageZeroId, firstPageIdx, numberOfPages); } } - private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages) + private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numberOfPages) throws HyracksDataException { - for (int i = start; i < start + numOfRequestedPages; i++) { + for (int i = start; i < start + numberOfPages; i++) { long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i); - pinnedPages.add(bufferCache.pin(dpid, columnCtx)); + try { + pinnedPages.add(bufferCache.pin(dpid, columnCtx)); + } catch (Throwable e) { + LOGGER.error("Error while pinning page number {} with number of pages {}. {}\n columnRanges:\n {}", i, + numberOfPages, columnCtx, columnRanges); + throw e; + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java index f1b2fd4..21d5ce7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java @@ -19,7 +19,6 @@ package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read; import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE; -import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES; import static org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT; import java.io.IOException; @@ -49,10 +48,15 @@ private final ColumnProjectorType operation; private final ColumnRanges columnRanges; private final IPhysicalDrive drive; + private int numberOfContiguousPages; private int pageCounter; private InputStream gapStream; + // For debugging + private long streamOffset; + private long remainingStreamBytes; + CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) { this.operation = operation; this.columnRanges = columnRanges; @@ -76,7 +80,7 @@ * up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes * for this particular page to avoid placing the bytes of this page into another page's position. */ - skipCloudBytes(cachedPage); + skipStreamIfOpened(cachedPage); pageCounter++; } } @@ -102,8 +106,7 @@ boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header); int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId()); boolean cloudOnly = columnRanges.isCloudOnly(pageId); - ByteBuffer buffer; - if (empty || cloudOnly || gapStream != null) { + if (empty || cloudOnly) { boolean evictable = columnRanges.isEvictable(pageId); /* * Persist iff the following conditions are satisfied: @@ -118,26 +121,30 @@ * 'cloudOnly' is true. */ boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace(); - buffer = readFromStream(ioManager, fileHandle, header, cPage, persist); - buffer.position(RESERVED_HEADER_BYTES); + readFromStream(ioManager, fileHandle, header, cPage, persist); } else { /* - * Here we can find a page that is planned for eviction, but it has not being evicted yet - * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't - * reached yet (i.e., cloudOnly = false). + * Here we can find a page that is planned for eviction, but it has not being evicted yet + * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't + * reached yet (i.e., cloudOnly = false). Thus, whatever is read from the disk is valid. */ - buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage); + skipStreamIfOpened(cPage); } if (++pageCounter == numberOfContiguousPages) { close(); } - return buffer; + // Finally process the header + return DEFAULT.processHeader(ioManager, fileHandle, header, cPage); } void close() throws HyracksDataException { if (gapStream != null) { + if (remainingStreamBytes != 0) { + LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remainingStreamBytes); + } + try { gapStream.close(); gapStream = null; @@ -147,13 +154,14 @@ } } - private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, - BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException { + private void readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header, + CachedPage cPage, boolean persist) throws HyracksDataException { InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage); ByteBuffer buffer = header.getBuffer(); buffer.position(0); + try { - while (buffer.remaining() != 0) { + while (buffer.remaining() > 0) { int length = stream.read(buffer.array(), buffer.position(), buffer.remaining()); if (length < 0) { throw new IllegalStateException("Stream should not be empty!"); @@ -164,6 +172,7 @@ throw HyracksDataException.create(e); } + // Flip the buffer after reading to restore the correct position buffer.flip(); if (persist) { @@ -172,7 +181,8 @@ BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset); } - return buffer; + streamOffset += cPage.getCompressedPageSize(); + remainingStreamBytes -= cPage.getCompressedPageSize(); } private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage) @@ -185,25 +195,31 @@ long offset = cPage.getCompressedPageOffset(); int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId()); long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages); + remainingStreamBytes = length; + streamOffset = offset; + LOGGER.info( + "Cloud stream read for pageId={} starting from pageCounter={} out of " + + "numberOfContiguousPages={} (streamOffset = {}, remainingStreamBytes = {})", + pageId, pageCounter, numberOfContiguousPages, streamOffset, remainingStreamBytes); - LOGGER.info("Cloud stream read for {} pages [{}, {}]", numberOfContiguousPages - pageCounter, pageId, - pageId + requiredNumOfPages); ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager; gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length); return gapStream; } - private void skipCloudBytes(CloudCachedPage cachedPage) throws HyracksDataException { + private void skipStreamIfOpened(CachedPage cPage) throws HyracksDataException { if (gapStream == null) { return; } try { - long remaining = cachedPage.getCompressedPageSize(); + long remaining = cPage.getCompressedPageSize(); while (remaining > 0) { remaining -= gapStream.skip(remaining); } + streamOffset += cPage.getCompressedPageSize(); + remainingStreamBytes -= cPage.getCompressedPageSize(); } catch (IOException e) { throw HyracksDataException.create(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java index 48633e7..d7b0764 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java @@ -19,7 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples; import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnStartOffset; -import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages; +import static org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfRemainingPages; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -40,7 +40,7 @@ private final IColumnReadMultiPageOp multiPageOp; private final Queue<ICachedPage> pages; private final LongSet pinnedPages; - private int numberOfPages; + private int numberOfRemainingPages; private int startPage; private int startOffset; private int length; @@ -55,7 +55,7 @@ @Override public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException { if (columnIndex >= frame.getNumberOfColumns()) { - numberOfPages = 0; + numberOfRemainingPages = 0; length = 0; return; } @@ -70,8 +70,8 @@ length = ColumnUtil.readColumnLength(firstPage, startOffset, pageSize); // Get the remaining length of the column int remainingLength = length - firstPage.remaining(); - // Get the number of pages this column occupies - numberOfPages = getNumberOfPages(remainingLength, pageSize); + // Get the number of remaining pages this column occupies + numberOfRemainingPages = getNumberOfRemainingPages(remainingLength, pageSize); //+4-bytes after reading the length startOffset += Integer.BYTES; //-4-bytes after reading the length @@ -84,12 +84,12 @@ buffer.clear(); buffer.position(startOffset); buffers.add(buffer); - for (int i = 0; i < numberOfPages; i++) { + for (int i = 0; i < numberOfRemainingPages; i++) { buffer = readNext().duplicate(); buffer.clear(); buffers.add(buffer); } - numberOfPages = 0; + numberOfRemainingPages = 0; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java index 12bb64e..fc1e460 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java @@ -87,7 +87,7 @@ // Read the length of this column int length = firstPage.getInt(); // Ensure the page limit to at most a full page - firstPage.limit(Math.min(length, pageSize)); + firstPage.limit(Math.min(startOffset + length, pageSize)); return length; } @@ -98,7 +98,7 @@ * @param pageSize disk buffer cache page size * @return number of pages the column occupies */ - public static int getNumberOfPages(int remainingLength, int pageSize) { + public static int getNumberOfRemainingPages(int remainingLength, int pageSize) { return (int) Math.ceil((double) remainingLength / pageSize); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java index 21cefb6..a0ad045 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java @@ -321,4 +321,9 @@ final String path = fileHandle.getFileReference().getAbsolutePath(); throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path)); } + + @Override + public String toString() { + return fileHandle != null ? fileHandle.getFileReference().getAbsolutePath() : ""; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index ab37532..271afad 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -203,12 +203,11 @@ } } - // Notify context page is going to be pinned - context.onPin(cPage); - // Resolve race of multiple threads trying to read the page from // disk. synchronized (cPage) { + // Notify context page is going to be pinned + context.onPin(cPage); if (!cPage.valid) { try { tryRead(cPage, context); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index 6fd18ff..b9d4e5a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hyracks.storage.common.file.BufferedFileHandle; + /** * @author yingyib */ @@ -209,4 +211,10 @@ public int getCompressedPageSize() { return compressedSize; } + + @Override + public String toString() { + return "CachedPage:[page:" + BufferedFileHandle.getPageId(dpid) + ", compressedPageOffset:" + compressedOffset + + ", compressedSize:" + compressedSize + "]"; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java index 1e986b3..6ce985e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java @@ -24,7 +24,6 @@ /** * A proxy to notify a disk-cache (a faster disk that is caching a slower resource) about resource lifecycle events. * The notifier could block a resource from being operated on if the disk-cache manager denying access to a resource - * TODO Do we care about dataset? */ public interface IDiskResourceCacheLockNotifier { /** @@ -33,9 +32,8 @@ * * @param localResource resource to be registered * @param index of the resource - * @param partition partition */ - void onRegister(LocalResource localResource, IIndex index, int partition); + void onRegister(LocalResource localResource, IIndex index); /** * Notify unregistering an existing resource diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java index b83c388..2c590cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java @@ -28,7 +28,7 @@ } @Override - public void onRegister(LocalResource localResource, IIndex index, int partition) { + public void onRegister(LocalResource localResource, IIndex index) { // NoOp } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java index 970a0ae..ff2bd83 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java @@ -114,11 +114,13 @@ final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId; IFileHandle handle = getFileHandle(); long bytesWritten; + long offset; try { buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize()); buf.position(0); ByteBuffer[] buffers = header.prepareWrite(cPage); - bytesWritten = context.write(ioManager, handle, getFirstPageOffset(cPage), buffers); + offset = getFirstPageOffset(cPage); + bytesWritten = context.write(ioManager, handle, offset, buffers); } finally { returnHeaderHelper(header); } @@ -130,6 +132,9 @@ final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1); verifyBytesWritten(expectedWritten, bytesWritten); + + cPage.setCompressedPageOffset(offset); + cPage.setCompressedPageSize((int) bytesWritten); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java index 6fe3846..cd882b5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java @@ -114,15 +114,16 @@ } else { uBuffer.position(0); } + long offset; if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) { cBuffer.position(0); - final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining()); + offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining()); expectedBytesWritten = cBuffer.limit(); bytesWritten = context.write(ioManager, handle, offset, cBuffer); } else { //Compression did not gain any savings final ByteBuffer[] buffers = header.prepareWrite(cPage); - final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader()); + offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader()); expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit(); bytesWritten = context.write(ioManager, handle, offset, buffers); } @@ -134,6 +135,9 @@ writeExtraCompressedPages(cPage, cBuffer, totalPages, extraBlockPageId); } + cPage.setCompressedPageOffset(offset); + cPage.setCompressedPageSize((int) bytesWritten); + } finally { returnHeaderHelper(header); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18352 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ic94cc1e63ee4618b18c6d4c6e3e74101a7753400 Gerrit-Change-Number: 18352 Gerrit-PatchSet: 2 Gerrit-Owner: Wail Alkowaileet <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
