tkalkirill commented on code in PR #947:
URL: https://github.com/apache/ignite-3/pull/947#discussion_r928619704
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -166,503 +152,196 @@ public int pages() {
public void pages(int pageCount) {
assert pageCount >= 0 : pageCount;
- this.pageCount.set(pageCount);
+ this.pageCount = pageCount;
}
/**
- * Reads a page, unlike {@link #read(long, ByteBuffer, boolean)}, checks
the page offset in the file not logically (pageOffset <= {@link
- * #pages()} * {@link #pageSize}) but physically (pageOffset <= {@link
#size()}), which can affect performance when used in production
- * code.
+ * Reads a page, unlike {@link #read(long, ByteBuffer, boolean)}, does not
check the {@code pageId} so that its {@code pageIdx} is not
+ * greater than the {@link #pages() number of allocated pages}.
*
* @param pageId Page ID.
* @param pageBuf Page buffer to read into.
* @param keepCrc By default, reading zeroes CRC which was on page store,
but you can keep it in {@code pageBuf} if set {@code
* keepCrc}.
+ * @return {@code True} if the page was read successfully.
* @throws IgniteInternalCheckedException If reading failed (IO error
occurred).
*/
- public void readByPhysicalOffset(long pageId, ByteBuffer pageBuf, boolean
keepCrc) throws IgniteInternalCheckedException {
- read0(pageId, pageBuf, !skipCrc, keepCrc, false);
+ public boolean readWithoutPageIdCheck(long pageId, ByteBuffer pageBuf,
boolean keepCrc) throws IgniteInternalCheckedException {
+ for (DeltaFilePageStoreIo deltaFilePageStoreIo :
deltaFilePageStoreIos) {
+ long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
+
+ if (pageOff >= 0) {
+ return deltaFilePageStoreIo.read(pageId, pageOff, pageBuf,
keepCrc);
+ }
+ }
+
+ return filePageStoreIo.read(pageId,
filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc);
}
/** {@inheritDoc} */
@Override
public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws
IgniteInternalCheckedException {
- read0(pageId, pageBuf, !skipCrc, keepCrc, true);
- }
-
- /**
- * Reads a page from the page store.
- *
- * @param pageId Page ID.
- * @param pageBuf Page buffer to read into.
- * @param checkCrc Check CRC on page.
- * @param keepCrc By default reading zeroes CRC which was on file, but you
can keep it in pageBuf if set keepCrc.
- * @param checkPageOffsetLogically Check page offset by {@link
#allocatedBytes} or {@link #size}.
- * @throws IgniteInternalCheckedException If reading failed (IO error
occurred).
- */
- private void read0(
- long pageId,
- ByteBuffer pageBuf,
- boolean checkCrc,
- boolean keepCrc,
- boolean checkPageOffsetLogically
- ) throws IgniteInternalCheckedException {
- ensure();
-
- try {
- assert pageBuf.capacity() == pageSize : pageBuf.capacity();
- assert pageBuf.remaining() == pageSize : pageBuf.remaining();
- assert pageBuf.position() == 0 : pageBuf.position();
- assert pageBuf.order() == nativeOrder() : pageBuf.order();
-
- long pageOff = pageOffset(pageId);
-
- if (checkPageOffsetLogically) {
- assert pageOff <= allocatedBytes() : "calculatedOffset=" +
pageOff
- + ", allocated=" + allocatedBytes() + ", headerSize="
+ headerSize + ", filePath=" + filePath;
- } else {
- assert pageOff <= size() : "calculatedOffset=" + pageOff
- + ", size=" + size() + ", headerSize=" + headerSize +
", filePath=" + filePath;
- }
-
- int n = readWithFailover(pageBuf, pageOff);
+ assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId)
+ ", pageCount=" + pageCount;
- // If page was not written yet, nothing to read.
- if (n < 0) {
- pageBuf.put(new byte[pageBuf.remaining()]);
- }
+ for (DeltaFilePageStoreIo deltaFilePageStoreIo :
deltaFilePageStoreIos) {
+ long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
- int savedCrc32 = PageIo.getCrc(pageBuf);
-
- PageIo.setCrc(pageBuf, 0);
-
- pageBuf.position(0);
-
- if (checkCrc) {
- int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize);
-
- if ((savedCrc32 ^ curCrc32) != 0) {
- throw new
IgniteInternalDataIntegrityViolationException("Failed to read page (CRC
validation failed) "
- + "[id=" + hexLong(pageId) + ", off=" + pageOff
- + ", filePath=" + filePath + ", fileSize=" +
fileIo.size()
- + ", savedCrc=" + hexInt(savedCrc32) + ", curCrc="
+ hexInt(curCrc32)
- + ", page=" + toHexString(pageBuf) + "]");
- }
- }
+ if (pageOff >= 0) {
+ deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
- assert PageIo.getCrc(pageBuf) == 0;
-
- if (keepCrc) {
- PageIo.setCrc(pageBuf, savedCrc32);
+ return;
}
- } catch (IOException e) {
- throw new IgniteInternalCheckedException("Failed to read page
[file=" + filePath + ", pageId=" + pageId + "]", e);
}
+
+ filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId),
pageBuf, keepCrc);
}
/** {@inheritDoc} */
@Override
public void write(long pageId, ByteBuffer pageBuf, boolean calculateCrc)
throws IgniteInternalCheckedException {
- ensure();
-
- boolean interrupted = false;
-
- while (true) {
- FileIo fileIo = this.fileIo;
-
- try {
- readWriteLock.readLock().lock();
-
- try {
- assert pageBuf.position() == 0 : pageBuf.position();
- assert pageBuf.order() == nativeOrder() : "Page buffer
order " + pageBuf.order()
- + " should be same with " + nativeOrder();
- assert PageIo.getType(pageBuf) != 0 : "Invalid state. Type
is 0! pageId = " + hexLong(pageId);
- assert PageIo.getVersion(pageBuf) != 0 : "Invalid state.
Version is 0! pageId = " + hexLong(pageId);
-
- if (calculateCrc && !skipCrc) {
- assert PageIo.getCrc(pageBuf) == 0 : hexLong(pageId);
-
- PageIo.setCrc(pageBuf, calcCrc32(pageBuf, pageSize));
- }
-
- // Check whether crc was calculated somewhere above the
stack if it is forcibly skipped.
- assert skipCrc || PageIo.getCrc(pageBuf) != 0
- || calcCrc32(pageBuf, pageSize) == 0 : "CRC hasn't
been calculated, crc=0";
-
- assert pageBuf.position() == 0 : pageBuf.position();
-
- long pageOff = pageOffset(pageId);
-
- assert pageOff <= allocatedBytes() : "calculatedOffset=" +
pageOff
- + ", allocated=" + allocatedBytes() + ",
headerSize=" + headerSize + ", filePath=" + filePath;
-
- fileIo.writeFully(pageBuf, pageOff);
-
- PageIo.setCrc(pageBuf, 0);
-
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
-
- return;
- } finally {
- readWriteLock.readLock().unlock();
- }
- } catch (IOException e) {
- if (e instanceof ClosedChannelException) {
- try {
- if (e instanceof ClosedByInterruptException) {
- interrupted = true;
-
- Thread.interrupted();
- }
-
- reinit(fileIo);
-
- pageBuf.position(0);
-
- PageIo.setCrc(pageBuf, 0);
+ assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId)
+ ", pageCount=" + pageCount;
- continue;
- } catch (IOException e0) {
- e0.addSuppressed(e);
-
- e = e0;
- }
- }
-
- throw new IgniteInternalCheckedException(
- "Failed to write page [filePath=" + filePath + ",
pageId=" + pageId + "]",
- e
- );
- }
- }
+ filePageStoreIo.write(pageId, pageBuf, calculateCrc);
}
/** {@inheritDoc} */
@Override
public void sync() throws IgniteInternalCheckedException {
- readWriteLock.writeLock().lock();
-
- try {
- ensure();
-
- FileIo fileIo = this.fileIo;
-
- if (fileIo != null) {
- fileIo.force();
- }
- } catch (IOException e) {
- throw new IgniteInternalCheckedException("Failed to fsync
partition file [filePath=" + filePath + ']', e);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ filePageStoreIo.sync();
}
/** {@inheritDoc} */
@Override
public boolean exists() {
- if (fileExists == null) {
- readWriteLock.writeLock().lock();
-
- try {
- if (fileExists == null) {
- fileExists = Files.exists(filePath) &&
filePath.toFile().length() >= headerSize;
- }
- } finally {
- readWriteLock.writeLock().unlock();
- }
- }
-
- return fileExists;
+ return filePageStoreIo.exists();
}
/** {@inheritDoc} */
@Override
public void ensure() throws IgniteInternalCheckedException {
- if (!initialized) {
- readWriteLock.writeLock().lock();
-
- try {
- if (!initialized) {
- FileIo fileIo = null;
-
- IgniteInternalCheckedException err = null;
-
- try {
- boolean interrupted = false;
-
- while (true) {
- try {
- this.fileIo = fileIo =
ioFactory.create(filePath, CREATE, READ, WRITE);
-
- fileExists = true;
-
- if (fileIo.size() < headerSize) {
- fileIo.writeFully(new
FilePageStoreHeader(version, pageSize).toByteBuffer(), 0);
- } else {
- checkHeader(fileIo);
- }
-
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
-
- break;
- } catch (ClosedByInterruptException e) {
- interrupted = true;
-
- Thread.interrupted();
- }
- }
-
- initialized = true;
- } catch (IOException e) {
- err = new IgniteInternalCheckedException("Failed to
initialize partition file: " + filePath, e);
-
- throw err;
- } finally {
- if (err != null && fileIo != null) {
- try {
- fileIo.close();
- } catch (IOException e) {
- err.addSuppressed(e);
- }
- }
- }
- }
- } finally {
- readWriteLock.writeLock().unlock();
- }
- }
+ filePageStoreIo.ensure();
}
/** {@inheritDoc} */
@Override
public void close() throws IOException {
- stop0(false);
+ filePageStoreIo.close();
+
+ for (DeltaFilePageStoreIo deltaFilePageStoreIo :
deltaFilePageStoreIos) {
+ deltaFilePageStoreIo.close();
+ }
}
/**
* Returns size of the page store in bytes.
*
- * <p>May differ from {@link #pages} * {@link #pageSize} due to delayed
writes or due to other implementation specific details.
+ * <p>May differ from {@link #pages} * {@link FilePageStoreIo#pageSize()}
due to delayed writes or due to other implementation specific
+ * details.
*
* @throws IgniteInternalCheckedException If an I/O error occurs.
*/
public long size() throws IgniteInternalCheckedException {
- try {
- FileIo io = fileIo;
-
- return io == null ? 0 : io.size();
- } catch (IOException e) {
- throw new IgniteInternalCheckedException(e);
- }
+ return filePageStoreIo.size();
}
/**
- * Stops file page store.
- *
- * @param delete {@code True} to delete file.
- * @throws IOException If fails.
+ * Returns file page store path.
*/
- private void stop0(boolean delete) throws IOException {
- readWriteLock.writeLock().lock();
-
- try {
- if (!initialized) {
- // Ensure the file is closed even if not initialized yet.
- if (fileIo != null) {
- fileIo.close();
- }
-
- if (delete && exists()) {
- Files.delete(filePath);
- }
-
- return;
- }
-
- fileIo.force();
-
- fileIo.close();
-
- fileIo = null;
-
- if (delete) {
- Files.delete(filePath);
-
- fileExists = false;
- }
- } finally {
- initialized = false;
+ public Path filePath() {
+ return filePageStoreIo.filePath();
+ }
- readWriteLock.writeLock().unlock();
- }
+ /**
+ * Returns file page store header size.
+ */
+ public int headerSize() {
+ return filePageStoreIo.headerSize();
}
/**
- * Gets page offset within the store file.
+ * Sets the new page allocation listener.
*
- * @param pageId Page ID.
- * @return Page offset.
+ * @param listener New page allocation listener.
*/
- long pageOffset(long pageId) {
- return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize;
+ public void setPageAllocationListener(PageAllocationListener listener) {
+ pageAllocationListener = listener;
}
/**
- * Reads from page storage with failover.
+ * Sets the delta file page store factory.
*
- * @param destBuf Destination buffer.
- * @param position Position.
- * @return Number of read bytes, or {@code -1} if the given position is
greater than or equal to the file's current size.
+ * @param factory Factory.
*/
- private int readWithFailover(ByteBuffer destBuf, long position) throws
IOException {
- boolean interrupted = false;
-
- int bufPos = destBuf.position();
-
- while (true) {
- FileIo fileIo = this.fileIo;
-
- if (fileIo == null) {
- throw new IOException("FileIo has stopped");
- }
-
- try {
- assert destBuf.remaining() > 0;
-
- int bytesRead = fileIo.readFully(destBuf, position);
-
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
-
- return bytesRead;
- } catch (ClosedChannelException e) {
- destBuf.position(bufPos);
-
- if (e instanceof ClosedByInterruptException) {
- interrupted = true;
-
- Thread.interrupted();
- }
-
- reinit(fileIo);
- }
- }
+ public void setDeltaFilePageStoreIoFactory(DeltaFilePageStoreIoFactory
factory) {
+ deltaFilePageStoreIoFactory = factory;
}
/**
- * Reinit page store after file channel was closed by thread interruption.
+ * Sets the callback on completion of delta file page store creation.
*
- * @param fileIo Old fileIo.
+ * @param callback Callback.
*/
- private void reinit(FileIo fileIo) throws IOException {
- if (!initialized) {
- return;
- }
-
- if (fileIo != this.fileIo) {
- return;
- }
-
- readWriteLock.writeLock().lock();
-
- try {
- if (fileIo != this.fileIo) {
- return;
- }
-
- try {
- boolean interrupted = false;
-
- while (true) {
- try {
- fileIo = null;
-
- fileIo = ioFactory.create(filePath, CREATE, READ,
WRITE);
+ public void
setCompleteCreationDeltaFilePageStoreIoCallback(CompleteCreationDeltaFilePageStoreIoCallback
callback) {
+ completeCreationDeltaFilePageStoreIoCallback = callback;
+ }
- fileExists = true;
+ /**
+ * Gets or creates a new delta file, a new delta file will be created when
the previous one is {@link #completeNewDeltaFile()
+ * completed}.
+ *
+ * <p>Thread safe.
+ *
+ * @param pageIndexesSupplier Page indexes supplier for the new delta file
page store.
+ * @return Future that will be completed when the new delta file page
store is created.
+ */
+ public CompletableFuture<DeltaFilePageStoreIo>
getOrCreateNewDeltaFile(Supplier<int[]> pageIndexesSupplier) {
Review Comment:
Discussed in person: this is needed so that page indexes are calculated
lazily, only when a delta file is actually being created.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]