tkalkirill commented on code in PR #947:
URL: https://github.com/apache/ignite-3/pull/947#discussion_r928619704


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -166,503 +152,196 @@ public int pages() {
     public void pages(int pageCount) {
         assert pageCount >= 0 : pageCount;
 
-        this.pageCount.set(pageCount);
+        this.pageCount = pageCount;
     }
 
     /**
-     * Reads a page, unlike {@link #read(long, ByteBuffer, boolean)}, checks 
the page offset in the file not logically (pageOffset <= {@link
-     * #pages()} * {@link #pageSize}) but physically (pageOffset <= {@link 
#size()}), which can affect performance when used in production
-     * code.
+     * Reads a page, unlike {@link #read(long, ByteBuffer, boolean)}, does not 
check the {@code pageId} so that its {@code pageIdx} is not
+     * greater than the {@link #pages() number of allocated pages}.
      *
      * @param pageId Page ID.
      * @param pageBuf Page buffer to read into.
      * @param keepCrc By default, reading zeroes CRC which was on page store, 
but you can keep it in {@code pageBuf} if set {@code
      * keepCrc}.
+     * @return {@code True} if the page was read successfully.
      * @throws IgniteInternalCheckedException If reading failed (IO error 
occurred).
      */
-    public void readByPhysicalOffset(long pageId, ByteBuffer pageBuf, boolean 
keepCrc) throws IgniteInternalCheckedException {
-        read0(pageId, pageBuf, !skipCrc, keepCrc, false);
+    public boolean readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf, 
boolean keepCrc) throws IgniteInternalCheckedException {
+        for (DeltaFilePageStoreIo deltaFilePageStoreIo : 
deltaFilePageStoreIos) {
+            long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
+
+            if (pageOff >= 0) {
+                return deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, 
keepCrc);
+            }
+        }
+
+        return filePageStoreIo.read(pageId, 
filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc);
     }
 
     /** {@inheritDoc} */
     @Override
     public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws 
IgniteInternalCheckedException {
-        read0(pageId, pageBuf, !skipCrc, keepCrc, true);
-    }
-
-    /**
-     * Reads a page from the page store.
-     *
-     * @param pageId Page ID.
-     * @param pageBuf Page buffer to read into.
-     * @param checkCrc Check CRC on page.
-     * @param keepCrc By default reading zeroes CRC which was on file, but you 
can keep it in pageBuf if set keepCrc.
-     * @param checkPageOffsetLogically Check page offset by {@link 
#allocatedBytes} or {@link #size}.
-     * @throws IgniteInternalCheckedException If reading failed (IO error 
occurred).
-     */
-    private void read0(
-            long pageId,
-            ByteBuffer pageBuf,
-            boolean checkCrc,
-            boolean keepCrc,
-            boolean checkPageOffsetLogically
-    ) throws IgniteInternalCheckedException {
-        ensure();
-
-        try {
-            assert pageBuf.capacity() == pageSize : pageBuf.capacity();
-            assert pageBuf.remaining() == pageSize : pageBuf.remaining();
-            assert pageBuf.position() == 0 : pageBuf.position();
-            assert pageBuf.order() == nativeOrder() : pageBuf.order();
-
-            long pageOff = pageOffset(pageId);
-
-            if (checkPageOffsetLogically) {
-                assert pageOff <= allocatedBytes() : "calculatedOffset=" + 
pageOff
-                        + ", allocated=" + allocatedBytes() + ", headerSize=" 
+ headerSize + ", filePath=" + filePath;
-            } else {
-                assert pageOff <= size() : "calculatedOffset=" + pageOff
-                        + ", size=" + size() + ", headerSize=" + headerSize + 
", filePath=" + filePath;
-            }
-
-            int n = readWithFailover(pageBuf, pageOff);
+        assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) 
+ ", pageCount=" + pageCount;
 
-            // If page was not written yet, nothing to read.
-            if (n < 0) {
-                pageBuf.put(new byte[pageBuf.remaining()]);
-            }
+        for (DeltaFilePageStoreIo deltaFilePageStoreIo : 
deltaFilePageStoreIos) {
+            long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
 
-            int savedCrc32 = PageIo.getCrc(pageBuf);
-
-            PageIo.setCrc(pageBuf, 0);
-
-            pageBuf.position(0);
-
-            if (checkCrc) {
-                int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize);
-
-                if ((savedCrc32 ^ curCrc32) != 0) {
-                    throw new 
IgniteInternalDataIntegrityViolationException("Failed to read page (CRC 
validation failed) "
-                            + "[id=" + hexLong(pageId) + ", off=" + pageOff
-                            + ", filePath=" + filePath + ", fileSize=" + 
fileIo.size()
-                            + ", savedCrc=" + hexInt(savedCrc32) + ", curCrc=" 
+ hexInt(curCrc32)
-                            + ", page=" + toHexString(pageBuf) + "]");
-                }
-            }
+            if (pageOff >= 0) {
+                deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
 
-            assert PageIo.getCrc(pageBuf) == 0;
-
-            if (keepCrc) {
-                PageIo.setCrc(pageBuf, savedCrc32);
+                return;
             }
-        } catch (IOException e) {
-            throw new IgniteInternalCheckedException("Failed to read page 
[file=" + filePath + ", pageId=" + pageId + "]", e);
         }
+
+        filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId), 
pageBuf, keepCrc);
     }
 
     /** {@inheritDoc} */
     @Override
     public void write(long pageId, ByteBuffer pageBuf, boolean calculateCrc) 
