sashapolo commented on code in PR #1673:
URL: https://github.com/apache/ignite-3/pull/1673#discussion_r1109526759
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java:
##########
@@ -51,8 +49,12 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
private final AbstractPageMemoryMvPartitionStorage storage;
+ private OperationType operationType;
+
private @Nullable VersionChain newRow;
+ private boolean addToGc;
Review Comment:
Please document the purpose of this field and when it's set to true or false
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java:
##########
@@ -45,7 +45,9 @@ public class PartitionMetaIo extends PageIo {
public static final int INDEX_TREE_META_PAGE_ID_OFF =
VERSION_CHAIN_TREE_ROOT_PAGE_ID_OFF + Long.BYTES;
- private static final int PAGE_COUNT_OFF = INDEX_TREE_META_PAGE_ID_OFF +
Long.BYTES;
+ public static final int GARBAGE_COLLECTION_META_PAGE_ID_OFF =
INDEX_TREE_META_PAGE_ID_OFF + Long.BYTES;
Review Comment:
Why is this field (and some other fields, btw) public?
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ private boolean rowVersionFounded;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
+ if (!rowVersionFounded) {
+ long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+ if (arg.apply(link, pageAddr + payload.offset())) {
+ rowVersionFounded = true;
+
+ rowLink = link;
+ rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ rowNextLink = nextLink;
+
+ if (!loadValueBytes) {
+ rowValueSize = PageUtils.getInt(pageAddr, payload.offset()
+ RowVersion.VALUE_SIZE_OFFSET);
+
+ return STOP_TRAVERSAL;
+ } else {
+ return readRowVersionValue.consumePagePayload(link,
pageAddr, payload, null);
+ }
+ } else {
+ return nextLink;
+ }
+ } else {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ }
+ }
+
+ @Override
+ public void finish() {
+ if (!rowVersionFounded) {
+ return;
+ }
+
+ if (loadValueBytes) {
+ readRowVersionValue.finish();
+
+ byte[] valueBytes = readRowVersionValue.result();
+
+ ByteBuffer value =
ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, value);
+ } else {
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, rowValueSize);
+ }
+ }
+
+ /**
+ * Returns the found version in the version chain, {@code null} if not
found.
+ */
+ @Nullable RowVersion getResult() {
+ return result;
+ }
+
+ /**
+ * Row version filter in the version chain.
+ */
+ interface RowVersionFilter {
+ /**
+ * Returns {@code true} if the version matches.
+ *
+ * @param rowVersionLink Row version link;
+ * @param rowVersionAdder Address to row version (including page
address + offset within it).
+ */
+ boolean apply(long rowVersionLink, long rowVersionAdder);
Review Comment:
```suggestion
boolean apply(long rowVersionLink, long rowVersionAddr);
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ private boolean rowVersionFounded;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
+ if (!rowVersionFounded) {
+ long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+ if (arg.apply(link, pageAddr + payload.offset())) {
+ rowVersionFounded = true;
+
+ rowLink = link;
+ rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ rowNextLink = nextLink;
+
+ if (!loadValueBytes) {
+ rowValueSize = PageUtils.getInt(pageAddr, payload.offset()
+ RowVersion.VALUE_SIZE_OFFSET);
+
+ return STOP_TRAVERSAL;
+ } else {
+ return readRowVersionValue.consumePagePayload(link,
pageAddr, payload, null);
+ }
+ } else {
+ return nextLink;
+ }
+ } else {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ }
+ }
+
+ @Override
+ public void finish() {
+ if (!rowVersionFounded) {
+ return;
+ }
+
+ if (loadValueBytes) {
+ readRowVersionValue.finish();
+
+ byte[] valueBytes = readRowVersionValue.result();
+
+ ByteBuffer value =
ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, value);
+ } else {
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, rowValueSize);
+ }
+ }
+
+ /**
+ * Returns the found version in the version chain, {@code null} if not
found.
+ */
+ @Nullable RowVersion getResult() {
+ return result;
+ }
+
+ /**
+ * Row version filter in the version chain.
+ */
+ interface RowVersionFilter {
+ /**
+ * Returns {@code true} if the version matches.
+ *
+ * @param rowVersionLink Row version link;
+ * @param rowVersionAdder Address to row version (including page
address + offset within it).
Review Comment:
```suggestion
* @param rowVersionAdder Address of row version (including page
address + offset within it).
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ private boolean rowVersionFounded;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
Review Comment:
```suggestion
public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter filter) {
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java:
##########
@@ -31,23 +31,22 @@
* Code to work with {@link HybridTimestamp}s.
*/
public class HybridTimestamps {
- /**
- * Physical time component to store for {@code null} hybrid timestamp
values.
- */
+ /** Physical time component to store for {@code null} hybrid timestamp
values. */
private static final long NULL_PHYSICAL_TIME = 0L;
- /**
- * Logical time component to store for {@code null} hybrid timestamp
values.
- */
+ /** Logical time component to store for {@code null} hybrid timestamp
values. */
private static final int NULL_LOGICAL_TIME = 0;
+ /** Payload size in bytes. */
+ public static final int SIZE_IN_BYTES = Long.BYTES + Integer.BYTES;
Review Comment:
This is a duplicate of `HybridTimestamp.HYBRID_TIMESTAMP_SIZE` constant
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java:
##########
@@ -46,11 +49,11 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
- ReadRowVersion(int partitionId) {
+ ReadRowVersion(int partitionId, boolean loadValueBytes) {
Review Comment:
Please document the purpose of the boolean flag
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GarbageCollectionIo.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.gc.io;
+
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.pagememory.mv.HybridTimestamps;
+import
org.apache.ignite.internal.storage.pagememory.mv.gc.GarbageCollectionRowVersion;
+import
org.apache.ignite.internal.storage.pagememory.mv.gc.GarbageCollectionTree;
+
+/**
+ * Interface for {@link GarbageCollectionTree}-related IO.
+ *
+ * <p>Defines a following data layout:
+ * <ul>
+ * <li>Row ID (16 bytes);</li>
+ * <li>Row timestamp (12 bytes).</li>
+ * </ul>
+ */
+public interface GarbageCollectionIo {
+ /** Offset of rowId's most significant bits, 8 bytes. */
+ int ROW_ID_MSB_OFFSET = 0;
+
+ /** Offset of rowId's least significant bits, 8 bytes. */
+ int ROW_ID_LSB_OFFSET = ROW_ID_MSB_OFFSET + Long.BYTES;
+
+ /** Offset of row timestamp, 12 bytes. */
+ int ROW_TIMESTAMP_OFFSET = ROW_ID_LSB_OFFSET + Long.BYTES;
+
+ /** Payload size in bytes. */
+ int SIZE_IN_BYTES = ROW_TIMESTAMP_OFFSET + HybridTimestamps.SIZE_IN_BYTES;
+
+ /**
+ * Returns an offset of the element inside the page.
+ *
+ * @see BplusIo#offset(int)
+ */
+ int offset(int idx);
+
+ /**
+ * Stores a row version for garbage collection, copied from another page.
+ *
+ * @see BplusIo#store(long, int, BplusIo, long, int)
+ */
+ default void store(long dstPageAddr, int dstIdx,
BplusIo<GarbageCollectionRowVersion> srcIo, long srcPageAddr, int srcIdx) {
+ int dstOffset = offset(dstIdx);
+ int srcOffset = offset(srcIdx);
+
+ PageUtils.copyMemory(srcPageAddr, srcOffset, dstPageAddr, dstOffset,
SIZE_IN_BYTES);
+ }
+
+ /**
+ * Stores a row version for garbage collection chain in the page.
+ *
+ * @see BplusIo#storeByOffset(long, int, Object)
+ */
+ default void storeByOffset(long pageAddr, int off,
GarbageCollectionRowVersion row) {
+ RowId rowId = row.getRowId();
+
+ putLong(pageAddr, off + ROW_ID_MSB_OFFSET,
rowId.mostSignificantBits());
+ putLong(pageAddr, off + ROW_ID_LSB_OFFSET,
rowId.leastSignificantBits());
+
+ HybridTimestamps.writeTimestampToMemory(pageAddr, off +
ROW_TIMESTAMP_OFFSET, row.getTimestamp());
+ }
+
+ /**
+ * Compare the row version for garbage collection from the page with
passed row version, thus defining the order of element in the
+ * {@link GarbageCollectionTree}.
+ *
+ * @param pageAddr Page address.
+ * @param idx Element's index.
+ * @param rowVersion Row version for garbage collection.
+ * @return Comparison result.
+ */
+ default int compare(long pageAddr, int idx, GarbageCollectionRowVersion
rowVersion) {
+ int offset = offset(idx);
+
+ int cmp = HybridTimestamps.readTimestamp(pageAddr, offset +
ROW_TIMESTAMP_OFFSET).compareTo(rowVersion.getTimestamp());
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ RowId rowId = rowVersion.getRowId();
+
+ cmp = Long.compare(getLong(pageAddr, offset + ROW_ID_MSB_OFFSET),
rowId.mostSignificantBits());
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return Long.compare(getLong(pageAddr, offset + ROW_ID_LSB_OFFSET),
rowId.leastSignificantBits());
+ }
+
+ /**
+ * Reads a row version for garbage collection from the page.
+ *
+ * @param pageAddr Page address.
+ * @param idx Element's index.
+ * @param partitionId Partition id to enrich read partitionless links.
+ */
+ default GarbageCollectionRowVersion getRow(long pageAddr, int idx, int
partitionId) {
+ int offset = offset(idx);
+
+ long rowIdMsb = getLong(pageAddr, offset + ROW_ID_MSB_OFFSET);
+ long rowIdLsb = getLong(pageAddr, offset + ROW_ID_LSB_OFFSET);
+
+ return new GarbageCollectionRowVersion(
+ new RowId(partitionId, rowIdMsb, rowIdLsb),
+ HybridTimestamps.readTimestamp(pageAddr, offset +
ROW_TIMESTAMP_OFFSET)
Review Comment:
`readTimestamp` can return `null`, is that ok?
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -384,6 +398,24 @@ RowVersion readRowVersion(long rowVersionLink,
Predicate<HybridTimestamp> loadVa
return read.result();
}
+ @Nullable RowVersion foundRowVersion(VersionChain versionChain,
RowVersionFilter filter, boolean loadValueBytes) {
Review Comment:
Shouldn't it be `findRowVersion`?
##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java:
##########
@@ -138,7 +138,7 @@ void testReadWritePartitionMeta(@WorkDirectory Path
workDir) throws Exception {
try (FilePageStore filePageStore =
createFilePageStore(testFilePath)) {
manager.writeMetaToBuffer(
partId,
- new PartitionMeta(UUID.randomUUID(), 100, 10, 34, 900,
500, 300, 200, 4).metaSnapshot(null),
+ new PartitionMeta(UUID.randomUUID(), 100, 10, 34, 900,
500, 300, 200, 400, 4).metaSnapshot(null),
Review Comment:
yes, please. It is even used with a setter as well =(
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ private boolean rowVersionFounded;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
+ if (!rowVersionFounded) {
+ long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+ if (arg.apply(link, pageAddr + payload.offset())) {
+ rowVersionFounded = true;
+
+ rowLink = link;
+ rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ rowNextLink = nextLink;
+
+ if (!loadValueBytes) {
+ rowValueSize = PageUtils.getInt(pageAddr, payload.offset()
+ RowVersion.VALUE_SIZE_OFFSET);
+
+ return STOP_TRAVERSAL;
+ } else {
+ return readRowVersionValue.consumePagePayload(link,
pageAddr, payload, null);
+ }
+ } else {
+ return nextLink;
+ }
+ } else {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ }
+ }
+
+ @Override
+ public void finish() {
+ if (!rowVersionFounded) {
+ return;
+ }
+
+ if (loadValueBytes) {
+ readRowVersionValue.finish();
+
+ byte[] valueBytes = readRowVersionValue.result();
+
+ ByteBuffer value =
ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, value);
+ } else {
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, rowValueSize);
+ }
+ }
+
+ /**
+ * Returns the found version in the version chain, {@code null} if not
found.
+ */
+ @Nullable RowVersion getResult() {
+ return result;
+ }
+
+ /**
+ * Row version filter in the version chain.
+ */
+ interface RowVersionFilter {
Review Comment:
Should it be a `@FunctionalInterface`?
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java:
##########
@@ -45,9 +47,14 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
private @Nullable VersionChain newRow;
- private @Nullable Long updateTimestampLink;
+ private long updateTimestampLink = NULL_LINK;
+
+ private @Nullable RowVersion toRemove;
+
+ private boolean addToGc;
Review Comment:
Same here about documentation
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -374,6 +379,53 @@ private IndexMetaTree createIndexMetaTree(
}
}
+ /**
+ * Returns new {@link GarbageCollectionTree} instance for partition.
+ *
+ * @param tableView Table configuration.
+ * @param partitionId Partition ID.
+ * @param reuseList Reuse list.
+ * @param pageMemory Persistent page memory instance.
+ * @param meta Partition metadata.
+ * @throws StorageException If failed.
+ */
+ private GarbageCollectionTree createGarbageCollectionTree(
+ TableView tableView,
+ int partitionId,
+ ReuseList reuseList,
+ PersistentPageMemory pageMemory,
+ PartitionMeta meta
+ ) {
+ try {
+ boolean initNew = false;
+
+ if (meta.garbageCollectionTreeMetaPageId() == 0) {
+ long rootPageId = pageMemory.allocatePage(tableView.tableId(),
partitionId, FLAG_AUX);
+
+ meta.garbageCollectionTreeMetaPageId(lastCheckpointId(),
rootPageId);
+
+ initNew = true;
+ }
+
+ return new GarbageCollectionTree(
+ tableView.tableId(),
+ tableView.name(),
+ partitionId,
+ dataRegion.pageMemory(),
+ PageLockListenerNoOp.INSTANCE,
+ new AtomicLong(),
+ meta.garbageCollectionTreeMetaPageId(),
+ reuseList,
+ initNew
+ );
+ } catch (IgniteInternalCheckedException e) {
Review Comment:
What's the point of throwing a checked exception here? Do we do anything
with it apart from wrapping in a `StorageException`? I know it's not related to
this PR, but maybe we should introduce a tech-debt ticket and remove this
exception?
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ private boolean rowVersionFounded;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
+ if (!rowVersionFounded) {
+ long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+ if (arg.apply(link, pageAddr + payload.offset())) {
+ rowVersionFounded = true;
+
+ rowLink = link;
+ rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ rowNextLink = nextLink;
+
+ if (!loadValueBytes) {
+ rowValueSize = PageUtils.getInt(pageAddr, payload.offset()
+ RowVersion.VALUE_SIZE_OFFSET);
+
+ return STOP_TRAVERSAL;
+ } else {
+ return readRowVersionValue.consumePagePayload(link,
pageAddr, payload, null);
+ }
+ } else {
+ return nextLink;
Review Comment:
Same here about simplification, all these `if-else` are harder to read, I
think
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ private boolean rowVersionFounded;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
+ if (!rowVersionFounded) {
Review Comment:
This code can be simplified a little bit:
```
if (rowVersionFounded) {
return readRowVersionValue.consumePagePayload(link, pageAddr, payload,
null);
}
// other stuff
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java:
##########
@@ -18,13 +18,15 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
/**
* Search key for the {@link VersionChainTree}.
*/
public class VersionChainKey {
/** Row id. */
- private final RowId rowId;
+ @IgniteToStringInclude
+ protected final RowId rowId;
Review Comment:
Why does this field have to be `protected` if there's a public getter?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]