Mmuzaf commented on a change in pull request #8767:
URL: https://github.com/apache/ignite/pull/8767#discussion_r591748350
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which already marked and postponed to be read on the second
iteration. */
+ private final BitSet markedPages;
+
+ /** Pages which already read and must be skipped. */
+ private final BitSet readPages;
+
+ /** Batch of rows read through iteration. */
+ private final Deque<CacheDataRow> rows = new LinkedList<>();
+
+ /** {@code true} if the iteration though partition reached its end. */
+ private boolean secondScanComplete;
+
+ /**
+ * Current partition page index for read. Due to we read the partition
twice it
+ * can't be greater that 2 * store.size().
+ */
+ private int currIdx;
+
+ /**
+ * During scanning a cache partition presented as {@code PageStore} we
must guarantee the following:
+ * all the pages of this storage remains unchanged during the Iterator
remains opened, the stored data
+ * keeps its consistency. We can't read the {@code PageStore} during
an ongoing checkpoint over it.
+ *
+ * @param coctx Cache object context.
+ * @param store Page store to read.
+ * @param partId Partition id.
+ * @throws IgniteCheckedException If fails.
+ */
+ public PageScanIterator(
+ GridCacheSharedContext<?, ?> sctx,
+ CacheObjectContext coctx,
+ PageStore store,
+ int partId
+ ) throws IgniteCheckedException {
+ this.store = store;
+ this.partId = partId;
+ this.coctx = coctx;
+ this.sctx = sctx;
+
+ store.ensure();
+ pages = store.pages();
+ markedPages = new BitSet(pages);
+ readPages = new BitSet(pages);
+
+ locBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ fragmentBuff = ByteBuffer.allocateDirect(store.getPageSize())
+ .order(ByteOrder.nativeOrder());
+ }
+
+ /** {@inheritDoc */
+ @Override protected CacheDataRow onNext() throws
IgniteCheckedException {
+ if (secondScanComplete && rows.isEmpty())
+ throw new NoSuchElementException("[partId=" + partId + ",
store=" + store + ", skipPages=" + readPages + ']');
+
+ return rows.poll();
+ }
+
+ /** {@inheritDoc */
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (secondScanComplete && rows.isEmpty())
+ return false;
+
+ try {
+ for (; currIdx < 2 * pages && rows.isEmpty(); currIdx++) {
+ boolean first = currIdx < pages;
+ int pageIdx = currIdx % pages;
+
+ if (readPages.get(pageIdx) || (!first &&
markedPages.get(pageIdx)))
+ continue;
+
+ if (!readPageFromStore(pageId(partId, FLAG_DATA, pageIdx),
locBuff)) {
+ // Skip not FLAG_DATA pages.
+ changeBit(readPages, pageIdx);
+
+ continue;
+ }
+
+ long pageAddr = bufferAddress(locBuff);
+ DataPageIO io = getPageIO(T_DATA, getVersion(pageAddr));
+ int freeSpace = io.getFreeSpace(pageAddr);
+ int rowsCnt = io.getDirectCount(pageAddr);
+
+ if (first) {
+ // Skip empty pages.
+ if (rowsCnt == 0) {
+ changeBit(readPages, pageIdx);
+
+ continue;
+ }
+
+ // There is no difference between a page containing an
incomplete DataRow fragment and
+ // the page where DataRow takes up all the free space.
There is no a dedicated
+ // flag for this case in page header.
+ // During the storage scan we can skip such pages at
the first iteration over the partition file,
+ // since all the fragmented pages will be marked by
BitSet array we will safely read the others
+ // on the second iteration.
+ if (freeSpace == 0 && rowsCnt == 1) {
+ DataPagePayload payload = io.readPayload(pageAddr,
0, locBuff.capacity());
+
+ long link = payload.nextLink();
+
+ if (link != 0)
+ changeBit(markedPages,
pageIndex(pageId(link)));
+
+ continue;
+ }
+ }
+
+ changeBit(readPages, pageIdx);
+
+ for (int itemId = 0; itemId < rowsCnt; itemId++) {
+ DataRow row = new DataRow();
+
+ row.partition(partId);
+
+ row.initFromPageBuffer(
+ sctx,
+ coctx,
+ new IgniteThrowableFunction<Long, ByteBuffer>() {
+ @Override public ByteBuffer apply(Long
nextPageId) throws IgniteCheckedException {
+ boolean success =
readPageFromStore(nextPageId, fragmentBuff);
+
+ assert success : "Only FLAG_DATA pages
allowed: " + toDetailString(nextPageId);
+
+ // Fragment of page has been read, might
be skipped further.
+ changeBit(readPages,
pageIndex(nextPageId));
+
+ return fragmentBuff;
+ }
+ },
+ locBuff,
+ io,
+ itemId,
+ false,
+ CacheDataRowAdapter.RowData.FULL,
+ false);
+
+ rows.add(row);
+ }
+ }
+
+ if (currIdx == 2 * pages) {
+ secondScanComplete = true;
+
+ boolean set = true;
+
+ for (int j = 0; j < pages; j++)
+ set &= readPages.get(j);
+
+ assert set : "readPages=" + readPages + ", pages=" + pages;
+ }
+
+ return !rows.isEmpty();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Error during iteration
through page store: " + this, e);
+ }
+ }
+
+ /**
+ * @param bitSet BitSet to change bit index.
+ * @param idx Index of bit to change.
+ */
+ private static void changeBit(BitSet bitSet, int idx) {
Review comment:
Fixed.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which already marked and postponed to be read on the second
iteration. */
+ private final BitSet markedPages;
Review comment:
Fixed.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Page store to iterate over. */
+ @GridToStringExclude
+ private final PageStore store;
+
+ /** Page store partition id. */
+ private final int partId;
+
+ /** Grid cache shared context. */
+ private final GridCacheSharedContext<?, ?> sctx;
+
+ /** Cache object context for key/value deserialization. */
+ private final CacheObjectContext coctx;
+
+ /** Buffer to read pages. */
+ private final ByteBuffer locBuff;
+
+ /** Buffer to read the rest part of fragmented rows. */
+ private final ByteBuffer fragmentBuff;
+
+ /** Total pages in the page store. */
+ private final int pages;
+
+ /** Pages which already marked and postponed to be read on the second
iteration. */
Review comment:
Fixed.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1271,6 +1352,230 @@ static void copy(FileIOFactory factory, File from, File
to, long length) {
}
}
+ /**
+ * Ves pokrit assertami absolutely ves,
+ * PageScan iterator in the ignite core est.
+ */
+ private static class PageScanIterator extends
GridCloseableIteratorAdapter<CacheDataRow> {
Review comment:
Fixed.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1113,6 +1142,58 @@ private static String snapshotMetaFileName(String
consId) {
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}
+ /**
+ * @param snpName Snapshot name.
+ * @param folderName The node folder name, usually it's the same as the
U.maskForFileName(consistentId).
+ * @param grpName Cache group name.
+ * @param partId Partition id.
+ * @return Iterator over partition.
+ */
+ public GridCloseableIterator<CacheDataRow> partitionRows(String snpName,
Review comment:
Fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]