ibessonov commented on code in PR #814: URL: https://github.com/apache/ignite-3/pull/814#discussion_r881461336
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/ReadPageMemoryRowValue.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.datapage; + +import java.util.Objects; +import org.apache.ignite.internal.pagememory.Storable; +import org.apache.ignite.internal.pagememory.io.DataPagePayload; +import org.apache.ignite.internal.pagememory.util.PageUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Reads full data row value (as byte array) from page memory. Supports fragmented values occupying more than one slot. + * + * <p>This works for the cases when the following conditions are satisfied: + * 1. Row data starts with a fixed-length header, which is followed by the 'value'; the 'value' ends the row data + * 2. Row data header contains 'value' size as an int at a fixed offset known beforehand + * 3. The beginning of the header, including the 'value' size, is always stored in the first slot (i.e. 
+ * {@link Storable#headerSize()} is enough to include 'value' size + */ +public abstract class ReadPageMemoryRowValue implements PageMemoryTraversal<Void> { + /** + * First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink); + * then it's {@code false} (when we found the version we need and we read its value). + */ + private boolean readingFirstSlot = true; + + private int valueSize; + /** + * Used to collect all the bytes of the target version value. + */ + private byte @Nullable [] allValueBytes; + /** + * Number of bytes written to {@link #allValueBytes}. + */ + private int transferredBytes = 0; + + { + reset(); + } + + @Override + public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Void ignoredArg) { + if (readingFirstSlot) { + readingFirstSlot = false; + return readFullyOrStartReadingFragmented(pageAddr, payload); + } else { + // we are continuing reading a fragmented row + return readNextFragment(pageAddr, payload); + } + } + + private long readFullyOrStartReadingFragmented(long pageAddr, DataPagePayload payload) { + valueSize = readValueSize(pageAddr, payload); + + if (!payload.hasMoreFragments()) { + return readFully(pageAddr, payload); + } else { + allValueBytes = new byte[valueSize]; + transferredBytes = 0; + + readValueFragmentToArray(pageAddr, payload); Review Comment: I believe that this method doesn't skip a header with timestamp and size. Are there tests for this case? 
########## modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java: ########## @@ -291,8 +292,12 @@ void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() { BinaryRow foundRow = storage.read(rowId, txId); - assertThat(foundRow, is(notNullValue())); - assertThat(foundRow.bytes(), is(equalTo(binaryRow.bytes()))); + assertRowMatches(foundRow, binaryRow); + } + + private void assertRowMatches(@Nullable BinaryRow rowUnderQuestion, BinaryRow expectedRow) { Review Comment: Usually expected value goes first, why did you swap them? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal; +import org.apache.ignite.internal.pagememory.io.DataPagePayload; +import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.tx.Timestamp; +import org.jetbrains.annotations.Nullable; + +/** + * Traversal that scans Version Chain until first version visible at the given timestamp is found; then the version + * is converted to {@link ByteBufferRow} and finally made available via {@link #result()}. + * + * <p>NB: this traversal first traverses starting data slots of the Version Chain one after another; when it finds the + * version it needs, it switches to traversing the slots comprising the version (because it might be fragmented). + */ +class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> { + /** + * Contains the result when the traversal ends. + */ + @Nullable + private ByteBufferRow result; + + /** + * First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink); + * then it's {@code false} (when we found the version we need and we read its value). 
+ */ + private boolean lookingForVersion = true; + + private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue(); + + { + reset(); + } + + @Override + public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Timestamp timestamp) { + if (lookingForVersion) { + Timestamp rowVersionTs = Timestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET); + + if (rowTimestampMatches(rowVersionTs, timestamp)) { + return readFullyOrStartReadingFragmented(link, pageAddr, payload); + } else { + return advanceToNextVersion(pageAddr, payload, partitionIdFromLink(link)); + } + } else { + // we are continuing reading a fragmented row Review Comment: I can't link you to the source, but according to our code style, comments start with a capital letter and end with a period. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal; +import org.apache.ignite.internal.pagememory.io.DataPagePayload; +import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.tx.Timestamp; +import org.jetbrains.annotations.Nullable; + +/** + * Traversal that scans Version Chain until first version visible at the given timestamp is found; then the version + * is converted to {@link ByteBufferRow} and finally made available via {@link #result()}. + * + * <p>NB: this traversal first traverses starting data slots of the Version Chain one after another; when it finds the + * version it needs, it switches to traversing the slots comprising the version (because it might be fragmented). + */ +class ScanVersionChainByTimestamp implements PageMemoryTraversal<Timestamp> { + /** + * Contains the result when the traversal ends. + */ + @Nullable + private ByteBufferRow result; + + /** + * First it's {@code true} (this means that we traverse first slots of versions of the Version Chain using NextLink); + * then it's {@code false} (when we found the version we need and we read its value). + */ + private boolean lookingForVersion = true; + + private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue(); + + { Review Comment: Why can't we have regular constructors? This construct is barely ever used in Ignite code, and you invoke a virtual method in it. Why? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java: ########## @@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder sta } /** - * Returns a row by link. 
To get the row bytes, more than one pages may be traversed (if the corresponding row - * was fragmented when stored).
+ * Traverses page memory starting at the given link. At each step, reads the current data row and feeds it to the given + * {@link PageMemoryTraversal} object which updates itself (usually) and returns next link to continue traversal + * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop. * * @param link Row link - * @return row object assembled from the row bytes + * @param traversal object consuming payloads and controlling the traversal Review Comment: Parameter descriptions usually start with a capital letter. Please try to adapt your code style to what you see in the project, not to what technically counts as appropriate according to TC. We should all try to write code in a similar manner; it's very important. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java: ########## @@ -72,8 +83,17 @@ public PageMemoryMvPartitionStorage(int partitionId, TableView tableConfig, Page groupId = StorageUtils.groupId(tableConfig); - versionChainFreeList = ((VolatilePageMemoryDataRegion) dataRegion).versionChainFreeList(); - rowVersionFreeList = ((VolatilePageMemoryDataRegion) dataRegion).rowVersionFreeList(); + try { + versionChainFreeList = createVersionChainFreeList(dataRegion.pageMemory(), groupId, partitionId); Review Comment: Why did you decide to create these lists per partition? It would be better to turn them into parameters. Maybe there was a slight misunderstanding between us: the volatile region should have shared freelists to better utilize memory; it's an optimization that we should do. Persistent regions, on the other hand, must have separate freelists for each partition. Right now we have separate lists in the volatile region, which is not what I expect.
########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadLatestRowVersion.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import java.nio.ByteBuffer; +import java.util.function.Predicate; +import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal; +import org.apache.ignite.internal.pagememory.io.DataPagePayload; +import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.tx.Timestamp; +import org.jetbrains.annotations.Nullable; + +/** + * Traversal for reading the latest row version. If the version is uncommitted, returns its value; otherwise, does NOT return it. 
+ */ +class ReadLatestRowVersion implements PageMemoryTraversal<Predicate<Timestamp>> { + private RowVersion result; + + private boolean readingFirstSlot = true; + + private long firstFragmentLink; + @Nullable + private Timestamp timestamp; + private long nextLink; + + private final ReadRowVersionValue readRowVersionValue = new ReadRowVersionValue(); + + { + reset(); + } + + @Override + public long consumePagePayload(long link, long pageAddr, DataPagePayload payload, Predicate<Timestamp> loadValue) { + if (readingFirstSlot) { + readingFirstSlot = false; + return readFullOrInitiateReadFragmented(link, pageAddr, payload, loadValue); + } else { + return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null); + } + } + + private long readFullOrInitiateReadFragmented(long link, long pageAddr, DataPagePayload payload, Predicate<Timestamp> loadValue) { + firstFragmentLink = link; + + timestamp = Timestamps.readTimestamp(pageAddr, payload.offset() + RowVersion.TIMESTAMP_OFFSET); + nextLink = PartitionlessLinks.readFromMemory(pageAddr, payload.offset() + RowVersion.NEXT_LINK_OFFSET); + + if (!loadValue.test(timestamp)) { + result = new RowVersion(partitionIdFromLink(link), firstFragmentLink, timestamp, nextLink, null); + return STOP_TRAVERSAL; + } + + return readRowVersionValue.consumePagePayload(link, pageAddr, payload, null); Review Comment: The same problem might be present here: I don't see where you skip the header. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java: ########## @@ -38,10 +41,24 @@ public class RowVersion extends StorableBase { */ public static final long NULL_LINK = 0; + private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES; + private static final int NEXT_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES; + private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES; + + public static final int TIMESTAMP_OFFSET = 0; + public
static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES; + public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + NEXT_LINK_STORE_SIZE_BYTES; + public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES; + + private final int partitionId; + private long link; + @Nullable private final Timestamp timestamp; private final long nextLink; private final int valueSize; + @IgniteToStringExclude Review Comment: I know that you are against these comments, but please use line separators more often; with these changes, the code becomes more and more inconsistent. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java: ########## @@ -38,10 +41,24 @@ public class RowVersion extends StorableBase { */ public static final long NULL_LINK = 0; + private static final int TIMESTAMP_STORE_SIZE_BYTES = 2 * Long.BYTES; + private static final int NEXT_LINK_STORE_SIZE_BYTES = PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES; + private static final int VALUE_SIZE_STORE_SIZE_BYTES = Integer.BYTES; + + public static final int TIMESTAMP_OFFSET = 0; + public static final int NEXT_LINK_OFFSET = TIMESTAMP_STORE_SIZE_BYTES; + public static final int VALUE_SIZE_OFFSET = NEXT_LINK_OFFSET + NEXT_LINK_STORE_SIZE_BYTES; + public static final int VALUE_OFFSET = VALUE_SIZE_OFFSET + VALUE_SIZE_STORE_SIZE_BYTES; + + private final int partitionId; Review Comment: Can you please explain why we need two fields if the partition ID can be derived from the link value? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/Timestamps.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong; +import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong; + +import org.apache.ignite.internal.tx.Timestamp; +import org.jetbrains.annotations.Nullable; + +/** + * Code to work with {@link Timestamp}s. + */ +public class Timestamps { + /** + * Reads a {@link Timestamp} value from memory. + * + * @param pageAddr address where page data starts + * @param offset offset to the timestamp value relative to pageAddr + * @return the timestamp + */ + @Nullable + static Timestamp readTimestamp(long pageAddr, int offset) { + long nodeId = getLong(pageAddr, offset); + long localTimestamp = getLong(pageAddr, offset + Long.BYTES); + + Timestamp timestamp = new Timestamp(localTimestamp, nodeId); + if (timestamp.equals(RowVersion.NULL_TIMESTAMP)) { + timestamp = null; + } + + return timestamp; + } + + /** + * Writes a {@link Timestamp} to memory starting at the given address + offset. + * + * @param addr memory address + * @param offset offset added to the address + * @param timestamp the timestamp to write + * @return number of bytes written + */ + public static int writeTimestamp(long addr, int offset, @Nullable Timestamp timestamp) { Review Comment: Ok, some methods are public, some are not. Why? 
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java: ########## @@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder sta } /** - * Returns a row by link. To get the row bytes, more than one pages may be traversed (if the corresponding row - * was fragmented when stored). + * Traverses page memory starting at the given link. At each step, reads the current data row and feeds it to the given Review Comment: I would avoid mentioning "data row" in this context. I tried to abstract away from the pages' content. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/TransactionIds.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong; + +import java.util.UUID; +import org.jetbrains.annotations.Nullable; + +/** + * Utils to work with Transaction IDs. + */ +public class TransactionIds { + /** + * Writes transaction ID to memory starting at the specified address.
+ * + * @param addr addreds where to start writing + * @param offset offset to add to the address + * @param txId transaction ID to write + * @return number of bytes written + */ + public static int writeTransactionId(long addr, int offset, @Nullable UUID txId) { + long txIdHigh; + long txIdLow; + if (txId != null) { + txIdHigh = txId.getMostSignificantBits(); + txIdLow = txId.getLeastSignificantBits(); + } else { + txIdHigh = VersionChain.NULL_UUID_COMPONENT; + txIdLow = VersionChain.NULL_UUID_COMPONENT; + } + + putLong(addr, offset, txIdHigh); + putLong(addr, offset + Long.BYTES, txIdLow); + + return 2 * Long.BYTES; + } + + private TransactionIds() { Review Comment: Why is this necessary? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java: ########## @@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder sta } /** - * Returns a row by link. To get the row bytes, more than one pages may be traversed (if the corresponding row - * was fragmented when stored). + * Traverses page memory starting at the given link. At each step, reads the current data row and feeds it to the given + * {@link PageMemoryTraversal} object which updates itself (usually) and returns next link to continue traversal + * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop. Review Comment: You forgot a closing parenthesis. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datapage/DataPageReader.java: ########## @@ -54,129 +49,49 @@ public DataPageReader(PageMemory pageMemory, int groupId, IoStatisticsHolder sta } /** - * Returns a row by link. To get the row bytes, more than one pages may be traversed (if the corresponding row - * was fragmented when stored). + * Traverses page memory starting at the given link. 
At each step, reads the current data row and feeds it to the given + * {@link PageMemoryTraversal} object which updates itself (usually) and returns next link to continue traversal + * (or {@link PageMemoryTraversal#STOP_TRAVERSAL} to stop. * * @param link Row link - * @return row object assembled from the row bytes + * @param traversal object consuming payloads and controlling the traversal + * @param argument argument that is passed to the traversal * @throws IgniteInternalCheckedException If failed * @see org.apache.ignite.internal.pagememory.util.PageIdUtils#link(long, int) + * @see PageMemoryTraversal */ - @Nullable - public T getRowByLink(final long link) throws IgniteInternalCheckedException { + public <T> void traverse(final long link, PageMemoryTraversal<T> traversal, @Nullable T argument) + throws IgniteInternalCheckedException { assert link != 0; int pageSize = pageMemory.realPageSize(groupId); - ByteArrayOutputStream baos = null; - - long nextLink = link; + long currentLink = link; do { - final long pageId = pageId(nextLink); - + final long pageId = pageId(currentLink); final long page = pageMemory.acquirePage(groupId, pageId, statisticsHolder); try { long pageAddr = pageMemory.readLock(groupId, pageId, page); - - assert pageAddr != 0L : nextLink; + assert pageAddr != 0L : currentLink; try { AbstractDataPageIo<?> dataIo = pageMemory.ioRegistry().resolve(pageAddr); - int itemId = itemId(nextLink); - - if (handleNonExistentItemsGracefully() && !dataIo.itemExists(pageAddr, itemId, pageSize)) { - assert nextLink == link : "It is not first page of a fragmented row, but the item does not exist, pageId=" - + pageId + ", itemId=" + itemId; - - return rowForNonExistingItem(pageId, itemId); - } + int itemId = itemId(currentLink); DataPagePayload data = dataIo.readPayload(pageAddr, itemId, pageSize); - if (data.nextLink() == 0 && nextLink == link) { - // Good luck: we can read the row without fragments. 
- return readRowFromAddress(link, pageAddr + data.offset()); - } - - ByteBuffer dataBuf = wrapPointer(pageAddr, pageSize); - - dataBuf.position(data.offset()); - dataBuf.limit(data.offset() + data.payloadSize()); - - if (baos == null) { - baos = new ByteArrayOutputStream(); - } - - byte[] bytes = new byte[data.payloadSize()]; - dataBuf.get(bytes); - try { - baos.write(bytes); - } catch (IOException e) { - throw new IllegalStateException("Should not happen", e); - } - - nextLink = data.nextLink(); + currentLink = traversal.consumePagePayload(currentLink, pageAddr, data, argument); Review Comment: Ok, it took me a while to understand this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
