tkalkirill commented on code in PR #947:
URL: https://github.com/apache/ignite-3/pull/947#discussion_r928618638


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -141,7 +127,7 @@ public void stop(boolean clean) throws 
IgniteInternalCheckedException {
     public int allocatePage() throws IgniteInternalCheckedException {
         ensure();
 
-        int pageIdx = pageCount.getAndIncrement();
+        int pageIdx = (Integer) PAGE_COUNT.getAndAdd(this, 1);

Review Comment:
   Fix it



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -166,503 +152,196 @@ public int pages() {
     public void pages(int pageCount) {
         assert pageCount >= 0 : pageCount;
 
-        this.pageCount.set(pageCount);
+        this.pageCount = pageCount;
     }
 
     /**
-     * Reads a page, unlike {@link #read(long, ByteBuffer, boolean)}, checks 
the page offset in the file not logically (pageOffset <= {@link
-     * #pages()} * {@link #pageSize}) but physically (pageOffset <= {@link 
#size()}), which can affect performance when used in production
-     * code.
+     * Reads a page, unlike {@link #read(long, ByteBuffer, boolean)}, does not 
check the {@code pageId} so that its {@code pageIdx} is not
+     * greater than the {@link #pages() number of allocated pages}.
      *
      * @param pageId Page ID.
      * @param pageBuf Page buffer to read into.
      * @param keepCrc By default, reading zeroes CRC which was on page store, 
but you can keep it in {@code pageBuf} if set {@code
      * keepCrc}.
+     * @return {@code True} if the page was read successfully.
      * @throws IgniteInternalCheckedException If reading failed (IO error 
occurred).
      */
-    public void readByPhysicalOffset(long pageId, ByteBuffer pageBuf, boolean 
keepCrc) throws IgniteInternalCheckedException {
-        read0(pageId, pageBuf, !skipCrc, keepCrc, false);
+    public boolean readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf, 
boolean keepCrc) throws IgniteInternalCheckedException {
+        for (DeltaFilePageStoreIo deltaFilePageStoreIo : 
deltaFilePageStoreIos) {
+            long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
+
+            if (pageOff >= 0) {
+                return deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, 
keepCrc);
+            }
+        }
+
+        return filePageStoreIo.read(pageId, 
filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc);
     }
 
     /** {@inheritDoc} */
     @Override
     public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws 
IgniteInternalCheckedException {
-        read0(pageId, pageBuf, !skipCrc, keepCrc, true);
-    }
-
-    /**
-     * Reads a page from the page store.
-     *
-     * @param pageId Page ID.
-     * @param pageBuf Page buffer to read into.
-     * @param checkCrc Check CRC on page.
-     * @param keepCrc By default reading zeroes CRC which was on file, but you 
can keep it in pageBuf if set keepCrc.
-     * @param checkPageOffsetLogically Check page offset by {@link 
#allocatedBytes} or {@link #size}.
-     * @throws IgniteInternalCheckedException If reading failed (IO error 
occurred).
-     */
-    private void read0(
-            long pageId,
-            ByteBuffer pageBuf,
-            boolean checkCrc,
-            boolean keepCrc,
-            boolean checkPageOffsetLogically
-    ) throws IgniteInternalCheckedException {
-        ensure();
-
-        try {
-            assert pageBuf.capacity() == pageSize : pageBuf.capacity();
-            assert pageBuf.remaining() == pageSize : pageBuf.remaining();
-            assert pageBuf.position() == 0 : pageBuf.position();
-            assert pageBuf.order() == nativeOrder() : pageBuf.order();
-
-            long pageOff = pageOffset(pageId);
-
-            if (checkPageOffsetLogically) {
-                assert pageOff <= allocatedBytes() : "calculatedOffset=" + 
pageOff
-                        + ", allocated=" + allocatedBytes() + ", headerSize=" 
+ headerSize + ", filePath=" + filePath;
-            } else {
-                assert pageOff <= size() : "calculatedOffset=" + pageOff
-                        + ", size=" + size() + ", headerSize=" + headerSize + 
", filePath=" + filePath;
-            }
-
-            int n = readWithFailover(pageBuf, pageOff);
+        assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) 
+ ", pageCount=" + pageCount;
 
-            // If page was not written yet, nothing to read.
-            if (n < 0) {
-                pageBuf.put(new byte[pageBuf.remaining()]);
-            }
+        for (DeltaFilePageStoreIo deltaFilePageStoreIo : 
deltaFilePageStoreIos) {
+            long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
 
-            int savedCrc32 = PageIo.getCrc(pageBuf);
-
-            PageIo.setCrc(pageBuf, 0);
-
-            pageBuf.position(0);
-
-            if (checkCrc) {
-                int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize);
-
-                if ((savedCrc32 ^ curCrc32) != 0) {
-                    throw new 
IgniteInternalDataIntegrityViolationException("Failed to read page (CRC 
validation failed) "
-                            + "[id=" + hexLong(pageId) + ", off=" + pageOff
-                            + ", filePath=" + filePath + ", fileSize=" + 
fileIo.size()
-                            + ", savedCrc=" + hexInt(savedCrc32) + ", curCrc=" 
+ hexInt(curCrc32)
-                            + ", page=" + toHexString(pageBuf) + "]");
-                }
-            }
+            if (pageOff >= 0) {
+                deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
 
-            assert PageIo.getCrc(pageBuf) == 0;
-
-            if (keepCrc) {
-                PageIo.setCrc(pageBuf, savedCrc32);
+                return;
             }
-        } catch (IOException e) {
-            throw new IgniteInternalCheckedException("Failed to read page 
[file=" + filePath + ", pageId=" + pageId + "]", e);
         }
+
+        filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId), 
pageBuf, keepCrc);
     }
 
     /** {@inheritDoc} */
     @Override
     public void write(long pageId, ByteBuffer pageBuf, boolean calculateCrc) 