throws IgniteInternalCheckedException {
-        ensure();
-
-        boolean interrupted = false;
-
-        while (true) {
-            FileIo fileIo = this.fileIo;
-
-            try {
-                readWriteLock.readLock().lock();
-
-                try {
-                    assert pageBuf.position() == 0 : pageBuf.position();
-                    assert pageBuf.order() == nativeOrder() : "Page buffer 
order " + pageBuf.order()
-                            + " should be same with " + nativeOrder();
-                    assert PageIo.getType(pageBuf) != 0 : "Invalid state. Type 
is 0! pageId = " + hexLong(pageId);
-                    assert PageIo.getVersion(pageBuf) != 0 : "Invalid state. 
Version is 0! pageId = " + hexLong(pageId);
-
-                    if (calculateCrc && !skipCrc) {
-                        assert PageIo.getCrc(pageBuf) == 0 : hexLong(pageId);
-
-                        PageIo.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
-                    }
-
-                    // Check whether crc was calculated somewhere above the 
stack if it is forcibly skipped.
-                    assert skipCrc || PageIo.getCrc(pageBuf) != 0
-                            || calcCrc32(pageBuf, pageSize) == 0 : "CRC hasn't 
been calculated, crc=0";
-
-                    assert pageBuf.position() == 0 : pageBuf.position();
-
-                    long pageOff = pageOffset(pageId);
-
-                    assert pageOff <= allocatedBytes() : "calculatedOffset=" + 
pageOff
-                            + ", allocated=" + allocatedBytes() + ", 
headerSize=" + headerSize + ", filePath=" + filePath;
-
-                    fileIo.writeFully(pageBuf, pageOff);
-
-                    PageIo.setCrc(pageBuf, 0);
-
-                    if (interrupted) {
-                        Thread.currentThread().interrupt();
-                    }
-
-                    return;
-                } finally {
-                    readWriteLock.readLock().unlock();
-                }
-            } catch (IOException e) {
-                if (e instanceof ClosedChannelException) {
-                    try {
-                        if (e instanceof ClosedByInterruptException) {
-                            interrupted = true;
-
-                            Thread.interrupted();
-                        }
-
-                        reinit(fileIo);
-
-                        pageBuf.position(0);
-
-                        PageIo.setCrc(pageBuf, 0);
+        assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) 
+ ", pageCount=" + pageCount;
 
-                        continue;
-                    } catch (IOException e0) {
-                        e0.addSuppressed(e);
-
-                        e = e0;
-                    }
-                }
-
-                throw new IgniteInternalCheckedException(
-                        "Failed to write page [filePath=" + filePath + ", 
pageId=" + pageId + "]",
-                        e
-                );
-            }
-        }
+        filePageStoreIo.write(pageId, pageBuf, calculateCrc);
     }
 
     /** {@inheritDoc} */
     @Override
     public void sync() throws IgniteInternalCheckedException {
-        readWriteLock.writeLock().lock();
-
-        try {
-            ensure();
-
-            FileIo fileIo = this.fileIo;
-
-            if (fileIo != null) {
-                fileIo.force();
-            }
-        } catch (IOException e) {
-            throw new IgniteInternalCheckedException("Failed to fsync 
partition file [filePath=" + filePath + ']', e);
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
+        filePageStoreIo.sync();
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean exists() {
-        if (fileExists == null) {
-            readWriteLock.writeLock().lock();
-
-            try {
-                if (fileExists == null) {
-                    fileExists = Files.exists(filePath) && 
filePath.toFile().length() >= headerSize;
-                }
-            } finally {
-                readWriteLock.writeLock().unlock();
-            }
-        }
-
-        return fileExists;
+        return filePageStoreIo.exists();
     }
 
     /** {@inheritDoc} */
     @Override
     public void ensure() throws IgniteInternalCheckedException {
-        if (!initialized) {
-            readWriteLock.writeLock().lock();
-
-            try {
-                if (!initialized) {
-                    FileIo fileIo = null;
-
-                    IgniteInternalCheckedException err = null;
-
-                    try {
-                        boolean interrupted = false;
-
-                        while (true) {
-                            try {
-                                this.fileIo = fileIo = 
ioFactory.create(filePath, CREATE, READ, WRITE);
-
-                                fileExists = true;
-
-                                if (fileIo.size() < headerSize) {
-                                    fileIo.writeFully(new 
FilePageStoreHeader(version, pageSize).toByteBuffer(), 0);
-                                } else {
-                                    checkHeader(fileIo);
-                                }
-
-                                if (interrupted) {
-                                    Thread.currentThread().interrupt();
-                                }
-
-                                break;
-                            } catch (ClosedByInterruptException e) {
-                                interrupted = true;
-
-                                Thread.interrupted();
-                            }
-                        }
-
-                        initialized = true;
-                    } catch (IOException e) {
-                        err = new IgniteInternalCheckedException("Failed to 
initialize partition file: " + filePath, e);
-
-                        throw err;
-                    } finally {
-                        if (err != null && fileIo != null) {
-                            try {
-                                fileIo.close();
-                            } catch (IOException e) {
-                                err.addSuppressed(e);
-                            }
-                        }
-                    }
-                }
-            } finally {
-                readWriteLock.writeLock().unlock();
-            }
-        }
+        filePageStoreIo.ensure();
     }
 
     /** {@inheritDoc} */
     @Override
     public void close() throws IOException {
-        stop0(false);
+        filePageStoreIo.close();
+
+        for (DeltaFilePageStoreIo deltaFilePageStoreIo : 
deltaFilePageStoreIos) {
+            deltaFilePageStoreIo.close();
+        }
     }
 
     /**
      * Returns size of the page store in bytes.
      *
-     * <p>May differ from {@link #pages} * {@link #pageSize} due to delayed 
writes or due to other implementation specific details.
+     * <p>May differ from {@link #pages} * {@link FilePageStoreIo#pageSize()} 
due to delayed writes or due to other implementation specific
+     * details.
      *
      * @throws IgniteInternalCheckedException If an I/O error occurs.
      */
     public long size() throws IgniteInternalCheckedException {
-        try {
-            FileIo io = fileIo;
-
-            return io == null ? 0 : io.size();
-        } catch (IOException e) {
-            throw new IgniteInternalCheckedException(e);
-        }
+        return filePageStoreIo.size();
     }
 
     /**
-     * Stops file page store.
-     *
-     * @param delete {@code True} to delete file.
-     * @throws IOException If fails.
+     * Returns file page store path.
      */
-    private void stop0(boolean delete) throws IOException {
-        readWriteLock.writeLock().lock();
-
-        try {
-            if (!initialized) {
-                // Ensure the file is closed even if not initialized yet.
-                if (fileIo != null) {
-                    fileIo.close();
-                }
-
-                if (delete && exists()) {
-                    Files.delete(filePath);
-                }
-
-                return;
-            }
-
-            fileIo.force();
-
-            fileIo.close();
-
-            fileIo = null;
-
-            if (delete) {
-                Files.delete(filePath);
-
-                fileExists = false;
-            }
-        } finally {
-            initialized = false;
+    public Path filePath() {
+        return filePageStoreIo.filePath();
+    }
 
-            readWriteLock.writeLock().unlock();
-        }
+    /**
+     * Returns file page store header size.
+     */
+    public int headerSize() {
+        return filePageStoreIo.headerSize();
     }
 
     /**
-     * Gets page offset within the store file.
+     * Sets the new page allocation listener.
      *
-     * @param pageId Page ID.
-     * @return Page offset.
+     * @param listener New page allocation listener.
      */
-    long pageOffset(long pageId) {
-        return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize;
+    public void setPageAllocationListener(PageAllocationListener listener) {
+        pageAllocationListener = listener;
     }
 
     /**
-     * Reads from page storage with failover.
+     * Sets the delta file page store factory.
      *
-     * @param destBuf Destination buffer.
-     * @param position Position.
-     * @return Number of read bytes, or {@code -1} if the given position is 
greater than or equal to the file's current size.
+     * @param factory Factory.
      */
-    private int readWithFailover(ByteBuffer destBuf, long position) throws 
IOException {
-        boolean interrupted = false;
-
-        int bufPos = destBuf.position();
-
-        while (true) {
-            FileIo fileIo = this.fileIo;
-
-            if (fileIo == null) {
-                throw new IOException("FileIo has stopped");
-            }
-
-            try {
-                assert destBuf.remaining() > 0;
-
-                int bytesRead = fileIo.readFully(destBuf, position);
-
-                if (interrupted) {
-                    Thread.currentThread().interrupt();
-                }
-
-                return bytesRead;
-            } catch (ClosedChannelException e) {
-                destBuf.position(bufPos);
-
-                if (e instanceof ClosedByInterruptException) {
-                    interrupted = true;
-
-                    Thread.interrupted();
-                }
-
-                reinit(fileIo);
-            }
-        }
+    public void setDeltaFilePageStoreIoFactory(DeltaFilePageStoreIoFactory 
factory) {
+        deltaFilePageStoreIoFactory = factory;
     }
 
     /**
-     * Reinit page store after file channel was closed by thread interruption.
+     * Sets the callback on completion of delta file page store creation.
      *
-     * @param fileIo Old fileIo.
+     * @param callback Callback.
      */
-    private void reinit(FileIo fileIo) throws IOException {
-        if (!initialized) {
-            return;
-        }
-
-        if (fileIo != this.fileIo) {
-            return;
-        }
-
-        readWriteLock.writeLock().lock();
-
-        try {
-            if (fileIo != this.fileIo) {
-                return;
-            }
-
-            try {
-                boolean interrupted = false;
-
-                while (true) {
-                    try {
-                        fileIo = null;
-
-                        fileIo = ioFactory.create(filePath, CREATE, READ, 
WRITE);
+    public void 
setCompleteCreationDeltaFilePageStoreIoCallback(CompleteCreationDeltaFilePageStoreIoCallback
 callback) {
+        completeCreationDeltaFilePageStoreIoCallback = callback;
+    }
 
-                        fileExists = true;
+    /**
+     * Gets or creates a new delta file, a new delta file will be created when 
the previous one is {@link #completeNewDeltaFile()
+     * completed}.
+     *
+     * <p>Thread safe.
+     *
+     * @param pageIndexesSupplier Page indexes supplier for the new delta file 
page store.
+     * @return Future that will be completed when the new delta file page 
store is created.
+     */
+    public CompletableFuture<DeltaFilePageStoreIo> 
getOrCreateNewDeltaFile(Supplier<int[]> pageIndexesSupplier) {

Review Comment:
   Discussed personally, it is needed to lazily calculate page indexes only 
when creating delta files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to