tkalkirill commented on code in PR #818:
URL: https://github.com/apache/ignite-3/pull/818#discussion_r880591035


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java:
##########
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.hexInt;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.FastCrc;
+import 
org.apache.ignite.internal.pagememory.persistence.IgniteInternalDataIntegrityViolationException;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * FilePageStore is a {@link PageStore} implementation that uses regular files 
to store pages.
+ *
+ * <p>Actual read and write operations are performed with {@link FileIo} 
abstract interface, list of its implementations is a good source
+ * of information about functionality in Ignite Native Persistence.
+ *
+ * <p>On a physical level each instance of FilePageStore corresponds to a 
partition file assigned to the local node or to index file.
+ *
+ * <p>Consists of:
+ * <ul>
+ *     <li>Header - {@link #SIGNATURE signature} (8 byte) + {@link #version 
version} (4 byte) + {@link #type type} (1 byte) +
+ *     {@link #pageSize pageSize} (4 bytes) + version-specific information, 
total length {@link #headerSize}. </li>
+ *     <li>Body - data pages are multiples of {@link #pageSize pageSize}.</li>
+ * </ul>
+ */
+public class FilePageStore implements PageStore {
+    /** Page store file signature. */
+    private static final long SIGNATURE = 0xF19AC4FE60C530B8L;
+
+    /** File version. */
+    static final int VERSION = 1;
+
+    /** Size of the common file page store header for all versions, in bytes. 
*/
+    static final int COMMON_HEADER_SIZE = 8/*SIGNATURE*/ + 4/*VERSION*/ + 
1/*type*/ + 4/*page size*/;
+
+    /** Skip CRC calculation flag. */
+    // TODO: IGNITE-17011 Move to config
+    private final boolean skipCrc = getBoolean("IGNITE_PDS_SKIP_CRC");
+
+    /** Data type, can be {@link PageStore#TYPE_IDX} or {@link 
PageStore#TYPE_DATA}. */
+    private final byte type;
+
+    /** File page store path. */
+    private final Path filePath;
+
+    /** {@link FileIo} factory. */
+    private final FileIoFactory ioFactory;
+
+    /** Page size in bytes. */
+    private final int pageSize;
+
+    /** Number of allocated bytes. */
+    private final AtomicLong allocatedBytes = new AtomicLong();
+
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> listeners = new 
CopyOnWriteArrayList<>();
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    /** Caches the existence state of storage file. After it is initialized, 
it will be not {@code null} during lifecycle. */
+    private volatile Boolean fileExists;
+
+    /** {@link FileIo} for read/write operations with file. */
+    private volatile FileIo fileIo;
+
+    /** Initialized file page store. */
+    private volatile boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param type Data type, can be {@link PageStore#TYPE_IDX} or {@link 
PageStore#TYPE_DATA}.
+     * @param filePath File page store path.
+     * @param ioFactory {@link FileIo} factory.
+     * @param pageSize Page size in bytes.
+     */
+    public FilePageStore(
+            byte type,
+            Path filePath,
+            FileIoFactory ioFactory,
+            int pageSize
+    ) {
+        assert type == PageStore.TYPE_DATA || type == PageStore.TYPE_IDX : 
type;
+
+        this.type = type;
+        this.filePath = filePath;
+        this.ioFactory = ioFactory;
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void addWriteListener(PageWriteListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void removeWriteListener(PageWriteListener listener) {
+        listeners.remove(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop(boolean clean) throws IgniteInternalCheckedException {
+        try {
+            stop0(clean);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to stop serving partition file [file=" + filePath 
+ ", delete=" + clean + "]",
+                    e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long allocatePage() throws IgniteInternalCheckedException {
+        init();
+
+        return allocatedBytes.getAndAdd(pageSize) / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long pages() {
+        if (!initialized) {
+            return 0;
+        }
+
+        return allocatedBytes.get() / pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) 
throws IgniteInternalCheckedException {
+        return read(pageId, pageBuf, !skipCrc, keepCrc);
+    }
+
+    /**
+     * Reads a page from the page store.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param checkCrc Check CRC on page.
+     * @param keepCrc By default reading zeroes CRC which was on file, but you 
can keep it in pageBuf if set keepCrc.
+     * @return {@code true} if page has been read successfully, {@code false} 
if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error 
occurred).
+     */
+    private boolean read(long pageId, ByteBuffer pageBuf, boolean checkCrc, 
boolean keepCrc) throws IgniteInternalCheckedException {
+        init();
+
+        try {
+            long off = pageOffset(pageId);
+
+            assert pageBuf.capacity() == pageSize;
+            assert pageBuf.remaining() == pageSize;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == nativeOrder();
+            assert off <= allocatedBytes.get() : "calculatedOffset=" + off
+                    + ", allocated=" + allocatedBytes.get() + ", headerSize=" 
+ headerSize() + ", filePath=" + filePath;
+
+            int n = readWithFailover(pageBuf, off);
+
+            // If page was not written yet, nothing to read.
+            if (n < 0) {
+                pageBuf.put(new byte[pageBuf.remaining()]);

Review Comment:
   In personal correspondence, we decided to leave it as it is, added some 
documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to