throws IgniteInternalCheckedException {
-        ensure();
-
-        boolean interrupted = false;
-
-        while (true) {
-            FileIo fileIo = this.fileIo;
-
-            try {
-                readWriteLock.readLock().lock();
-
-                try {
-                    assert pageBuf.position() == 0 : pageBuf.position();
-                    assert pageBuf.order() == nativeOrder() : "Page buffer 
order " + pageBuf.order()
-                            + " should be same with " + nativeOrder();
-                    assert PageIo.getType(pageBuf) != 0 : "Invalid state. Type 
is 0! pageId = " + hexLong(pageId);
-                    assert PageIo.getVersion(pageBuf) != 0 : "Invalid state. 
Version is 0! pageId = " + hexLong(pageId);
-
-                    if (calculateCrc && !skipCrc) {
-                        assert PageIo.getCrc(pageBuf) == 0 : hexLong(pageId);
-
-                        PageIo.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
-                    }
-
-                    // Check whether crc was calculated somewhere above the 
stack if it is forcibly skipped.
-                    assert skipCrc || PageIo.getCrc(pageBuf) != 0
-                            || calcCrc32(pageBuf, pageSize) == 0 : "CRC hasn't 
been calculated, crc=0";
-
-                    assert pageBuf.position() == 0 : pageBuf.position();
-
-                    long pageOff = pageOffset(pageId);
-
-                    assert pageOff <= allocatedBytes() : "calculatedOffset=" + 
pageOff
-                            + ", allocated=" + allocatedBytes() + ", 
headerSize=" + headerSize + ", filePath=" + filePath;
-
-                    fileIo.writeFully(pageBuf, pageOff);
-
-                    PageIo.setCrc(pageBuf, 0);
-
-                    if (interrupted) {
-                        Thread.currentThread().interrupt();
-                    }
-
-                    return;
-                } finally {
-                    readWriteLock.readLock().unlock();
-                }
-            } catch (IOException e) {
-                if (e instanceof ClosedChannelException) {
-                    try {
-                        if (e instanceof ClosedByInterruptException) {
-                            interrupted = true;
-
-                            Thread.interrupted();
-                        }
-
-                        reinit(fileIo);
-
-                        pageBuf.position(0);
-
-                        PageIo.setCrc(pageBuf, 0);
+        assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) 
+ ", pageCount=" + pageCount;
 
-                        continue;
-                    } catch (IOException e0) {
-                        e0.addSuppressed(e);
-
-                        e = e0;
-                    }
-                }
-
-                throw new IgniteInternalCheckedException(
-                        "Failed to write page [filePath=" + filePath + ", 
pageId=" + pageId + "]",
-                        e
-                );
-            }
-        }
+        filePageStoreIo.write(pageId, pageBuf, calculateCrc);
     }
 
     /** {@inheritDoc} */
     @Override
     public void sync() throws IgniteInternalCheckedException {
-        readWriteLock.writeLock().lock();
-
-        try {
-            ensure();
-
-            FileIo fileIo = this.fileIo;
-
-            if (fileIo != null) {
-                fileIo.force();
-            }
-        } catch (IOException e) {
-            throw new IgniteInternalCheckedException("Failed to fsync 
partition file [filePath=" + filePath + ']', e);
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
+        filePageStoreIo.sync();
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean exists() {
-        if (fileExists == null) {
-            readWriteLock.writeLock().lock();
-
-            try {
-                if (fileExists == null) {
-                    fileExists = Files.exists(filePath) && 
filePath.toFile().length() >= headerSize;
-                }
-            } finally {
-                readWriteLock.writeLock().unlock();
-            }
-        }
-
-        return fileExists;
+        return filePageStoreIo.exists();
     }
 
     /** {@inheritDoc} */
     @Override
     public void ensure() throws IgniteInternalCheckedException {
-        if (!initialized) {
-            readWriteLock.writeLock().lock();
-
-            try {
-                if (!initialized) {
-                    FileIo fileIo = null;
-
-                    IgniteInternalCheckedException err = null;
-
-                    try {
-                        boolean interrupted = false;
-
-                        while (true) {
-                            try {
-                                this.fileIo = fileIo = 
ioFactory.create(filePath, CREATE, READ, WRITE);
-
-                                fileExists = true;
-
-                                if (fileIo.size() < headerSize) {
-                                    fileIo.writeFully(new 
FilePageStoreHeader(version, pageSize).toByteBuffer(), 0);
-                                } else {
-                                    checkHeader(fileIo);
-                                }
-
-                                if (interrupted) {
-                                    Thread.currentThread().interrupt();
-                                }
-
-                                break;
-                            } catch (ClosedByInterruptException e) {
-                                interrupted = true;
-
-                                Thread.interrupted();
-                            }
-                        }
-
-                        initialized = true;
-                    } catch (IOException e) {
-                        err = new IgniteInternalCheckedException("Failed to 
initialize partition file: " + filePath, e);
-
-                        throw err;
-                    } finally {
-                        if (err != null && fileIo != null) {
-                            try {
-                                fileIo.close();
-                            } catch (IOException e) {
-                                err.addSuppressed(e);
-                            }
-                        }
-                    }
-                }
-            } finally {
-                readWriteLock.writeLock().unlock();
-            }
-        }
+        filePageStoreIo.ensure();
     }
 
     /** {@inheritDoc} */
     @Override
     public void close() throws IOException {
-        stop0(false);
+        filePageStoreIo.close();
+
+        for (DeltaFilePageStoreIo deltaFilePageStoreIo : 
deltaFilePageStoreIos) {
+            deltaFilePageStoreIo.close();
+        }
     }
 
     /**
      * Returns size of the page store in bytes.
      *
-     * <p>May differ from {@link #pages} * {@link #pageSize} due to delayed 
writes or due to other implementation specific details.
+     * <p>May differ from {@link #pages} * {@link FilePageStoreIo#pageSize()} 
due to delayed writes or due to other implementation specific
+     * details.
      *
      * @throws IgniteInternalCheckedException If an I/O error occurs.
      */
     public long size() throws IgniteInternalCheckedException {
-        try {
-            FileIo io = fileIo;
-
-            return io == null ? 0 : io.size();
-        } catch (IOException e) {
-            throw new IgniteInternalCheckedException(e);
-        }
+        return filePageStoreIo.size();
     }
 
     /**
-     * Stops file page store.
-     *
-     * @param delete {@code True} to delete file.
-     * @throws IOException If fails.
+     * Returns file page store path.
      */
-    private void stop0(boolean delete) throws IOException {
-        readWriteLock.writeLock().lock();
-
-        try {
-            if (!initialized) {
-                // Ensure the file is closed even if not initialized yet.
-                if (fileIo != null) {
-                    fileIo.close();
-                }
-
-                if (delete && exists()) {
-                    Files.delete(filePath);
-                }
-
-                return;
-            }
-
-            fileIo.force();
-
-            fileIo.close();
-
-            fileIo = null;
-
-            if (delete) {
-                Files.delete(filePath);
-
-                fileExists = false;
-            }
-        } finally {
-            initialized = false;
+    public Path filePath() {

Review Comment:
   Yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to