anton-vinogradov commented on a change in pull request #8767: URL: https://github.com/apache/ignite/pull/8767#discussion_r584524322
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AtomicBitSet.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; Review comment: Wrong package for this class. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1113,6 +1136,59 @@ private static String snapshotMetaFileName(String consId) { return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT; } + /** + * @param snpName Snapshot name. + * @param grpName Cache group name. + * @param partId Partition id. + * @return Iterator over partition. + */ + public GridCloseableIterator<CacheDataRow> partitionRows(String snpName, + String consId, Review comment: Description missed at Javadoc. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1113,6 +1136,59 @@ private static String snapshotMetaFileName(String consId) { return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT; } + /** + * @param snpName Snapshot name. + * @param grpName Cache group name. + * @param partId Partition id. + * @return Iterator over partition. + */ + public GridCloseableIterator<CacheDataRow> partitionRows(String snpName, + String consId, + String grpName, + int partId + ) throws IgniteCheckedException { + File snpDir = snapshotLocalDir(snpName); + + if (!snpDir.exists()) + throw new IgniteCheckedException("Snapshot directory doesn't exists: " + snpDir.getAbsolutePath()); + + File nodePath = new File(snpDir, databaseRelativePath(U.maskForFileName(consId))); + + if (!nodePath.exists()) + throw new IgniteCheckedException("Consistent id directory doesn't exists: " + nodePath.getAbsolutePath()); + + File[] grps = nodePath.listFiles(path -> path.isDirectory() && Review comment: Should this lookup have explicitly one result? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. 
*/ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. Review comment: Looks like the comment is not related to the 'store' field. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java ########## @@ -371,7 +371,7 @@ private void setDirectCount(long pageAddr, int cnt) { * @param pageAddr Page address. * @return Direct count. */ - private int getDirectCount(long pageAddr) { + public int getDirectCount(long pageAddr) { Review comment: It is not a good idea to make something public for tests only ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. 
+ * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. + this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + skipPages + ']'); + + return rows.poll(); + } + + /** {@inheritDoc */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + return false; + + try { + while (rows.isEmpty() && ++currIdx < 2 * pages) { + int pageIdx = currIdx % pages; + boolean firstScan = currIdx < pages; + + if (skipPages.check(pageIdx)) + continue; + + locBuff.clear(); + + long pageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, pageIdx); + boolean success = store.read(pageId, locBuff, true); + + assert success : PageIdUtils.toDetailString(pageId); + + // Skip not data pages. + if (firstScan && PageIO.getType(locBuff) != T_DATA) { + skipPages.touch(pageIdx); + + continue; + } + + long pageAddr = GridUnsafe.bufferAddress(locBuff); + + DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(locBuff)); + int freeSpace = io.getFreeSpace(pageAddr); + int rowsCnt = io.getDirectCount(pageAddr); + + if (firstScan && rowsCnt == 0) { + skipPages.touch(pageIdx); + + continue; + } + + // For pages which contains only the incomplete fragment of a data row + // the rowsCnt will always be equal to 1. Skip such pages and read them + // on the second iteration. + if (firstScan && freeSpace == 0 && rowsCnt == 1) { Review comment: Looks like you may combine all `(firstScan && ... )` to a single section. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. 
*/ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. + this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) Review comment: Should rows always be empty when finished? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. 
+ this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); Review comment: Since you create this for a group read, what writes should be synchronized here? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1113,6 +1136,59 @@ private static String snapshotMetaFileName(String consId) { return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT; } + /** + * @param snpName Snapshot name. + * @param grpName Cache group name. + * @param partId Partition id. + * @return Iterator over partition. + */ + public GridCloseableIterator<CacheDataRow> partitionRows(String snpName, + String consId, + String grpName, + int partId + ) throws IgniteCheckedException { + File snpDir = snapshotLocalDir(snpName); + + if (!snpDir.exists()) + throw new IgniteCheckedException("Snapshot directory doesn't exists: " + snpDir.getAbsolutePath()); + + File nodePath = new File(snpDir, databaseRelativePath(U.maskForFileName(consId))); + + if (!nodePath.exists()) + throw new IgniteCheckedException("Consistent id directory doesn't exists: " + nodePath.getAbsolutePath()); + + File[] grps = nodePath.listFiles(path -> path.isDirectory() && + (path.getName().equalsIgnoreCase(cacheDirName(true, grpName)) || + path.getName().equalsIgnoreCase(cacheDirName(false, grpName)))); Review comment: Could we simplify this by using an explicit isSharedGroup flag obtained from the group descriptor? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here.
+ this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + skipPages + ']'); + + return rows.poll(); + } + + /** {@inheritDoc */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + return false; + + try { + while (rows.isEmpty() && ++currIdx < 2 * pages) { + int pageIdx = currIdx % pages; + boolean firstScan = currIdx < pages; + + if (skipPages.check(pageIdx)) + continue; + + locBuff.clear(); + + long pageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, pageIdx); + boolean success = store.read(pageId, locBuff, true); + + assert success : PageIdUtils.toDetailString(pageId); + + // Skip not data pages. + if (firstScan && PageIO.getType(locBuff) != T_DATA) { + skipPages.touch(pageIdx); + + continue; + } + + long pageAddr = GridUnsafe.bufferAddress(locBuff); + + DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(locBuff)); + int freeSpace = io.getFreeSpace(pageAddr); + int rowsCnt = io.getDirectCount(pageAddr); + + if (firstScan && rowsCnt == 0) { + skipPages.touch(pageIdx); + + continue; + } + + // For pages which contains only the incomplete fragment of a data row + // the rowsCnt will always be equal to 1. Skip such pages and read them + // on the second iteration. Review comment: Could we simplify this to something like `Since there is no special marker for initial pages, but the goal is to detect all initial pages, we mark every page that is referenced from another page so that only initial pages remain unmarked.` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). 
+ */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. + this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + skipPages + ']'); + + return rows.poll(); + } + + /** {@inheritDoc */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + return false; + + try { + while (rows.isEmpty() && ++currIdx < 2 * pages) { + int pageIdx = currIdx % pages; + boolean firstScan = currIdx < pages; + + if (skipPages.check(pageIdx)) + continue; + + locBuff.clear(); + + long pageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, pageIdx); + boolean success = store.read(pageId, locBuff, true); + + assert success : PageIdUtils.toDetailString(pageId); + + // Skip not data pages. + if (firstScan && PageIO.getType(locBuff) != T_DATA) { + skipPages.touch(pageIdx); + + continue; + } + + long pageAddr = GridUnsafe.bufferAddress(locBuff); + + DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(locBuff)); + int freeSpace = io.getFreeSpace(pageAddr); + int rowsCnt = io.getDirectCount(pageAddr); + + if (firstScan && rowsCnt == 0) { + skipPages.touch(pageIdx); + + continue; + } + + // For pages which contains only the incomplete fragment of a data row + // the rowsCnt will always be equal to 1. Skip such pages and read them + // on the second iteration. + if (firstScan && freeSpace == 0 && rowsCnt == 1) { + DataPagePayload payload = io.readPayload(pageAddr, 0, locBuff.capacity()); + + long link = payload.nextLink(); + + if (link != 0) + skipPages.touch(PageIdUtils.pageIndex(PageIdUtils.pageId(link))); + + continue; + } + + skipPages.touch(pageIdx); + + for (int itemId = 0; itemId < rowsCnt; itemId++) { + DataRow row = new DataRow(); + + row.partition(partId); + + row.initFromPageBuffer( + sctx, + coctx, + new IgniteThrowableFunction<Long, ByteBuffer>() { + @Override public ByteBuffer apply(Long nextPageId) throws IgniteCheckedException { + fragmentBuff.clear(); + + boolean read = store.read(nextPageId, fragmentBuff, true); + + assert read : nextPageId; + + // Fragment of page has been read, might be skipped further. + skipPages.touch(PageIdUtils.pageIndex(nextPageId)); Review comment: Looks like this page is already in `skipPages`. If we remove this call, can we use a regular BitSet instead of the atomic one? 
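As a rough illustration of the comment above (a sketch, not code from the PR; it assumes only the thread driving the iterator ever marks pages), the two operations used on `AtomicBitSet` map one-to-one onto `java.util.BitSet`:

```java
import java.util.BitSet;

// Minimal sketch: if only the iterating thread touches the skip-set,
// java.util.BitSet provides the same two operations used by the iterator.
public class BitSetSketch {
    public static void main(String[] args) {
        int pages = 1024;
        BitSet skipPages = new BitSet(pages);

        int pageIdx = 42;

        // AtomicBitSet.check(idx) -> BitSet.get(idx)
        if (!skipPages.get(pageIdx)) {
            // AtomicBitSet.touch(idx) -> BitSet.set(idx)
            skipPages.set(pageIdx);
        }

        System.out.println("marked=" + skipPages.get(pageIdx) + ", cardinality=" + skipPages.cardinality());
    }
}
```
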
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AtomicBitSet.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.concurrent.atomic.AtomicIntegerArray; + +/** + * Atomic bitset array. + */ +class AtomicBitSet { Review comment: Do we really need BitSet to be atomic here, how many threads use them when the snapshot is loading? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AtomicBitSet.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.concurrent.atomic.AtomicIntegerArray; + +/** + * Atomic bitset array. + */ +class AtomicBitSet { + /** + * Container of bits. + */ + private final AtomicIntegerArray arr; + + /** + * Size of array of bits. + */ + private final int size; + + /** + * @param size Size of array. + */ + public AtomicBitSet(int size) { + this.size = size; + + arr = new AtomicIntegerArray((size + 31) >>> 5); + } + + /** + * @return Size of bitset in bits. + */ + public int size() { + return size; + } + + /** + * @param idx Index in bitset array. + * @return {@code true} if the bit is set. + */ + public boolean check(long idx) { + if (idx >= size) + return false; + + int bit = 1 << idx; + int bucket = (int)(idx >>> 5); + + int cur = arr.get(bucket); + + return (cur & bit) == bit; + } + + /** + * @param off Bit position to change. + * @return {@code true} if bit has been set, + * {@code false} if bit changed by another thread or out of range. 
+ */ + public boolean touch(long off) { Review comment: touch result never checked at "new code" ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java ########## @@ -223,6 +228,66 @@ public static String formatUpdateCountersDiff(IgniteEx ig, List<Integer> diff) { return diff; } + /** + * @param updCntr Partition update counter prior check. + * @param grpId Group id. + * @param partId Partition id. + * @param grpName Group name. + * @param consId Local node consistent id. + * @param state Partition state to check. + * @param isPrimary {@code true} if partition is primary. + * @param partSize Partition size on disk. + * @param it Iterator though partition data rows. + * @throws IgniteCheckedException If fails. + * @return Map of calculated partition. + */ + public static Map<PartitionKeyV2, PartitionHashRecordV2> calculatePartitionHash( + long updCntr, + int grpId, + int partId, + String grpName, + Object consId, + GridDhtPartitionState state, + boolean isPrimary, + long partSize, + GridIterator<CacheDataRow> it + ) throws IgniteCheckedException { + int partHash = 0; + + PartitionKeyV2 partKey = new PartitionKeyV2(grpId, partId, grpName); + + if (state == GridDhtPartitionState.MOVING || state == GridDhtPartitionState.LOST) { + PartitionHashRecordV2 movingHashRecord = new PartitionHashRecordV2( + partKey, + isPrimary, + consId, + partHash, + updCntr, + state == GridDhtPartitionState.MOVING ? PartitionHashRecordV2.MOVING_PARTITION_SIZE : 0, + state == GridDhtPartitionState.MOVING ? PartitionHashRecordV2.PartitionState.MOVING : PartitionHashRecordV2.PartitionState.LOST + ); + + return Collections.singletonMap(partKey, movingHashRecord); + } + + if (state != GridDhtPartitionState.OWNING) + return emptyMap(); + + while (it.hasNextX()) { + CacheDataRow row = it.nextX(); + + partHash += row.key().hashCode(); + + // Object context may be null since the value bytes have been read directly from page. Review comment: `may be null` -> `is not required` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). 
+ */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. + this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + skipPages + ']'); + + return rows.poll(); + } + + /** {@inheritDoc */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + return false; + + try { + while (rows.isEmpty() && ++currIdx < 2 * pages) { + int pageIdx = currIdx % pages; + boolean firstScan = currIdx < pages; + + if (skipPages.check(pageIdx)) + continue; + + locBuff.clear(); + + long pageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, pageIdx); + boolean success = store.read(pageId, locBuff, true); + + assert success : PageIdUtils.toDetailString(pageId); + + // Skip not data pages. + if (firstScan && PageIO.getType(locBuff) != T_DATA) { + skipPages.touch(pageIdx); + + continue; + } + + long pageAddr = GridUnsafe.bufferAddress(locBuff); + + DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(locBuff)); + int freeSpace = io.getFreeSpace(pageAddr); + int rowsCnt = io.getDirectCount(pageAddr); + + if (firstScan && rowsCnt == 0) { + skipPages.touch(pageIdx); + + continue; + } + + // For pages which contains only the incomplete fragment of a data row + // the rowsCnt will always be equal to 1. Skip such pages and read them + // on the second iteration. + if (firstScan && freeSpace == 0 && rowsCnt == 1) { + DataPagePayload payload = io.readPayload(pageAddr, 0, locBuff.capacity()); + + long link = payload.nextLink(); + + if (link != 0) + skipPages.touch(PageIdUtils.pageIndex(PageIdUtils.pageId(link))); + + continue; + } + + skipPages.touch(pageIdx); + + for (int itemId = 0; itemId < rowsCnt; itemId++) { + DataRow row = new DataRow(); + + row.partition(partId); + + row.initFromPageBuffer( + sctx, + coctx, + new IgniteThrowableFunction<Long, ByteBuffer>() { + @Override public ByteBuffer apply(Long nextPageId) throws IgniteCheckedException { + fragmentBuff.clear(); + + boolean read = store.read(nextPageId, fragmentBuff, true); + + assert read : nextPageId; + + // Fragment of page has been read, might be skipped further. + skipPages.touch(PageIdUtils.pageIndex(nextPageId)); + + return fragmentBuff; + } + }, + locBuff, + io, + itemId, + false, + CacheDataRowAdapter.RowData.FULL, + false); + + rows.add(row); + } + } + + if (currIdx == 2 * pages) { + finished = true; Review comment: Should be volatile in case more than one thread can use this method. 
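To make the `onHasNext()` suggestions concrete, here is a self-contained toy sketch (an illustration only, not the PR's code; the class name `TwoPassScanner` and the page-kind encoding are invented) that keeps all `firstScan`-only checks in a single block, uses a plain `BitSet`, and declares `finished` as volatile:

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

/**
 * Toy model of the two-pass partition scan. Page kinds used by the model:
 * 0 = non-data page, 1 = empty data page, 2 = data page with rows, 3 = page holding only a row fragment.
 */
class TwoPassScanner {
    /** A plain BitSet is enough while a single thread drives the iteration. */
    private final BitSet skipPages;

    /** Volatile, as suggested, in case the iterator is handed between threads. */
    private volatile boolean finished;

    /** Kinds of the "pages" to scan. */
    private final int[] pageKinds;

    /** Batch of "rows" produced by the scan (page indexes stand in for data rows). */
    private final Deque<Integer> rows = new ArrayDeque<>();

    /** Current scan position; bounded by 2 * pageKinds.length because the partition is read twice. */
    private int currIdx = -1;

    TwoPassScanner(int[] pageKinds) {
        this.pageKinds = pageKinds;
        skipPages = new BitSet(pageKinds.length);
    }

    boolean hasNext() {
        if (finished)
            return !rows.isEmpty();

        int pages = pageKinds.length;

        while (rows.isEmpty() && ++currIdx < 2 * pages) {
            int pageIdx = currIdx % pages;
            boolean firstScan = currIdx < pages;

            if (skipPages.get(pageIdx))
                continue;

            // All first-scan-only checks combined into a single section.
            if (firstScan) {
                if (pageKinds[pageIdx] == 0) {
                    // Skip non-data page.
                    skipPages.set(pageIdx);

                    continue;
                }

                if (pageKinds[pageIdx] == 1) {
                    // Skip empty page.
                    skipPages.set(pageIdx);

                    continue;
                }

                if (pageKinds[pageIdx] == 3)
                    continue; // Fragment-only page: re-read it on the second pass.
            }

            skipPages.set(pageIdx);

            rows.add(pageIdx); // Stand-in for extracting the data rows of the page.
        }

        if (currIdx >= 2 * pages)
            finished = true;

        return !rows.isEmpty();
    }

    Integer next() {
        return rows.poll();
    }
}
```

Usage would look like `new TwoPassScanner(new int[] {0, 2, 3, 1})` and then the usual `hasNext()` / `next()` loop; fragment-only pages are left unmarked on the first pass and picked up on the second, mirroring the intent of the PR's iterator.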
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java ########## @@ -436,7 +436,7 @@ private boolean checkCount(int cnt) { * @param pageAddr Page address. * @return Indirect count. */ - private int getIndirectCount(long pageAddr) { + public int getIndirectCount(long pageAddr) { Review comment: It is not a good idea to make something public for tests only ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. 
+ this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + skipPages + ']'); + + return rows.poll(); + } + + /** {@inheritDoc */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + return false; + + try { + while (rows.isEmpty() && ++currIdx < 2 * pages) { + int pageIdx = currIdx % pages; + boolean firstScan = currIdx < pages; + + if (skipPages.check(pageIdx)) + continue; + + locBuff.clear(); + + long pageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, pageIdx); + boolean success = store.read(pageId, locBuff, true); + + assert success : PageIdUtils.toDetailString(pageId); + + // Skip not data pages. + if (firstScan && PageIO.getType(locBuff) != T_DATA) { + skipPages.touch(pageIdx); + + continue; + } + + long pageAddr = GridUnsafe.bufferAddress(locBuff); + + DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(locBuff)); + int freeSpace = io.getFreeSpace(pageAddr); + int rowsCnt = io.getDirectCount(pageAddr); + + if (firstScan && rowsCnt == 0) { + skipPages.touch(pageIdx); + + continue; + } + + // For pages which contains only the incomplete fragment of a data row + // the rowsCnt will always be equal to 1. Skip such pages and read them + // on the second iteration. + if (firstScan && freeSpace == 0 && rowsCnt == 1) { + DataPagePayload payload = io.readPayload(pageAddr, 0, locBuff.capacity()); + + long link = payload.nextLink(); + + if (link != 0) + skipPages.touch(PageIdUtils.pageIndex(PageIdUtils.pageId(link))); + + continue; + } + + skipPages.touch(pageIdx); Review comment: Should we have "assert !firstScan" here? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -1271,6 +1347,198 @@ static void copy(FileIOFactory factory, File from, File to, long length) { } } + /** */ + private static class PageScanIterator extends GridCloseableIteratorAdapter<CacheDataRow> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Page store to iterate over. */ + @GridToStringExclude + private final PageStore store; + + /** Page store partition id. */ + private final int partId; + + /** Grid cache shared context. */ + private final GridCacheSharedContext<?, ?> sctx; + + /** Cache object context for key/value deserialization. */ + private final CacheObjectContext coctx; + + /** Buffer to read pages. */ + private final ByteBuffer locBuff; + + /** Buffer to read the rest part of fragmented rows. */ + private final ByteBuffer fragmentBuff; + + /** Total pages in the page store. */ + private final int pages; + + /** Pages which must be skipped at the second iteration. */ + private final AtomicBitSet skipPages; + + /** Batch of rows read through iteration. */ + private final Deque<CacheDataRow> rows = new LinkedList<>(); + + /** {@code true} if the iteration reached its end. */ + private boolean finished; + + /** + * Current partition page index for read. 
Due to we read the partition twice it + * can't be greater that 2 * store.size(). + */ + private int currIdx = -1; + + /** + * @param coctx Cache object context. + * @param store Page store to read. + * @param partId Partition id. + * @throws IgniteCheckedException If fails. + */ + public PageScanIterator( + GridCacheSharedContext<?, ?> sctx, + CacheObjectContext coctx, + PageStore store, + int partId + ) throws IgniteCheckedException { + // CacheGroupContext may be null. It means that this cache group is under eviction for now. + // Since the persisted cache groups and their partitions are guarded by external machinery we + // can avoid it here. + this.store = store; + this.partId = partId; + this.coctx = coctx; + this.sctx = sctx; + + store.sync(); + pages = store.pages(); + skipPages = new AtomicBitSet(pages); + + locBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize()) + .order(ByteOrder.nativeOrder()); + } + + /** {@inheritDoc */ + @Override protected CacheDataRow onNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + throw new NoSuchElementException("[partId=" + partId + ", store=" + store + ", skipPages=" + skipPages + ']'); + + return rows.poll(); + } + + /** {@inheritDoc */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (finished && rows.isEmpty()) + return false; + + try { + while (rows.isEmpty() && ++currIdx < 2 * pages) { + int pageIdx = currIdx % pages; + boolean firstScan = currIdx < pages; + + if (skipPages.check(pageIdx)) + continue; + + locBuff.clear(); + + long pageId = PageIdUtils.pageId(partId, PageIdAllocator.FLAG_DATA, pageIdx); + boolean success = store.read(pageId, locBuff, true); + + assert success : PageIdUtils.toDetailString(pageId); + + // Skip not data pages. + if (firstScan && PageIO.getType(locBuff) != T_DATA) { + skipPages.touch(pageIdx); + + continue; + } + + long pageAddr = GridUnsafe.bufferAddress(locBuff); + + DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(locBuff)); + int freeSpace = io.getFreeSpace(pageAddr); + int rowsCnt = io.getDirectCount(pageAddr); + + if (firstScan && rowsCnt == 0) { Review comment: // Skip empty page. comment will be useful ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
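Regarding the two comments above about making `getDirectCount` / `getIndirectCount` public for tests only: a hedged sketch of the usual alternative, calling the private accessor reflectively from the test. The helper and its names are hypothetical, not part of Ignite; Ignite's own test utilities may already provide an equivalent, in which case that would be preferable.

```java
import java.lang.reflect.Method;

/** Test-side helper sketch: read a private int accessor without widening its visibility. */
public class PrivateAccessorSketch {
    /**
     * @param declaringCls Class that declares the private method (e.g. AbstractDataPageIO).
     * @param target Instance to invoke it on (e.g. a concrete DataPageIO).
     * @param mtdName Method name, e.g. "getDirectCount".
     * @param pageAddr Page address argument.
     * @return Value returned by the private accessor.
     */
    public static int invokeIntGetter(Class<?> declaringCls, Object target, String mtdName, long pageAddr)
        throws ReflectiveOperationException {
        Method mtd = declaringCls.getDeclaredMethod(mtdName, long.class);

        mtd.setAccessible(true);

        return (int)mtd.invoke(target, pageAddr);
    }

    // Hypothetical usage in a test:
    //   int directCnt = invokeIntGetter(AbstractDataPageIO.class, io, "getDirectCount", pageAddr);
}
```
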
