ibessonov commented on a change in pull request #697:
URL: https://github.com/apache/ignite-3/pull/697#discussion_r837151498



##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageUtils.java
##########
@@ -279,4 +279,27 @@ public static void copyMemory(ByteBuffer src, long srcOff, ByteBuffer dst, long
     public static void copyMemory(long srcAddr, long srcOff, long dstAddr, long dstOff, long cnt) {
         GridUnsafe.copyMemory(null, srcAddr + srcOff, null, dstAddr + dstOff, cnt);
     }
+
+    /**
+     * Writes a {@link ByteBuffer} into the memory.
+     *
+     * @param addr Address.
+     * @param off Offset.
+     * @param buf Byte buffer.
+     */
+    public static void putByteBuffer(long addr, int off, ByteBuffer buf) {
+        assert addr > 0 : addr;
+        assert off >= 0 : off;
+        assert buf != null;
+
+        if (buf.isDirect()) {
+            GridUnsafe.copyMemory(buf, GridUnsafe.bufferAddress(buf), null, addr + off, buf.limit());
+        } else {
+            assert !buf.isReadOnly();
+
+            byte[] arr = buf.array();
+
+            GridUnsafe.copyMemory(arr, GridUnsafe.BYTE_ARR_OFF + buf.position(), null, addr + off, buf.limit());

Review comment:
       Position alone is not enough; you should also add the array offset
(`buf.arrayOffset()`). I asked for this last time. Please add unit tests.
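
       To illustrate the point: a heap buffer produced by `slice()` can report
`position() == 0` while its data starts in the middle of the backing array. The
sketch below is a minimal, standalone illustration that uses `System.arraycopy`
instead of `GridUnsafe`; the class and the `readableBytes` helper are
hypothetical names, not part of this PR. It also copies `remaining()` bytes
rather than `limit()` bytes, which is an assumption about the intended contract.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ArrayOffsetSketch {
    /** Hypothetical helper: copies the readable bytes of a heap buffer. */
    public static byte[] readableBytes(ByteBuffer buf) {
        byte[] arr = buf.array();

        // The data starts at arrayOffset() + position(), not at position() alone.
        int start = buf.arrayOffset() + buf.position();
        int len = buf.remaining();

        byte[] out = new byte[len];
        System.arraycopy(arr, start, out, 0, len);

        return out;
    }

    public static void main(String[] args) {
        byte[] backing = {10, 11, 12, 13, 14, 15};

        // The slice covers backing[2..4]; its position restarts at 0.
        ByteBuffer slice = ByteBuffer.wrap(backing, 2, 3).slice();

        System.out.println(slice.position());    // 0
        System.out.println(slice.arrayOffset()); // 2
        System.out.println(Arrays.toString(readableBytes(slice))); // [12, 13, 14]
    }
}
```

Using `position()` alone on such a slice would read from index 0 of the backing
array and return the wrong bytes.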

##########
File path: modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link PageMemory}.
+ */
+// TODO: IGNITE-16641 Add support for persistent case.
+abstract class PageMemoryDataRegion implements DataRegion {
+    protected final PageMemoryDataRegionConfiguration cfg;
+
+    protected final PageIoRegistry ioRegistry;
+
+    protected PageMemory pageMemory;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Data region configuration.
+     * @param ioRegistry IO registry.
+     */
+    public PageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
+        this.cfg = cfg;
+        this.ioRegistry = ioRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        if (pageMemory != null) {
+            pageMemory.stop(true);
+        }
+    }
+
+    /**
+     * Returns {@code true} if the data region is persistent.
+     */
+    public boolean persistent() {
+        return ((PageMemoryDataRegionView) cfg.value()).persistent();
+    }
+
+    /**
+     * Returns page memory, {@code null} if not {@link #start started}.
+     */
+    public @Nullable PageMemory pageMemory() {
+        return pageMemory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return "PageMemoryDataRegion [name=" + cfg.value().name() + "]";

Review comment:
       I think this method should only be defined in the non-abstract
implementations; otherwise, how are you going to distinguish a persistent region
from a volatile one?

##########
File path: modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getBytes;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.FULL;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.io.RowIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableInnerIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableLeafIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link BplusTree} implementation for storage-page-memory module.
+ */
+public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
+    private final int partId;
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param grpName Group name.
+     * @param pageMem Page memory.
+     * @param lockLsnr Page lock listener.
+     * @param globalRmvId Global remove ID.
+     * @param metaPageId Meta page ID.
+     * @param reuseList Reuse list.
+     * @param partId Partition id.
+     */
+    public TableTree(
+            int grpId,
+            String grpName,
+            PageMemory pageMem,
+            PageLockListener lockLsnr,
+            AtomicLong globalRmvId,
+            long metaPageId,
+            @Nullable ReuseList reuseList,
+            int partId
+    ) throws IgniteInternalCheckedException {
+        super(
+                "TableTree_" + grpId,
+                grpId,
+                grpName,
+                pageMem,
+                lockLsnr,
+                FLAG_AUX,
+                globalRmvId,
+                metaPageId,
+                reuseList
+        );
+
+        this.partId = partId;
+
+        setIos(TableInnerIo.VERSIONS, TableLeafIo.VERSIONS, TableMetaIo.VERSIONS);
+
+        initTree(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException {
+        return pageMem.allocatePage(grpId, partId, defaultPageFlag);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected int compare(BplusIo<TableSearchRow> io, long pageAddr, int idx, TableSearchRow row) throws IgniteInternalCheckedException {
+        RowIo rowIo = (RowIo) io;
+
+        int cmp = Integer.compare(rowIo.hash(pageAddr, idx), row.hash());
+
+        return cmp != 0 ? cmp : compareRows(rowIo.link(pageAddr, idx), row);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableDataRow getRow(BplusIo<TableSearchRow> io, long pageAddr, int idx, Object x) throws IgniteInternalCheckedException {
+        RowIo rowIo = (RowIo) io;
+
+        int hash = rowIo.hash(pageAddr, idx);
+        long link = rowIo.link(pageAddr, idx);
+
+        return getRowByLink(link, hash, FULL);
+    }
+
+    /**
+     * Returns a row by link.
+     *
+     * @param link Row link.
+     * @param hash Row hash.
+     * @param rowData Specifies what data to lookup.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public TableDataRow getRowByLink(final long link, int hash, RowData rowData) throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        FragmentedByteArray keyBytes = null;
+        FragmentedByteArray valueBytes = null;
+
+        long nextLink = link;
+
+        do {
+            final long pageId = pageId(nextLink);
+
+            final long page = pageMem.acquirePage(grpId, pageId, statisticsHolder());
+
+            try {
+                long pageAddr = pageMem.readLock(grpId, pageId, page);
+
+                assert pageAddr != 0L : nextLink;
+
+                try {
+                    TableDataIo dataIo = pageMem.ioRegistry().resolve(pageAddr);
+
+                    int itemId = itemId(nextLink);
+
+                    int pageSize = pageMem.realPageSize(grpId);
+
+                    DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+                    if (data.nextLink() == 0 && nextLink == link) {
+                        // Good luck: we can read the row without fragments.
+                        return readFullRow(link, hash, rowData, pageAddr + data.offset());
+                    }
+
+                    ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
+
+                    dataBuf.position(data.offset());
+                    dataBuf.limit(data.offset() + data.payloadSize());
+
+                    if (keyBytes == null) {
+                        keyBytes = new FragmentedByteArray();
+                    }
+
+                    keyBytes.readData(dataBuf);
+
+                    if (keyBytes.ready()) {
+                        if (rowData == KEY_ONLY) {
+                            nextLink = 0;
+                            continue;
+                        }
+
+                        if (valueBytes == null) {
+                            valueBytes = new FragmentedByteArray();
+                        }
+
+                        valueBytes.readData(dataBuf);
+
+                        if (valueBytes.ready()) {
+                            nextLink = 0;
+                            continue;
+                        }
+                    }
+
+                    nextLink = data.nextLink();
+                } finally {
+                    pageMem.readUnlock(grpId, pageId, page);
+                }
+            } finally {
+                pageMem.releasePage(grpId, pageId, page);
+            }
+        } while (nextLink != 0);
+
+        ByteBuffer key = ByteBuffer.wrap(keyBytes.array());
+        ByteBuffer value = ByteBuffer.wrap(valueBytes == null ? BYTE_EMPTY_ARRAY : valueBytes.array());
+
+        return new TableDataRowImpl(link, hash, key, value);
+    }
+
+    private TableDataRow readFullRow(long link, int hash, RowData rowData, long pageAddr) {
+        int off = 0;
+
+        int keyBytesLen = getInt(pageAddr, off);
+        off += 4;
+
+        byte[] keyBytes = getBytes(pageAddr, off, keyBytesLen);
+        off += keyBytesLen;
+
+        if (rowData == KEY_ONLY) {
+            return new TableDataRowImpl(link, hash, ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(BYTE_EMPTY_ARRAY));
+        }
+
+        int valueBytesLen = getInt(pageAddr, off);
+        off += 4;
+
+        byte[] valueBytes = getBytes(pageAddr, off, valueBytesLen);
+
+        return new TableDataRowImpl(link, hash, ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(valueBytes));
+    }
+
+    private int compareRows(final long link, TableSearchRow row) throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        long nextLink = link;
+
+        int keyBytesLen = -1;
+        int keyBytesOff = 0;
+
+        do {
+            final long pageId = pageId(nextLink);
+
+            final long page = pageMem.acquirePage(grpId, pageId, statisticsHolder());
+
+            try {
+                final long pageAddr = pageMem.readLock(grpId, pageId, page);
+
+                assert pageAddr != 0L : nextLink;
+
+                try {
+                    TableDataIo dataIo = pageMem.ioRegistry().resolve(pageAddr);
+
+                    int itemId = itemId(nextLink);
+
+                    int pageSize = pageMem.realPageSize(grpId);
+
+                    DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize);
+
+                    if (data.nextLink() == 0 && nextLink == link) {
+                        // Good luck: we can compare the rows without fragments.
+                        return compareRowsFull(pageAddr + data.offset(), row);
+                    }
+
+                    ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
+
+                    dataBuf.position(data.offset());
+                    dataBuf.limit(data.offset() + data.payloadSize());
+
+                    ByteBuffer keyBuf = row.key();
+
+                    if (keyBytesLen == -1) {
+                        // Guaranteed to read because we store it in the header.
+                        keyBytesLen = dataBuf.getInt();
+
+                        int cmp = Integer.compare(keyBytesLen, keyBuf.limit());
+
+                        if (cmp != 0) {
+                            return cmp;
+                        }
+                    }
+
+                    if (dataBuf.remaining() > 0) {
+                        int len = Math.min(dataBuf.remaining(), keyBytesLen - keyBytesOff);
+
+                        int dataBufPos = dataBuf.position();
+
+                        dataBuf.position(dataBufPos);
+                        dataBuf.limit(dataBufPos + len);
+
+                        int oldKeyBufLimit = keyBuf.limit();
+
+                        keyBuf.position(keyBytesOff);
+                        keyBuf.limit(keyBytesOff + len);
+
+                        int cmp = dataBuf.compareTo(keyBuf);
+
+                        keyBytesOff += len;
+
+                        keyBuf.limit(oldKeyBufLimit);
+
+                        if (cmp != 0 || keyBytesOff == keyBytesLen) {
+                            return cmp;
+                        }
+                    }
+
+                    nextLink = data.nextLink();
+                } finally {
+                    pageMem.readUnlock(grpId, pageId, page);
+                }
+            } finally {
+                pageMem.releasePage(grpId, pageId, page);
+            }
+        } while (nextLink != 0);
+
+        throw new IgniteInternalCheckedException("Row comparison error [link=" + link + ", row=" + row + "]");

Review comment:
       Is this even reachable? If it's not, then why do you use a checked
exception?
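
       One way to express this, as a hedged sketch only: if falling out of the
loop means the stored data violates an invariant, an unchecked exception makes
that explicit and keeps it out of the method signature. The class below is a
hypothetical, simplified stand-in for `compareRows` that works on in-memory
fragments instead of pages; none of its names come from the PR.

```java
import java.util.Arrays;
import java.util.List;

public class FragmentCompareSketch {
    /**
     * Compares a key stored as a chain of fragments against a whole key.
     * storedKeyLen plays the role of the length kept in the row header.
     */
    public static int compareFragments(int storedKeyLen, List<byte[]> fragments, byte[] key) {
        int cmp = Integer.compare(storedKeyLen, key.length);

        if (cmp != 0 || storedKeyLen == 0) {
            return cmp;
        }

        int off = 0;

        for (byte[] fragment : fragments) {
            for (int i = 0; i < fragment.length && off < storedKeyLen; i++, off++) {
                cmp = Byte.compare(fragment[i], key[off]);

                if (cmp != 0) {
                    return cmp;
                }
            }

            if (off == storedKeyLen) {
                return 0;
            }
        }

        // Only reachable if the fragments hold fewer bytes than the header claims.
        throw new IllegalStateException(
                "Fragments ended before the key was compared [keyLen=" + storedKeyLen + ", compared=" + off + "]");
    }

    public static void main(String[] args) {
        List<byte[]> fragments = Arrays.asList(new byte[] {1, 2}, new byte[] {3});

        System.out.println(compareFragments(3, fragments, new byte[] {1, 2, 3})); // 0
    }
}
```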

##########
File path: modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * IO routines for {@link TableTree} inner pages.
+ *
+ * <p>Structure: hash(int) + link(long).
+ */
+public class TableInnerIo extends BplusInnerIo<TableSearchRow> implements RowIo {
+    /** Page IO type. */
+    public static final short T_TABLE_INNER_IO = 4;
+
+    /** I/O versions. */
+    public static final IoVersions<TableInnerIo> VERSIONS = new IoVersions<>(new TableInnerIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected TableInnerIo(int ver) {
+        super(
+                T_TABLE_INNER_IO,
+                ver,
+                true,
+                Integer.BYTES + Long.BYTES // hash(int) + link(long);
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+
+        int srcHash = hash(srcPageAddr, srcIdx);
+        long srcLink = link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putInt(dstPageAddr, dstOff, srcHash);
+        dstOff += 4;
+
+        putLong(dstPageAddr, dstOff, srcLink);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void storeByOffset(long pageAddr, int off, TableSearchRow row) {
+        assertPageType(pageAddr);
+
+        putInt(pageAddr, off, row.hash());
+        off += 4;
+
+        putLong(pageAddr, off, row.link());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableSearchRow getLookupRow(BplusTree<TableSearchRow, ?> tree, long pageAddr, int idx) throws IgniteInternalCheckedException {
+        int hash = hash(pageAddr, idx);
+        long link = link(pageAddr, idx);
+
+        return ((TableTree) tree).getRowByLink(link, hash, KEY_ONLY);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long link(long pageAddr, int idx) {
+        assert idx < getCount(pageAddr) : idx;
+
+        return getLong(pageAddr, offset(idx) + 4 /* hash ahead */);

Review comment:
       Can this "4" be a constant, like "LINK_OFFSET" or something?
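
       A rough sketch of what I mean, using a plain `ByteBuffer` as a stand-in
for the page so that it runs on its own. The class and the constant names
(`HASH_OFFSET`, `LINK_OFFSET`, `ITEM_SIZE`) are only suggestions, not existing
code:

```java
import java.nio.ByteBuffer;

public class ItemLayoutSketch {
    /** Offset of the hash inside an item. */
    public static final int HASH_OFFSET = 0;

    /** Offset of the link inside an item: it follows the int hash. */
    public static final int LINK_OFFSET = HASH_OFFSET + Integer.BYTES;

    /** Item size: hash(int) + link(long). */
    public static final int ITEM_SIZE = LINK_OFFSET + Long.BYTES;

    public static void store(ByteBuffer page, int itemOff, int hash, long link) {
        page.putInt(itemOff + HASH_OFFSET, hash);
        page.putLong(itemOff + LINK_OFFSET, link);
    }

    public static int hash(ByteBuffer page, int itemOff) {
        return page.getInt(itemOff + HASH_OFFSET);
    }

    public static long link(ByteBuffer page, int itemOff) {
        return page.getLong(itemOff + LINK_OFFSET);
    }

    public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(2 * ITEM_SIZE);

        // Write the second item and read it back through the same constants.
        store(page, ITEM_SIZE, 42, 7L);

        System.out.println(hash(page, ITEM_SIZE)); // 42
        System.out.println(link(page, ITEM_SIZE)); // 7
    }
}
```

With named offsets, the writers and the readers cannot drift apart if the item
layout changes.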

##########
File path: modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {
+    private final int partId;
+
+    private final TableTree tree;
+
+    private final TableFreeList freeList;
+
+    /**
+     * Constructor.
+     *
+     * @param partId Partition id.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param freeList Table free list.
+     * @throws StorageException If there is an error while creating the partition storage.
+     */
+    public PageMemoryPartitionStorage(
+            int partId,
+            TableConfiguration tableCfg,
+            PageMemoryDataRegion dataRegion,
+            TableFreeList freeList
+    ) throws StorageException {
+        assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
+
+        this.partId = partId;
+
+        this.freeList = freeList;
+
+        TableView tableView = tableCfg.value();
+
+        int grpId = groupId(tableView);
+
+        try {
+            long metaPageId = dataRegion.pageMemory().allocatePage(grpId, partId, FLAG_AUX);

Review comment:
       And this is the actual reason why it only applies to volatile regions.

##########
File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageUtils.java
##########
@@ -279,4 +279,27 @@ public static void copyMemory(ByteBuffer src, long srcOff, ByteBuffer dst, long
     public static void copyMemory(long srcAddr, long srcOff, long dstAddr, long dstOff, long cnt) {
         GridUnsafe.copyMemory(null, srcAddr + srcOff, null, dstAddr + dstOff, cnt);
     }
+
+    /**
+     * Writes a {@link ByteBuffer} into the memory.
+     *
+     * @param addr Address.
+     * @param off Offset.
+     * @param buf Byte buffer.
+     */
+    public static void putByteBuffer(long addr, int off, ByteBuffer buf) {
+        assert addr > 0 : addr;
+        assert off >= 0 : off;
+        assert buf != null;
+
+        if (buf.isDirect()) {
+            GridUnsafe.copyMemory(buf, GridUnsafe.bufferAddress(buf), null, addr + off, buf.limit());

Review comment:
       Is this method ever used for direct buffers? The first parameter must be
null here; with the current code you'll copy garbage data!
   This is what happens when there are no unit tests.
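
       The cases I would expect the tests to cover are sketched below. The
sketch does not call `PageUtils.putByteBuffer`: a hypothetical `copyInto`, built
on the public `ByteBuffer` API, stands in for the method under test so that the
snippet runs on its own. All names in it are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class PutByteBufferCasesSketch {
    /** Stand-in for the method under test: copies the readable bytes of src into dst. */
    public static void copyInto(ByteBuffer src, byte[] dst, int dstOff) {
        // duplicate() keeps the caller's position and limit untouched.
        src.duplicate().get(dst, dstOff, src.remaining());
    }

    public static byte[] roundTrip(ByteBuffer src) {
        byte[] dst = new byte[src.remaining()];

        copyInto(src, dst, 0);

        return dst;
    }

    /** Plain heap buffer. */
    public static ByteBuffer heap() {
        return ByteBuffer.wrap(new byte[] {1, 2, 3});
    }

    /** Heap buffer with a non-zero array offset. */
    public static ByteBuffer heapSlice() {
        return ByteBuffer.wrap(new byte[] {9, 9, 1, 2, 3, 9}, 2, 3).slice();
    }

    /** Direct buffer: there is no backing array, only an off-heap address. */
    public static ByteBuffer direct() {
        ByteBuffer buf = ByteBuffer.allocateDirect(3);

        buf.put(new byte[] {1, 2, 3});
        buf.flip();

        return buf;
    }

    public static void main(String[] args) {
        byte[] expected = {1, 2, 3};

        // Every kind of source buffer must produce the same bytes.
        System.out.println(Arrays.equals(expected, roundTrip(heap())));
        System.out.println(Arrays.equals(expected, roundTrip(heapSlice())));
        System.out.println(Arrays.equals(expected, roundTrip(direct())));
    }
}
```

A real test would run the same three inputs through `putByteBuffer` and read the
bytes back from the target address.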

##########
File path: modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableDataRow.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import org.apache.ignite.internal.pagememory.Storable;
+import org.apache.ignite.internal.storage.DataRow;
+
+/**
+ * {@link DataRow} extension for {@link TableTree}.
+ */
+public interface TableDataRow extends TableSearchRow, DataRow, Storable {

Review comment:
       I wonder why you need this interface in the first place. Is more than
one implementation planned?

##########
File path: modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {
+    private final int partId;
+
+    private final TableTree tree;
+
+    private final TableFreeList freeList;
+
+    /**
+     * Constructor.
+     *
+     * @param partId Partition id.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param freeList Table free list.
+     * @throws StorageException If there is an error while creating the partition storage.
+     */
+    public PageMemoryPartitionStorage(
+            int partId,
+            TableConfiguration tableCfg,
+            PageMemoryDataRegion dataRegion,
+            TableFreeList freeList
+    ) throws StorageException {
+        assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
+
+        this.partId = partId;
+
+        this.freeList = freeList;
+
+        TableView tableView = tableCfg.value();
+
+        int grpId = groupId(tableView);
+
+        try {
+            long metaPageId = dataRegion.pageMemory().allocatePage(grpId, partId, FLAG_AUX);
+
+            tree = new TableTree(
+                    grpId,
+                    tableView.name(),
+                    dataRegion.pageMemory(),
+                    PageLockListenerNoOp.INSTANCE,
+                    new AtomicLong(),
+                    metaPageId,
+                    freeList,
+                    partId
+            );
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error occurred while creating the partition storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int partitionId() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable DataRow read(SearchRow key) throws StorageException {
+        try {
+            return tree.findOne(wrap(key));
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error reading row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> readAll(List<? extends SearchRow> keys) throws StorageException {
+        Collection<DataRow> res = new ArrayList<>(keys.size());
+
+        try {
+            for (SearchRow key : keys) {
+                res.add(tree.findOne(wrap(key)));
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error reading rows", e);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(DataRow row) throws StorageException {
+        try {
+            TableDataRow dataRow = wrap(row);
+
+            freeList.insertDataRow(dataRow);
+
+            tree.put(dataRow);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error writing row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeAll(List<? extends DataRow> rows) throws StorageException 
{
+        try {
+            for (DataRow row : rows) {
+                TableDataRow dataRow = wrap(row);
+
+                freeList.insertDataRow(dataRow);
+
+                tree.put(dataRow);
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error writing rows", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws 
StorageException {
+        Collection<DataRow> cantInsert = new ArrayList<>();
+
+        try {
+            for (DataRow row : rows) {
+                TableDataRow dataRow = wrap(row);
+
+                if (tree.findOne(dataRow) == null) {

Review comment:
       Can this be done with a single compute? A get followed by a put is inefficient!
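
To illustrate the suggestion, here is a minimal sketch of insert-if-absent done in one `compute` call rather than a lookup followed by a put. It is only a stand-in under stated assumptions: a `java.util.HashMap` plays the role of the tree, and `InsertIfAbsentSketch` and `insertAll` are hypothetical names, not code from this PR. In the storage itself the equivalent would presumably go through `tree.invoke(...)`, which the `invoke` method in this file already calls.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: a HashMap stands in for the B+ tree; none of these names come from the PR. */
final class InsertIfAbsentSketch {
    /** Inserts every row whose key is absent and returns the keys that were already present. */
    static Collection<String> insertAll(Map<String, String> tree, List<Map.Entry<String, String>> rows) {
        Collection<String> cantInsert = new ArrayList<>();

        for (Map.Entry<String, String> row : rows) {
            boolean[] inserted = {false};

            // The existence check and the update are decided inside one compute call.
            tree.compute(row.getKey(), (key, oldValue) -> {
                if (oldValue == null) {
                    inserted[0] = true;

                    return row.getValue();
                }

                // Key already present: keep the old value untouched.
                return oldValue;
            });

            if (!inserted[0]) {
                cantInsert.add(row.getKey());
            }
        }

        return cantInsert;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new HashMap<>();
        tree.put("a", "1");

        Collection<String> skipped = insertAll(tree, List.of(Map.entry("a", "2"), Map.entry("b", "3")));

        System.out.println("skipped=" + skipped + ", a=" + tree.get("a") + ", b=" + tree.get("b"));
    }
}
```

With the tree holding `a -> 1` and the rows `(a, 2)` and `(b, 3)`, the method reports `a` as not inserted, leaves `a` mapped to `1`, and adds `b`.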

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {
+    private final int partId;
+
+    private final TableTree tree;
+
+    private final TableFreeList freeList;
+
+    /**
+     * Constructor.
+     *
+     * @param partId Partition id.
+     * @param tableCfg Table configuration.
+     * @param dataRegion Data region for the table.
+     * @param freeList Table free list.
+     * @throws StorageException If there is an error while creating the 
partition storage.
+     */
+    public PageMemoryPartitionStorage(
+            int partId,
+            TableConfiguration tableCfg,
+            PageMemoryDataRegion dataRegion,
+            TableFreeList freeList
+    ) throws StorageException {
+        assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
+
+        this.partId = partId;
+
+        this.freeList = freeList;
+
+        TableView tableView = tableCfg.value();
+
+        int grpId = groupId(tableView);
+
+        try {
+            long metaPageId = dataRegion.pageMemory().allocatePage(grpId, 
partId, FLAG_AUX);
+
+            tree = new TableTree(
+                    grpId,
+                    tableView.name(),
+                    dataRegion.pageMemory(),
+                    PageLockListenerNoOp.INSTANCE,
+                    new AtomicLong(),
+                    metaPageId,
+                    freeList,
+                    partId
+            );
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error occurred while creating the 
partition storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int partitionId() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable DataRow read(SearchRow key) throws StorageException {
+        try {
+            return tree.findOne(wrap(key));
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error reading row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> readAll(List<? extends SearchRow> keys) throws 
StorageException {
+        Collection<DataRow> res = new ArrayList<>(keys.size());
+
+        try {
+            for (SearchRow key : keys) {
+                res.add(tree.findOne(wrap(key)));
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error reading rows", e);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(DataRow row) throws StorageException {
+        try {
+            TableDataRow dataRow = wrap(row);
+
+            freeList.insertDataRow(dataRow);
+
+            tree.put(dataRow);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error writing row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeAll(List<? extends DataRow> rows) throws StorageException 
{
+        try {
+            for (DataRow row : rows) {
+                TableDataRow dataRow = wrap(row);
+
+                freeList.insertDataRow(dataRow);
+
+                tree.put(dataRow);
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error writing rows", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws 
StorageException {
+        Collection<DataRow> cantInsert = new ArrayList<>();
+
+        try {
+            for (DataRow row : rows) {
+                TableDataRow dataRow = wrap(row);
+
+                if (tree.findOne(dataRow) == null) {
+                    freeList.insertDataRow(dataRow);
+
+                    tree.put(dataRow);
+                } else {
+                    cantInsert.add(row);
+                }
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error inserting rows", e);
+        }
+
+        return cantInsert;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void remove(SearchRow key) throws StorageException {
+        try {
+            TableSearchRow searchRow = wrap(key);
+
+            TableDataRow removed = tree.remove(searchRow);
+
+            if (removed != null) {
+                freeList.removeDataRowByLink(removed.link());
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error removing row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) 
throws StorageException {
+        Collection<SearchRow> skippedRows = new ArrayList<>();
+
+        try {
+            for (SearchRow key : keys) {
+                TableDataRow removed = tree.remove(wrap(key));
+
+                if (removed != null) {
+                    freeList.removeDataRowByLink(removed.link());
+                } else {
+                    skippedRows.add(key);
+                }
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error removing rows", e);
+        }
+
+        return skippedRows;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> removeAllExact(List<? extends DataRow> 
keyValues) throws StorageException {
+        Collection<DataRow> skipped = new ArrayList<>();
+
+        try {
+            for (DataRow keyValue : keyValues) {
+                TableDataRow dataRow = wrap(keyValue);
+
+                TableDataRow founded = tree.findOne(dataRow);
+
+                if (founded != null && 
founded.value().equals(dataRow.value())) {
+                    tree.remove(founded);
+
+                    freeList.removeDataRowByLink(founded.link());
+                } else {
+                    skipped.add(keyValue);
+                }
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error while removing exact rows", e);
+        }
+
+        return skipped;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T invoke(SearchRow key, InvokeClosure<T> clo) throws 
StorageException {
+        IgniteTree.InvokeClosure<TableDataRow> treeClosure = new 
IgniteTree.InvokeClosure<>() {
+            /** {@inheritDoc} */
+            @Override
+            public void call(@Nullable TableDataRow oldRow) {
+                clo.call(oldRow);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public @Nullable TableDataRow newRow() {
+                DataRow newRow = clo.newRow();
+
+                if (newRow == null) {
+                    return null;
+                }
+
+                TableDataRow dataRow = wrap(newRow);
+
+                try {
+                    freeList.insertDataRow(dataRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new IgniteInternalException(e);
+                }
+
+                return dataRow;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public @Nullable IgniteTree.OperationType operationType() {
+                OperationType operationType = clo.operationType();
+
+                switch (operationType) {
+                    case WRITE:
+                        return IgniteTree.OperationType.PUT;
+
+                    case REMOVE:
+                        return IgniteTree.OperationType.REMOVE;
+
+                    case NOOP:
+                        return IgniteTree.OperationType.NOOP;
+
+                    default:
+                        throw new 
UnsupportedOperationException(String.valueOf(clo.operationType()));
+                }
+            }
+        };
+
+        try {
+            tree.invoke(wrap(key), null, treeClosure);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error invoking a closure for a row", 
e);
+        }
+
+        return clo.result();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws 
StorageException {
+        try {
+            IgniteCursor<TableDataRow> treeCursor = tree.find(null, null);
+
+            return new Cursor<DataRow>() {
+                @Nullable TableDataRow cur = advance();
+
+                /** {@inheritDoc} */
+                @Override
+                public void close() {
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public Iterator<DataRow> iterator() {
+                    return this;
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public boolean hasNext() {
+                    return cur != null;
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public DataRow next() {
+                    DataRow next = cur;
+
+                    if (next == null) {
+                        throw new NoSuchElementException();
+                    }
+
+                    try {
+                        cur = advance();
+                    } catch (IgniteInternalCheckedException e) {
+                        throw new StorageException("Error getting next row", 
e);
+                    }
+
+                    return next;
+                }
+
+                @Nullable TableDataRow advance() throws 
IgniteInternalCheckedException {
+                    while (treeCursor.next()) {
+                        TableDataRow dataRow = treeCursor.get();
+
+                        if (filter.test(dataRow)) {
+                            return dataRow;
+                        }
+                    }
+
+                    return null;
+                }
+            };
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error while scanning rows", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> snapshot(Path snapshotPath) {
+        throw new UnsupportedOperationException("Snapshots are not supported 
yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void restoreSnapshot(Path snapshotPath) {
+        throw new UnsupportedOperationException("Snapshots are not supported 
yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void destroy() throws StorageException {
+        try {
+            tree.destroy();

Review comment:
       Do we have a partition meta page?
   Does destroying the tree also deallocate the tree meta page?

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableInnerIo.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static 
org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * IO routines for {@link TableTree} inner pages.
+ *
+ * <p>Structure: hash(int) + link(long).
+ */
+public class TableInnerIo extends BplusInnerIo<TableSearchRow> implements 
RowIo {
+    /** Page IO type. */
+    public static final short T_TABLE_INNER_IO = 4;
+
+    /** I/O versions. */
+    public static final IoVersions<TableInnerIo> VERSIONS = new 
IoVersions<>(new TableInnerIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected TableInnerIo(int ver) {
+        super(
+                T_TABLE_INNER_IO,
+                ver,
+                true,
+                Integer.BYTES + Long.BYTES // hash(int) + link(long);
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> 
srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+
+        int srcHash = hash(srcPageAddr, srcIdx);
+        long srcLink = link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putInt(dstPageAddr, dstOff, srcHash);
+        dstOff += 4;
+
+        putLong(dstPageAddr, dstOff, srcLink);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void storeByOffset(long pageAddr, int off, TableSearchRow row) {
+        assertPageType(pageAddr);
+
+        putInt(pageAddr, off, row.hash());
+        off += 4;
+
+        putLong(pageAddr, off, row.link());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableSearchRow getLookupRow(BplusTree<TableSearchRow, ?> tree, long 
pageAddr, int idx) throws IgniteInternalCheckedException {
+        int hash = hash(pageAddr, idx);
+        long link = link(pageAddr, idx);
+
+        return ((TableTree) tree).getRowByLink(link, hash, KEY_ONLY);

Review comment:
       We should make this one lazy in the future. What do you think?
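
As a rough illustration of what "lazy" could mean here, the sketch below keeps only the hash and the link and loads the key bytes on first access. Everything in it is hypothetical (`LazyKeyRowSketch`, the `keyLoader` callback); it is not the PR's `TableSearchRow`. The possible benefit is that a comparison decided by the hash alone would never touch the data page.

```java
import java.util.function.LongFunction;

/** Hypothetical lazy row: holds hash and link, reads the key bytes only when asked. */
final class LazyKeyRowSketch {
    private final int hash;

    private final long link;

    /** Assumed callback that reads the key bytes by link (stands in for a data page read). */
    private final LongFunction<byte[]> keyLoader;

    /** Cached key bytes, {@code null} until the first call to {@link #key()}. */
    private byte[] key;

    LazyKeyRowSketch(int hash, long link, LongFunction<byte[]> keyLoader) {
        this.hash = hash;
        this.link = link;
        this.keyLoader = keyLoader;
    }

    int hash() {
        return hash;
    }

    long link() {
        return link;
    }

    byte[] key() {
        if (key == null) {
            // First access: load once and cache. Not thread-safe, which is enough for a sketch.
            key = keyLoader.apply(link);
        }

        return key;
    }

    public static void main(String[] args) {
        int[] loads = {0};

        LazyKeyRowSketch row = new LazyKeyRowSketch(42, 7L, rowLink -> {
            loads[0]++;

            return new byte[] {1, 2, 3};
        });

        row.hash();
        System.out.println("loads after hash(): " + loads[0]);

        row.key();
        row.key();
        System.out.println("loads after two key() calls: " + loads[0]);
    }
}
```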

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static 
org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * IO routines for {@link TableTree} leaf pages.
+ *
+ * <p>Structure: hash(int) + link(long).
+ */
+public class TableLeafIo extends BplusLeafIo<TableSearchRow> implements RowIo {
+    /** Page IO type. */
+    public static final short T_TABLE_LEAF_IO = 5;
+
+    /** I/O versions. */
+    public static final IoVersions<TableLeafIo> VERSIONS = new 
IoVersions<>(new TableLeafIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected TableLeafIo(int ver) {
+        super(
+                T_TABLE_LEAF_IO,
+                ver,
+                Integer.BYTES + Long.BYTES // hash(int) + link(long);
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> 
srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+
+        int srcHash = hash(srcPageAddr, srcIdx);
+        long srcLink = link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putInt(dstPageAddr, dstOff, srcHash);
+        dstOff += 4;
+
+        putLong(dstPageAddr, dstOff, srcLink);
+

Review comment:
       Empty line

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import 
org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link 
PageMemory}.
+ */
+// TODO: IGNITE-16641 Add support for persistent case.
+abstract class PageMemoryDataRegion implements DataRegion {
+    protected final PageMemoryDataRegionConfiguration cfg;
+
+    protected final PageIoRegistry ioRegistry;
+
+    protected PageMemory pageMemory;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Data region configuration.
+     * @param ioRegistry IO registry.
+     */
+    public PageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, 
PageIoRegistry ioRegistry) {
+        this.cfg = cfg;
+        this.ioRegistry = ioRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        if (pageMemory != null) {
+            pageMemory.stop(true);
+        }
+    }
+
+    /**
+     * Returns {@code true} if the data region is persistent.
+     */
+    public boolean persistent() {
+        return ((PageMemoryDataRegionView) cfg.value()).persistent();

Review comment:
       Too bad that the code generator doesn't provide overrides. We should fix it 
one day.
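
For context, the kind of override meant here is a covariant return type: if the generated configuration interface redeclared `value()` with the narrower view type, the cast above would be unnecessary. The sketch below shows the language feature with hypothetical interfaces (`BaseView`, `PersistentView`, `BaseConfiguration`, `PersistentConfiguration`); they are not the generated Ignite types.

```java
/** Sketch of a covariant override; all nested types are hypothetical stand-ins. */
final class CovariantOverrideSketch {
    interface BaseView {
    }

    interface PersistentView extends BaseView {
        boolean persistent();
    }

    interface BaseConfiguration {
        BaseView value();
    }

    interface PersistentConfiguration extends BaseConfiguration {
        /** Narrows the return type, so callers get a PersistentView without casting. */
        @Override
        PersistentView value();
    }

    static boolean persistent(PersistentConfiguration cfg) {
        // No cast needed: value() is already typed as PersistentView.
        return cfg.value().persistent();
    }

    public static void main(String[] args) {
        // Outer lambda implements value(), inner lambda implements persistent().
        PersistentConfiguration cfg = () -> () -> true;

        System.out.println(persistent(cfg));
    }
}
```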

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/TableTree.java
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getBytes;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static 
org.apache.ignite.internal.storage.pagememory.TableTree.RowData.FULL;
+import static 
org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.pagememory.io.RowIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableDataIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableInnerIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableLeafIo;
+import org.apache.ignite.internal.storage.pagememory.io.TableMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link BplusTree} implementation for storage-page-memory module.
+ */
+public class TableTree extends BplusTree<TableSearchRow, TableDataRow> {
+    private final int partId;
+
+    /**
+     * Constructor.
+     *
+     * @param grpId Group ID.
+     * @param grpName Group name.
+     * @param pageMem Page memory.
+     * @param lockLsnr Page lock listener.
+     * @param globalRmvId Global remove ID.
+     * @param metaPageId Meta page ID.
+     * @param reuseList Reuse list.
+     * @param partId Partition id.
+     */
+    public TableTree(
+            int grpId,
+            String grpName,
+            PageMemory pageMem,
+            PageLockListener lockLsnr,
+            AtomicLong globalRmvId,
+            long metaPageId,
+            @Nullable ReuseList reuseList,
+            int partId
+    ) throws IgniteInternalCheckedException {
+        super(
+                "TableTree_" + grpId,
+                grpId,
+                grpName,
+                pageMem,
+                lockLsnr,
+                FLAG_AUX,
+                globalRmvId,
+                metaPageId,
+                reuseList
+        );
+
+        this.partId = partId;
+
+        setIos(TableInnerIo.VERSIONS, TableLeafIo.VERSIONS, 
TableMetaIo.VERSIONS);
+
+        initTree(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected long allocatePageNoReuse() throws IgniteInternalCheckedException 
{
+        return pageMem.allocatePage(grpId, partId, defaultPageFlag);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected int compare(BplusIo<TableSearchRow> io, long pageAddr, int idx, 
TableSearchRow row) throws IgniteInternalCheckedException {
+        RowIo rowIo = (RowIo) io;
+
+        int cmp = Integer.compare(rowIo.hash(pageAddr, idx), row.hash());
+
+        return cmp != 0 ? cmp : compareRows(rowIo.link(pageAddr, idx), row);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public TableDataRow getRow(BplusIo<TableSearchRow> io, long pageAddr, int 
idx, Object x) throws IgniteInternalCheckedException {
+        RowIo rowIo = (RowIo) io;
+
+        int hash = rowIo.hash(pageAddr, idx);
+        long link = rowIo.link(pageAddr, idx);
+
+        return getRowByLink(link, hash, FULL);
+    }
+
+    /**
+     * Returns a row by link.
+     *
+     * @param link Row link.
+     * @param hash Row hash.
+     * @param rowData Specifies what data to lookup.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public TableDataRow getRowByLink(final long link, int hash, RowData 
rowData) throws IgniteInternalCheckedException {
+        assert link != 0;
+
+        FragmentedByteArray keyBytes = null;
+        FragmentedByteArray valueBytes = null;
+
+        long nextLink = link;
+
+        do {
+            final long pageId = pageId(nextLink);
+
+            final long page = pageMem.acquirePage(grpId, pageId, 
statisticsHolder());
+
+            try {
+                long pageAddr = pageMem.readLock(grpId, pageId, page);
+
+                assert pageAddr != 0L : nextLink;
+
+                try {
+                    TableDataIo dataIo = 
pageMem.ioRegistry().resolve(pageAddr);
+
+                    int itemId = itemId(nextLink);
+
+                    int pageSize = pageMem.realPageSize(grpId);
+
+                    DataPagePayload data = dataIo.readPayload(pageAddr, 
itemId, pageSize);
+
+                    if (data.nextLink() == 0 && nextLink == link) {
+                        // Good luck: we can read the row without fragments.
+                        return readFullRow(link, hash, rowData, pageAddr + 
data.offset());
+                    }
+
+                    ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
+
+                    dataBuf.position(data.offset());
+                    dataBuf.limit(data.offset() + data.payloadSize());
+
+                    if (keyBytes == null) {
+                        keyBytes = new FragmentedByteArray();
+                    }
+
+                    keyBytes.readData(dataBuf);
+
+                    if (keyBytes.ready()) {
+                        if (rowData == KEY_ONLY) {
+                            nextLink = 0;
+                            continue;
+                        }
+
+                        if (valueBytes == null) {
+                            valueBytes = new FragmentedByteArray();
+                        }
+
+                        valueBytes.readData(dataBuf);
+
+                        if (valueBytes.ready()) {
+                            nextLink = 0;
+                            continue;
+                        }
+                    }
+
+                    nextLink = data.nextLink();
+                } finally {
+                    pageMem.readUnlock(grpId, pageId, page);
+                }
+            } finally {
+                pageMem.releasePage(grpId, pageId, page);
+            }
+        } while (nextLink != 0);
+
+        ByteBuffer key = ByteBuffer.wrap(keyBytes.array());
+        ByteBuffer value = ByteBuffer.wrap(valueBytes == null ? 
BYTE_EMPTY_ARRAY : valueBytes.array());
+
+        return new TableDataRowImpl(link, hash, key, value);
+    }
+
+    private TableDataRow readFullRow(long link, int hash, RowData rowData, 
long pageAddr) {
+        int off = 0;
+
+        int keyBytesLen = getInt(pageAddr, off);
+        off += 4;
+
+        byte[] keyBytes = getBytes(pageAddr, off, keyBytesLen);
+        off += keyBytesLen;
+
+        if (rowData == KEY_ONLY) {
+            return new TableDataRowImpl(link, hash, ByteBuffer.wrap(keyBytes), 
ByteBuffer.wrap(BYTE_EMPTY_ARRAY));
+        }
+
+        int valueBytesLen = getInt(pageAddr, off);
+        off += 4;
+
+        byte[] valueBytes = getBytes(pageAddr, off, valueBytesLen);
+
+        return new TableDataRowImpl(link, hash, ByteBuffer.wrap(keyBytes), 
ByteBuffer.wrap(valueBytes));
+    }
+
+    private int compareRows(final long link, TableSearchRow row) throws 
IgniteInternalCheckedException {
+        assert link != 0;
+
+        long nextLink = link;
+
+        int keyBytesLen = -1;
+        int keyBytesOff = 0;
+
+        do {
+            final long pageId = pageId(nextLink);
+
+            final long page = pageMem.acquirePage(grpId, pageId, 
statisticsHolder());
+
+            try {
+                final long pageAddr = pageMem.readLock(grpId, pageId, page);
+
+                assert pageAddr != 0L : nextLink;
+
+                try {
+                    TableDataIo dataIo = 
pageMem.ioRegistry().resolve(pageAddr);
+
+                    int itemId = itemId(nextLink);
+
+                    int pageSize = pageMem.realPageSize(grpId);
+
+                    DataPagePayload data = dataIo.readPayload(pageAddr, 
itemId, pageSize);
+
+                    if (data.nextLink() == 0 && nextLink == link) {
+                        // Good luck: we can compare the rows without 
fragments.
+                        return compareRowsFull(pageAddr + data.offset(), row);
+                    }
+
+                    ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize);
+
+                    dataBuf.position(data.offset());
+                    dataBuf.limit(data.offset() + data.payloadSize());

Review comment:
       Interesting. Should the limit be set without "data.offset()"? Is this code tested?
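
For reference on the question above, here is a minimal sketch of how `position` and `limit` interact on a `java.nio.ByteBuffer`. A heap buffer stands in for the page, and the class and method names are hypothetical, not part of the PR. `limit` is an absolute index, not a length, so a window of `payloadSize` bytes that starts at `offset` ends at `offset + payloadSize`.

```java
import java.nio.ByteBuffer;

public class PayloadWindowSketch {
    /** Returns a view of payloadSize bytes starting at offset (the limit is an absolute index). */
    public static ByteBuffer window(ByteBuffer page, int offset, int payloadSize) {
        ByteBuffer buf = page.duplicate();

        buf.position(offset);
        buf.limit(offset + payloadSize);

        return buf;
    }

    /** Same as above, but the limit is set to the payload size alone, for comparison. */
    public static ByteBuffer windowLimitWithoutOffset(ByteBuffer page, int offset, int payloadSize) {
        ByteBuffer buf = page.duplicate();

        buf.position(offset);
        // If the position is larger than the new limit, the position is clamped to the new limit.
        buf.limit(payloadSize);

        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(64);

        System.out.println(window(page, 16, 8).remaining());
        System.out.println(windowLimitWithoutOffset(page, 16, 8).remaining());
    }
}
```

With an offset of 16 and a payload of 8 bytes, the first variant exposes exactly 8 readable bytes, while the second leaves none, because the position is clamped down to the limit.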

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/io/TableLeafIo.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static 
org.apache.ignite.internal.storage.pagememory.TableTree.RowData.KEY_ONLY;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.storage.pagememory.TableSearchRow;
+import org.apache.ignite.internal.storage.pagememory.TableTree;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * IO routines for {@link TableTree} leaf pages.
+ *
+ * <p>Structure: hash(int) + link(long).
+ */
+public class TableLeafIo extends BplusLeafIo<TableSearchRow> implements RowIo {
+    /** Page IO type. */
+    public static final short T_TABLE_LEAF_IO = 5;
+
+    /** I/O versions. */
+    public static final IoVersions<TableLeafIo> VERSIONS = new 
IoVersions<>(new TableLeafIo(1));
+
+    /**
+     * Constructor.
+     *
+     * @param ver Page format version.
+     */
+    protected TableLeafIo(int ver) {
+        super(
+                T_TABLE_LEAF_IO,
+                ver,
+                Integer.BYTES + Long.BYTES // hash(int) + link(long);
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void store(long dstPageAddr, int dstIdx, BplusIo<TableSearchRow> 
srcIo, long srcPageAddr, int srcIdx) {
+        assertPageType(dstPageAddr);
+
+        int srcHash = hash(srcPageAddr, srcIdx);
+        long srcLink = link(srcPageAddr, srcIdx);
+
+        int dstOff = offset(dstIdx);
+
+        putInt(dstPageAddr, dstOff, srcHash);
+        dstOff += 4;

Review comment:
       Magic numbers everywhere...
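
One possible way to address this, as a sketch only: derive the item layout from `Integer.BYTES` and `Long.BYTES` through named constants. A heap `ByteBuffer` stands in for the page memory, and the class, constant, and method names below are hypothetical, not the ones used in the PR.

```java
import java.nio.ByteBuffer;

public class LeafItemLayoutSketch {
    /** Offset of the hash field inside an item. */
    public static final int HASH_OFFSET = 0;

    /** Offset of the link field: it follows the hash(int). */
    public static final int LINK_OFFSET = HASH_OFFSET + Integer.BYTES;

    /** Item size: hash(int) + link(long). */
    public static final int ITEM_SIZE = LINK_OFFSET + Long.BYTES;

    /** Writes an item at the given index using absolute puts. */
    public static void store(ByteBuffer page, int idx, int hash, long link) {
        int off = idx * ITEM_SIZE;

        page.putInt(off + HASH_OFFSET, hash);
        page.putLong(off + LINK_OFFSET, link);
    }

    /** Reads the hash of the item at the given index. */
    public static int hash(ByteBuffer page, int idx) {
        return page.getInt(idx * ITEM_SIZE + HASH_OFFSET);
    }

    /** Reads the link of the item at the given index. */
    public static long link(ByteBuffer page, int idx) {
        return page.getLong(idx * ITEM_SIZE + LINK_OFFSET);
    }
}
```

The constants keep the reader and the writer in agreement about the layout, so a change to the item structure is made in one place.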

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {
+    private final int partId;
+
+    private final TableTree tree;
+
+    private final TableFreeList freeList;
+
+    /**
+     * Constructor.
+     *
+     * @param partId Partition id.
+     * @param tableCfg – Table configuration.
+     * @param dataRegion – Data region for the table.
+     * @param freeList Table free list.
+     * @throws StorageException If there is an error while creating the 
partition storage.
+     */
+    public PageMemoryPartitionStorage(
+            int partId,
+            TableConfiguration tableCfg,
+            PageMemoryDataRegion dataRegion,
+            TableFreeList freeList
+    ) throws StorageException {
+        assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
+
+        this.partId = partId;
+
+        this.freeList = freeList;
+
+        TableView tableView = tableCfg.value();
+
+        int grpId = groupId(tableView);
+
+        try {
+            long metaPageId = dataRegion.pageMemory().allocatePage(grpId, 
partId, FLAG_AUX);
+
+            tree = new TableTree(
+                    grpId,
+                    tableView.name(),
+                    dataRegion.pageMemory(),
+                    PageLockListenerNoOp.INSTANCE,
+                    new AtomicLong(),
+                    metaPageId,
+                    freeList,
+                    partId
+            );
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error occurred while creating the 
partition storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int partitionId() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable DataRow read(SearchRow key) throws StorageException {
+        try {
+            return tree.findOne(wrap(key));
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error reading row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> readAll(List<? extends SearchRow> keys) throws 
StorageException {
+        Collection<DataRow> res = new ArrayList<>(keys.size());
+
+        try {
+            for (SearchRow key : keys) {
+                res.add(tree.findOne(wrap(key)));
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error reading rows", e);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(DataRow row) throws StorageException {
+        try {
+            TableDataRow dataRow = wrap(row);
+
+            freeList.insertDataRow(dataRow);
+
+            tree.put(dataRow);
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error writing row", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeAll(List<? extends DataRow> rows) throws StorageException 
{
+        try {
+            for (DataRow row : rows) {
+                TableDataRow dataRow = wrap(row);
+
+                freeList.insertDataRow(dataRow);
+
+                tree.put(dataRow);
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throw new StorageException("Error writing rows", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws 
StorageException {
+        Collection<DataRow> cantInsert = new ArrayList<>();
+
+        try {
+            for (DataRow row : rows) {
+                TableDataRow dataRow = wrap(row);
+
+                if (tree.findOne(dataRow) == null) {

Review comment:
       I wonder how such operations are implemented in Ignite 2.x. I guess 
there's no point in fixing this right now, since the MV-store API and 
transactions will change a lot of things in the near future.
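
For illustration only, and assuming the concern is the separate lookup followed by a separate write: the sketch below shows the general idea of doing the existence check and the insert as one step. It uses a `ConcurrentHashMap` rather than the B+ tree, and the class and method names are hypothetical. It is not how Ignite 2.x or this PR implements the operation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class InsertAllSketch {
    /** Stand-in for the tree: key to value. */
    private final ConcurrentMap<String, String> rows = new ConcurrentHashMap<>();

    /**
     * Inserts every {key, value} pair whose key is absent.
     *
     * @return Keys that could not be inserted because they already exist.
     */
    public Collection<String> insertAll(List<String[]> keyValues) {
        Collection<String> cantInsert = new ArrayList<>();

        for (String[] kv : keyValues) {
            // putIfAbsent checks for the key and writes the value as a single atomic step.
            if (rows.putIfAbsent(kv[0], kv[1]) != null) {
                cantInsert.add(kv[0]);
            }
        }

        return cantInsert;
    }

    /** Returns the stored value, or null if the key is absent. */
    public String read(String key) {
        return rows.get(key);
    }
}
```

The return value mirrors the `cantInsert` collection in the diff: existing rows are left untouched and reported back to the caller.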

##########
File path: 
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorageTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static java.util.stream.Collectors.joining;
+import static 
org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import 
org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
+import 
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.AbstractPartitionStorageTest;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.engine.DataRegion;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.TableStorage;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Storage test implementation for {@link PageMemoryPartitionStorage}.
+ */
+// TODO: IGNITE-16641 Add test for persistent case.
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class PageMemoryPartitionStorageTest extends 
AbstractPartitionStorageTest {
+    private static PageIoRegistry ioRegistry;
+
+    @InjectConfiguration(
+            value = "mock.type = pagemem",
+            polymorphicExtensions = {
+                    PageMemoryDataRegionConfigurationSchema.class,
+                    UnsafeMemoryAllocatorConfigurationSchema.class
+            })
+    private DataRegionConfiguration dataRegionCfg;
+
+    @InjectConfiguration(
+            value = "mock.name = default",
+            polymorphicExtensions = HashIndexConfigurationSchema.class
+    )
+    private TableConfiguration tableCfg;
+
+    @WorkDirectory
+    private Path workDir;

Review comment:
       Can this be null for in-memory storage?

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {

Review comment:
       This class only applies to volatile regions, right? I see no mention of 
this fact.

##########
File path: 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import static 
org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.storage.StorageUtils.groupId;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree;
+import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.apache.ignite.internal.storage.PartitionStorage;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteCursor;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Storage implementation based on a {@link BplusTree}.
+ */
+// TODO: IGNITE-16644 Support snapshots.
+class PageMemoryPartitionStorage implements PartitionStorage {
+    private final int partId;
+
+    private final TableTree tree;
+
+    private final TableFreeList freeList;
+
+    /**
+     * Constructor.
+     *
+     * @param partId Partition id.
+     * @param tableCfg – Table configuration.
+     * @param dataRegion – Data region for the table.
+     * @param freeList Table free list.
+     * @throws StorageException If there is an error while creating the 
partition storage.
+     */
+    public PageMemoryPartitionStorage(
+            int partId,
+            TableConfiguration tableCfg,
+            PageMemoryDataRegion dataRegion,
+            TableFreeList freeList
+    ) throws StorageException {
+        assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
+
+        this.partId = partId;
+
+        this.freeList = freeList;
+
+        TableView tableView = tableCfg.value();
+
+        int grpId = groupId(tableView);
+
+        try {
+            long metaPageId = dataRegion.pageMemory().allocatePage(grpId, 
partId, FLAG_AUX);

Review comment:
       BTW, where's the code that allocates the tree root? In the tree's 
constructor? If it is in the constructor, then this is a questionable solution, 
so I'm waiting for the reply.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

