tkalkirill commented on a change in pull request #602: URL: https://github.com/apache/ignite-3/pull/602#discussion_r798330554
########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/AbstractFreeList.java ########## @@ -0,0 +1,923 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.ignite.internal.pagememory.freelist; + +import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId; +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId; +import static org.apache.ignite.internal.util.IgniteUtils.isPow2; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceArray; +import org.apache.ignite.internal.pagememory.PageIdAllocator; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.Storable; +import org.apache.ignite.internal.pagememory.evict.PageEvictionTracker; +import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.pagememory.reuse.LongListReuseBag; +import org.apache.ignite.internal.pagememory.reuse.ReuseBag; +import org.apache.ignite.internal.pagememory.reuse.ReuseList; +import org.apache.ignite.internal.pagememory.util.PageHandler; +import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.internal.pagememory.util.PageLockListener; +import org.apache.ignite.internal.util.IgniteCursor; +import org.apache.ignite.lang.IgniteInternalCheckedException; +import org.apache.ignite.lang.IgniteLogger; +import org.jetbrains.annotations.Nullable; + +/** + * Abstract free list. + */ +public abstract class AbstractFreeList<T extends Storable> extends PagesList implements FreeList<T>, ReuseList { + private static final int BUCKETS = 256; // Must be power of 2. + + private static final int REUSE_BUCKET = BUCKETS - 1; + + private static final Integer COMPLETE = Integer.MAX_VALUE; + + private static final Integer FAIL_I = Integer.MIN_VALUE; + + private static final Long FAIL_L = Long.MAX_VALUE; + + private static final int MIN_PAGE_FREE_SPACE = 8; + + /** + * Step between buckets in free list, measured in powers of two. For example, for page size 4096 and 256 buckets, shift is 4 and step is + * 16 bytes. + */ + private final int shift; + + /** Buckets. */ + private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS); + + /** Onheap bucket page list caches. */ + private final AtomicReferenceArray<PagesCache> bucketCaches = new AtomicReferenceArray<>(BUCKETS); + + /** Min size for data page in bytes. */ + private final int minSizeForDataPage; + + /** Page list cache limit. */ + @Nullable + private final AtomicLong pageListCacheLimit; + + /** Page eviction tracker. */ + private final PageEvictionTracker evictionTracker; + + private final PageHandler<T, Boolean> updateRow = new UpdateRowHandler(); + + /** Write a single row on a single page. */ + private final WriteRowHandler writeRowHnd = new WriteRowHandler(); + + /** Write multiple rows on a single page. */ + private final WriteRowsHandler writeRowsHnd = new WriteRowsHandler(); + + private final PageHandler<ReuseBag, Long> rmvRow; + + private final class UpdateRowHandler implements PageHandler<T, Boolean> { + /** {@inheritDoc} */ + @Override + public Boolean run( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIo iox, + T row, + int itemId, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox; + + int rowSize = row.size(); + + boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize); + + evictionTracker.touchPage(pageId); + + return updated; + } + } + + private class WriteRowHandler implements PageHandler<T, Integer> { + /** {@inheritDoc} */ + @Override + public Integer run( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIo iox, + T row, + int written, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + written = addRow(pageId, pageAddr, iox, row, written); + + putPage(((AbstractDataPageIo<Storable>) iox).getFreeSpace(pageAddr), pageId, pageAddr, statHolder); + + return written; + } + + /** + * Writes row to data page. + * + * @param pageId Page ID. + * @param pageAddr Page address. + * @param iox IO. + * @param row Row to write. + * @param written Written size. + * @return Number of bytes written, {@link #COMPLETE} if the row was fully written. + * @throws IgniteInternalCheckedException If failed. + */ + protected Integer addRow( + long pageId, + long pageAddr, + PageIo iox, + T row, + int written + ) throws IgniteInternalCheckedException { + AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox; + + int rowSize = row.size(); + int oldFreeSpace = io.getFreeSpace(pageAddr); + + assert oldFreeSpace > 0 : oldFreeSpace; + + // If the full row does not fit into this page write only a fragment. + written = (written == 0 && oldFreeSpace >= rowSize) ? addRowFull(pageId, pageAddr, io, row, rowSize) : + addRowFragment(pageId, pageAddr, io, row, written, rowSize); + + if (written == rowSize) { + evictionTracker.touchPage(pageId); + } + + // Avoid boxing with garbage generation for usual case. + return written == rowSize ? COMPLETE : written; + } + + /** + * Adds row to this data page and sets respective link to the given row object. + * + * @param pageId Page ID. + * @param pageAddr Page address. + * @param io IO. + * @param row Row. + * @param rowSize Row size. + * @return Written size which is always equal to row size here. + * @throws IgniteInternalCheckedException If failed. + */ + protected int addRowFull( + long pageId, + long pageAddr, + AbstractDataPageIo<T> io, + T row, + int rowSize + ) throws IgniteInternalCheckedException { + io.addRow(pageId, pageAddr, row, rowSize, pageSize()); + + return rowSize; + } + + /** + * Adds maximum possible fragment of the given row to this data page and sets respective link to the row. + * + * @param pageId Page ID. + * @param pageAddr Page address. + * @param io IO. + * @param row Row. + * @param written Written size. + * @param rowSize Row size. + * @return Updated written size. + * @throws IgniteInternalCheckedException If failed. + */ + protected int addRowFragment( + long pageId, + long pageAddr, + AbstractDataPageIo<T> io, + T row, + int written, + int rowSize + ) throws IgniteInternalCheckedException { + int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize()); + + assert payloadSize > 0 : payloadSize; + + return written + payloadSize; + } + + /** + * Put page into the free list if needed. + * + * @param freeSpace Page free space. + * @param pageId Page ID. + * @param pageAddr Page address. + * @param statHolder Statistics holder to track IO operations. + */ + protected void putPage( + int freeSpace, + long pageId, + long pageAddr, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + if (freeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(freeSpace, false); + + put(null, pageId, pageAddr, bucket, statHolder); + } + } + } + + private final class WriteRowsHandler implements PageHandler<IgniteCursor<T>, Integer> { + /** {@inheritDoc} */ + @Override + public Integer run( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIo iox, + IgniteCursor<T> cur, + int written, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox; + + // Fill the page up to the end. + while (written != COMPLETE || (!evictionTracker.evictionRequired() && cur.next())) { + T row = cur.get(); + + if (written == COMPLETE) { + // If the data row was completely written without remainder, proceed to the next. + if ((written = writeWholePages(row, statHolder)) == COMPLETE) { + continue; + } + + if (io.getFreeSpace(pageAddr) < row.size() - written) { + break; + } + } + + written = writeRowHnd.addRow(pageId, pageAddr, io, row, written); + + assert written == COMPLETE; + + evictionTracker.touchPage(pageId); + } + + writeRowHnd.putPage(io.getFreeSpace(pageAddr), pageId, pageAddr, statHolder); + + return written; + } + } + + private final class RemoveRowHandler implements PageHandler<ReuseBag, Long> { + /** Indicates whether partition ID should be masked from page ID. */ + private final boolean maskPartId; + + /** + * Constructor. + * + * @param maskPartId Indicates whether partition ID should be masked from page ID. + */ + RemoveRowHandler(boolean maskPartId) { + this.maskPartId = maskPartId; + } + + /** {@inheritDoc} */ + @Override + public Long run( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIo iox, + ReuseBag reuseBag, + int itemId, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + AbstractDataPageIo<T> io = (AbstractDataPageIo<T>) iox; + + int oldFreeSpace = io.getFreeSpace(pageAddr); + + assert oldFreeSpace >= 0 : oldFreeSpace; + + long nextLink = io.removeRow(pageAddr, itemId, pageSize()); + + int newFreeSpace = io.getFreeSpace(pageAddr); + + if (newFreeSpace > MIN_PAGE_FREE_SPACE) { + int newBucket = bucket(newFreeSpace, false); + + boolean putIsNeeded = oldFreeSpace <= MIN_PAGE_FREE_SPACE; + + if (!putIsNeeded) { + int oldBucket = bucket(oldFreeSpace, false); + + if (oldBucket != newBucket) { + // It is possible that page was concurrently taken for put, in this case put will handle bucket change. + pageId = maskPartId ? PageIdUtils.maskPartitionId(pageId) : pageId; + + putIsNeeded = removeDataPage(pageId, pageAddr, io, oldBucket, statHolder); + } + } + + if (io.isEmpty(pageAddr)) { + evictionTracker.forgetPage(pageId); + + if (putIsNeeded) { + reuseBag.addFreePage(recyclePage(pageId, pageAddr)); + } + } else if (putIsNeeded) { + put(null, pageId, pageAddr, newBucket, statHolder); + } + } + + // For common case boxed 0L will be cached inside of Long, so no garbage will be produced. + return nextLink; + } + } + + /** + * Constructor. + * + * @param grpId Group ID. + * @param name Structure name (for debug purpose). + * @param pageMem Page memory. + * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself. + * @param lockLsnr Page lock listener. + * @param defaultPageFlag Default flag value for allocated pages. One of {@link PageIdAllocator#FLAG_DATA} or {@link + * PageIdAllocator#FLAG_AUX}. + * @param log Logger. + * @param metaPageId Metadata page ID. + * @param initNew {@code True} if new metadata should be initialized. + * @param pageListCacheLimit Page list cache limit. + * @param evictionTracker Page eviction tracker. + * @throws IgniteInternalCheckedException If failed. + */ + public AbstractFreeList( + int grpId, + String name, + PageMemory pageMem, + @Nullable ReuseList reuseList, + PageLockListener lockLsnr, + byte defaultPageFlag, + IgniteLogger log, + long metaPageId, + boolean initNew, + @Nullable AtomicLong pageListCacheLimit, + PageEvictionTracker evictionTracker + ) throws IgniteInternalCheckedException { + super( + grpId, + name, + pageMem, + lockLsnr, + defaultPageFlag, + log, + BUCKETS, + metaPageId + ); + + this.evictionTracker = evictionTracker; + this.pageListCacheLimit = pageListCacheLimit; + + this.reuseList = reuseList == null ? this : reuseList; + + rmvRow = new RemoveRowHandler(grpId == 0); + + int pageSize = pageMem.pageSize(); + + assert isPow2(pageSize) : "Page size must be a power of 2: " + pageSize; + assert isPow2(BUCKETS); + assert BUCKETS <= pageSize : pageSize; + + // TODO this constant is used because currently we cannot reuse data pages as index pages Review comment: Added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
