ibessonov commented on code in PR #1102:
URL: https://github.com/apache/ignite-3/pull/1102#discussion_r974975012
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -120,19 +120,27 @@ private static HybridTimestamp convertTimestamp(Timestamp
timestamp) {
*/
@Nullable
@Deprecated
- default BinaryRow read(RowId rowId, Timestamp timestamp) throws
StorageException {
+ default ReadResult read(RowId rowId, Timestamp timestamp) throws
StorageException {
return read(rowId, convertTimestamp(timestamp));
}
/**
* Reads the value from the storage as it was at the given timestamp.
+ * If there is a row with the specified row id and timestamp, return it.
+ * If there are multiple versions of the row with the specified row id:
+ * <ol>
+ *     <li>If there is only a write-intent, return the write-intent.</li>
+ *     <li>If there is a write-intent and the previous commit is older than the timestamp, return the write-intent.</li>
+ *     <li>If there is a commit older than the timestamp, but no write-intent, return said commit.</li>
+ *     <li>If there are two commits, one older and one newer than the timestamp, return the older commit.</li>
+ *     <li>If all existing commits are newer than the timestamp, return nothing.</li>
+ * </ol>
*
* @param rowId Row id.
* @param timestamp Timestamp.
- * @return Binary row that corresponds to the key or {@code null} if value
is not found.
+ * @return Read result that corresponds to the key or {@link
ReadResult#empty} if value is not found.
Review Comment:
`ReadResult#empty` may be undocumented, so this `{@link}` could end up broken. The result is immutable anyway.
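For reference, the rules enumerated in the Javadoc above can be expressed as a small executable model. This is a sketch with illustrative names (`Version`, `resolve`, plain `long` timestamps), not the actual Ignite storage API; the "strictly older" comparison for the write-intent case is inferred from `testReadWithinBeforeAndAfterTwoCommits`, where a read at the exact commit timestamp returns the committed version rather than the intent:

```java
import java.util.List;
import java.util.UUID;

// Toy model of the read-by-timestamp rules from the Javadoc above.
// Names are illustrative, not the real Ignite storage API.
class ReadRules {
    // txId != null marks a write-intent; commitTs is meaningless for intents.
    record Version(long commitTs, UUID txId) {
        boolean isWriteIntent() {
            return txId != null;
        }
    }

    /** Versions are newest-first; only the head may be a write-intent. Returns {@code null} for "nothing". */
    static Version resolve(List<Version> versions, long ts) {
        if (!versions.isEmpty() && versions.get(0).isWriteIntent()) {
            // Rules 1 and 2: a lone write-intent, or the newest commit is strictly
            // older than the read timestamp - the write-intent wins.
            if (versions.size() == 1 || versions.get(1).commitTs() < ts) {
                return versions.get(0);
            }
        }

        // Rules 3-5: the newest commit at or before the timestamp, if any.
        for (Version v : versions) {
            if (!v.isWriteIntent() && v.commitTs() <= ts) {
                return v;
            }
        }

        return null;
    }
}
```

This mirrors the assertions in `testReadWithinBeforeAndAfterTwoCommits`: between two commits the older commit wins, after the newest commit the write-intent wins.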
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage#read} result.
+ */
+public class ReadResult {
+ /** Unset commit partition id value. */
+ public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
+
+ /** Empty read result. */
+ private static final ReadResult EMPTY = new ReadResult(null, null, null,
null, UNDEFINED_COMMIT_PARTITION_ID);
+
+ /** Data. */
+ private final BinaryRow binaryRow;
+
+ /** Transaction id. Not {@code null} iff this is a write-intent. */
+ private final @Nullable UUID transactionId;
+
+ /** Commit table id. Not {@code null} iff this is a write-intent. */
+ private final @Nullable UUID commitTableId;
+
+ /** Commit partition id. If this is not a write-intent it is equal to {@link #UNDEFINED_COMMIT_PARTITION_ID}. */
+ private final int commitPartitionId;
+
+ /**
+ * Timestamp of the newest commit of the data. Not {@code null} iff a committed version exists, this is a
+ * write-intent, and the read was made with a timestamp.
+ */
+ private final @Nullable HybridTimestamp newestCommitTs;
+
+ private ReadResult(BinaryRow binaryRow, @Nullable UUID transactionId,
@Nullable UUID commitTableId,
+ @Nullable HybridTimestamp newestCommitTs, int commitPartitionId) {
+ this.binaryRow = binaryRow;
+
+ // If transaction is not null, then commitTableId and
commitPartitionId should be defined.
+ assert (binaryRow != null || transactionId == null) || (commitTableId
!= null && commitPartitionId != -1);
Review Comment:
This condition is very complicated, I think. The version without `binaryRow` was much simpler and actually matched the comment.
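The simpler invariant that actually matches the comment can also be split into named parts. A sketch only (`holds` and its parameters are illustrative, `-1` stands for `UNDEFINED_COMMIT_PARTITION_ID`; this is not the real `ReadResult` code):

```java
import java.util.UUID;

// Sketch: the invariant from the comment ("if transactionId is not null, then
// commitTableId and commitPartitionId should be defined") with named parts.
class ReadResultInvariant {
    static boolean holds(UUID transactionId, UUID commitTableId, int commitPartitionId) {
        boolean isWriteIntent = transactionId != null;
        boolean commitLocationDefined = commitTableId != null && commitPartitionId != -1;

        // A write-intent must know where its commit will land.
        return !isWriteIntent || commitLocationDefined;
    }
}
```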
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage#read} result.
+ */
+public class ReadResult {
+ /** Unset commit partition id value. */
+ public static final int UNDEFINED_COMMIT_PARTITION_ID = -1;
+
+ /** Empty read result. */
+ private static final ReadResult EMPTY = new ReadResult(null, null, null,
null, UNDEFINED_COMMIT_PARTITION_ID);
+
+ /** Data. */
+ private final BinaryRow binaryRow;
Review Comment:
This one is nullable, so it should be annotated `@Nullable` like the other fields.
##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -121,13 +121,13 @@ void testPartitionIndependence() throws Exception {
RowId rowId0 = partitionStorage0.runConsistently(() ->
partitionStorage0.insert(testData0, txId));
assertThat(unwrap(partitionStorage0.read(rowId0, txId)),
is(equalTo(unwrap(testData0))));
- assertThat(partitionStorage1.read(rowId0, txId), is(nullValue()));
+ assertThrows(AssertionError.class, () ->
partitionStorage1.read(rowId0, txId));
Review Comment:
Are you sure? First of all, the test will fail when assertions are disabled (no `-ea`). Second, you should **never** expect an `Error` to happen in normal circumstances.
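If a wrong-partition read is supposed to fail deterministically, an explicit check is testable without `-ea`. A minimal sketch of that alternative (`PartitionGuard` and its names are illustrative stand-ins, not the real `MvPartitionStorage` API):

```java
// Sketch: an explicit argument check survives running without -ea,
// unlike an assert, and throws a plain runtime exception instead of an Error.
class PartitionGuard {
    final int partitionId;

    PartitionGuard(int partitionId) {
        this.partitionId = partitionId;
    }

    void checkPartition(int rowPartitionId) {
        if (rowPartitionId != partitionId) {
            throw new IllegalArgumentException(
                    "Unexpected partition id: " + rowPartitionId + ", expected " + partitionId);
        }
    }
}
```

The test could then use `assertThrows(IllegalArgumentException.class, ...)`, which holds in any JVM configuration.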
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -142,14 +150,14 @@ default BinaryRow read(RowId rowId, Timestamp timestamp)
throws StorageException
* @return Row id.
* @throws StorageException If failed to write data into the storage.
*
- * @deprecated Generates different ids for each replica. {@link
#addWrite(RowId, BinaryRow, UUID)} with explicit replicated id must be
- * used instead.
+ * @deprecated Generates different ids for each replica. {@link
#addWrite(RowId, BinaryRow, UUID, UUID, int)} with explicit replicated
+ * id must be used instead.
*/
@Deprecated
RowId insert(BinaryRow binaryRow, UUID txId) throws StorageException;
/**
- * Creates (or replaces) an uncommitted (aka pending) version, assigned to
the given transaction id.
+ Creates (or replaces) an uncommitted (aka pending) version, assigned to
the given transaction id.
Review Comment:
```suggestion
* Creates (or replaces) an uncommitted (aka pending) version, assigned
to the given transaction id.
```
##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java:
##########
@@ -746,4 +809,168 @@ void testAppliedIndex() {
assertEquals(1, storage.persistedIndex());
}
+
+ @Test
+ void testReadWithinBeforeAndAfterTwoCommits() {
+ HybridTimestamp before = clock.now();
+
+ RowId rowId = new RowId(PARTITION_ID);
+
+ HybridTimestamp first = clock.now();
+
+ storage.runConsistently(() -> {
+ addWrite(rowId, binaryRow, newTransactionId());
+
+ commitWrite(rowId, first);
+ return null;
+ });
+
+ HybridTimestamp betweenCommits = clock.now();
+
+ HybridTimestamp second = clock.now();
+
+ storage.runConsistently(() -> {
+ addWrite(rowId, binaryRow2, newTransactionId());
+
+ commitWrite(rowId, second);
+ return null;
+ });
+
+ storage.runConsistently(() -> {
+ addWrite(rowId, binaryRow3, newTransactionId());
+
+ return null;
+ });
+
+ HybridTimestamp after = clock.now();
+
+ // Read before commits.
+ ReadResult res = storage.read(rowId, before);
+ assertNull(res.binaryRow());
+
+ // Read at exact time of first commit.
+ res = storage.read(rowId, first);
+
+ assertNotNull(res);
+ assertNull(res.newestCommitTimestamp());
+ assertRowMatches(res.binaryRow(), binaryRow);
+
+ // Read between two commits.
+ res = storage.read(rowId, betweenCommits);
+
+ assertNotNull(res);
+ assertNull(res.newestCommitTimestamp());
+ assertRowMatches(res.binaryRow(), binaryRow);
+
+ // Read at exact time of second commit.
+ res = storage.read(rowId, second);
+
+ assertNotNull(res);
+ assertNull(res.newestCommitTimestamp());
+ assertRowMatches(res.binaryRow(), binaryRow2);
+
+ // Read after second commit (write intent).
+ res = storage.read(rowId, after);
+
+ assertNotNull(res);
+ assertNotNull(res.newestCommitTimestamp());
+ assertEquals(second, res.newestCommitTimestamp());
+ assertRowMatches(res.binaryRow(), binaryRow3);
+ }
+
+ @Test
+ void testWrongPartition() {
+ RowId rowId = commitAbortAndAddUncommitted();
+
+ var row = new RowId(rowId.partitionId() + 1,
rowId.mostSignificantBits(), rowId.leastSignificantBits());
+
+ assertThrows(AssertionError.class, () -> read(row, clock.now()));
+ assertThrows(AssertionError.class, () -> read(row, UUID.randomUUID()));
+ }
+
+ @Test
+ void testReadingNothingWithLowerRowIdIfHigherRowIdWritesExist() {
+ RowId rowId = commitAbortAndAddUncommitted();
+
+ RowId lowerRowId = getPreviousRowId(rowId);
+
+ assertNull(read(lowerRowId, clock.now()));
+ }
+
+ @Test
+ void testReadingNothingByTxIdWithLowerRowId() {
+ RowId higherRowId = new RowId(PARTITION_ID);
+ RowId lowerRowId = getPreviousRowId(higherRowId);
+
+ UUID txId = UUID.randomUUID();
+
+ storage.runConsistently(() -> {
+ addWrite(higherRowId, binaryRow, txId);
+
+ return null;
+ });
+
+ assertNull(read(lowerRowId, txId));
+ }
+
+ @Test
+ void
testReadingCorrectWriteIntentByTimestampIfLowerRowIdWriteIntentExists() {
+ RowId higherRowId = new RowId(PARTITION_ID);
+ RowId lowerRowId = getPreviousRowId(higherRowId);
+
+ storage.runConsistently(() -> {
+ addWrite(lowerRowId, binaryRow2, newTransactionId());
+ addWrite(higherRowId, binaryRow, newTransactionId());
+
+ commitWrite(higherRowId, clock.now());
+
+ return null;
+ });
+
+ assertRowMatches(read(higherRowId, clock.now()), binaryRow);
+ }
+
+ @Test
+ void
testReadingCorrectWriteIntentByTimestampIfHigherRowIdWriteIntentExists() {
+ RowId higherRowId = new RowId(PARTITION_ID);
+ RowId lowerRowId = getPreviousRowId(higherRowId);
+
+ storage.runConsistently(() -> {
+ addWrite(lowerRowId, binaryRow, newTransactionId());
+ addWrite(higherRowId, binaryRow2, newTransactionId());
+
+ return null;
+ });
+
+ assertRowMatches(read(lowerRowId, clock.now()), binaryRow);
+ }
+
+ /**
+ * Returns row id that is lexicographically smaller (by the value of one)
than the argument.
+ *
+ * @param value Row id.
+ * @return Row id value minus 1.
+ */
+ private RowId getPreviousRowId(RowId value) {
+ Pair<Long, Long> previous128Uint =
getPrevious128Uint(value.mostSignificantBits(), value.leastSignificantBits());
+
+ return new RowId(value.partitionId(), previous128Uint.getFirst(),
previous128Uint.getSecond());
+ }
+
+ /**
+ * Performs a decrement operation on a 128-bit unsigned value that is
represented by two longs.
+ *
+ * @param msb Most significant bytes of 128-bit unsigned integer.
+ * @param lsb Least significant bytes of 128-bit unsigned integer.
+ * @return Less by one value.
+ */
+ private Pair<Long, Long> getPrevious128Uint(long msb, long lsb) {
+ assert (msb | lsb) != 0L : "Cheer up! That was very unlikely";
Review Comment:
What if we do the same check in `RowId`'s constructor? I'm pretty sure no one expects zeroes there.
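A sketch of what that could look like (a minimal stand-in, not the real `RowId` class): rejecting the all-zero id at construction means the decrement in the test helper can never underflow.

```java
// Sketch: reject the all-zero id in the constructor, as suggested.
// Minimal stand-in for RowId; the real class carries more behavior.
class RowIdSketch {
    final int partitionId;
    final long msb;
    final long lsb;

    RowIdSketch(int partitionId, long msb, long lsb) {
        assert (msb | lsb) != 0L : "All-zero row id is not expected";

        this.partitionId = partitionId;
        this.msb = msb;
        this.lsb = lsb;
    }
}
```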
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java:
##########
@@ -57,19 +72,60 @@ public long consumePagePayload(long link, long pageAddr,
DataPagePayload payload
if (lookingForVersion) {
HybridTimestamp rowVersionTs =
HybridTimestamps.readTimestamp(pageAddr, payload.offset() +
RowVersion.TIMESTAMP_OFFSET);
- if (rowTimestampMatches(rowVersionTs, timestamp)) {
- return readFullyOrStartReadingFragmented(link, pageAddr,
payload);
- } else {
- return advanceToNextVersion(pageAddr, payload);
+ boolean isWriteIntent = rowVersionTs == null;
+
+ if (!hasWriteIntent && isWriteIntent) {
Review Comment:
```suggestion
if (isWriteIntent) {
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -272,7 +308,7 @@ public RowId insert(BinaryRow row, UUID txId) throws
StorageException {
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
try {
- writeUnversioned(keyBuf.array(), row, txId);
+ writeUnversioned(keyBuf.array(), row, txId, UUID.randomUUID(),
partitionId);
Review Comment:
Deprecate this `insert` as well
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java:
##########
@@ -57,19 +72,60 @@ public long consumePagePayload(long link, long pageAddr,
DataPagePayload payload
if (lookingForVersion) {
HybridTimestamp rowVersionTs =
HybridTimestamps.readTimestamp(pageAddr, payload.offset() +
RowVersion.TIMESTAMP_OFFSET);
- if (rowTimestampMatches(rowVersionTs, timestamp)) {
- return readFullyOrStartReadingFragmented(link, pageAddr,
payload);
- } else {
- return advanceToNextVersion(pageAddr, payload);
+ boolean isWriteIntent = rowVersionTs == null;
+
+ if (!hasWriteIntent && isWriteIntent) {
+ hasWriteIntent = true;
+ writeIntentLink = link;
}
+
+ if (rowVersionTs != null) {
Review Comment:
```suggestion
if (!isWriteIntent) {
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionChainByTimestamp.java:
##########
@@ -57,19 +72,60 @@ public long consumePagePayload(long link, long pageAddr,
DataPagePayload payload
if (lookingForVersion) {
HybridTimestamp rowVersionTs =
HybridTimestamps.readTimestamp(pageAddr, payload.offset() +
RowVersion.TIMESTAMP_OFFSET);
- if (rowTimestampMatches(rowVersionTs, timestamp)) {
- return readFullyOrStartReadingFragmented(link, pageAddr,
payload);
- } else {
- return advanceToNextVersion(pageAddr, payload);
+ boolean isWriteIntent = rowVersionTs == null;
+
+ if (!hasWriteIntent && isWriteIntent) {
+ hasWriteIntent = true;
+ writeIntentLink = link;
}
+
+ if (rowVersionTs != null) {
+ boolean isFirstCommittedRow = false;
+
+ if (!hasCommittedRow) {
Review Comment:
I have to be honest, I don't like how complicated it is and how many booleans and fields we need in general to achieve the result. Are you ready for drastic measures?
The idea of returning a link to the next element from the same traversal instance is cool in theory, but in practice I'd rather see a good old loop with _maybe_ different traversal instances. Can you do that for me? The amount of code will increase, but the complexity will decrease.
Ideally, it should look almost exactly like the test implementation.
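For illustration only, the loop shape could resemble the following sketch, with the version chain modeled as links in a map and one small lookup per iteration instead of flags threaded through a traversal callback. Every name and type here (`Ver`, `findAt`, `NULL_LINK`, the map) is a stand-in, not the page-memory code:

```java
import java.util.Map;

// Sketch of the "good old loop": walk the chain one version per iteration,
// remembering only the head write-intent, instead of accumulating
// hasWriteIntent/hasCommittedRow state inside a single traversal.
class LoopScanSketch {
    static final long NULL_LINK = 0L;

    // commitTs == null marks a write-intent; nextLink == NULL_LINK ends the chain.
    record Ver(Long commitTs, String payload, long nextLink) {}

    static String findAt(Map<Long, Ver> pages, long headLink, long ts) {
        Ver intent = null;

        for (long link = headLink; link != NULL_LINK; ) {
            Ver v = pages.get(link);            // one small lookup per version

            if (v.commitTs() == null) {
                intent = v;                     // only the head can be a write-intent
            } else if (intent != null) {
                // Newest commit right below the intent: the intent wins iff that
                // commit is strictly older than the read timestamp.
                return v.commitTs() < ts ? intent.payload() : findCommitted(pages, link, ts);
            } else if (v.commitTs() <= ts) {
                return v.payload();
            }

            link = v.nextLink();
        }

        // No commits at all: a lone write-intent is returned as-is.
        return intent == null ? null : intent.payload();
    }

    private static String findCommitted(Map<Long, Ver> pages, long link, long ts) {
        for (; link != NULL_LINK; link = pages.get(link).nextLink()) {
            Ver v = pages.get(link);
            if (v.commitTs() <= ts) {
                return v.payload();
            }
        }
        return null;
    }
}
```

The amount of code grows, as predicted, but each branch maps directly onto one rule from the `read` Javadoc.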
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainIo.java:
##########
@@ -36,6 +38,11 @@
* Interface for VersionChain B+Tree-related IO. Defines a following data
layout:
* <pre><code>[rowId's UUID (16 bytes), txId (16 bytes), head link (6 bytes),
next link (6 bytes)]</code></pre>
*/
+// RowId, link, (next lint?), Nullable txId - directly in the tree pages
Review Comment:
You forgot to remove my comment.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -335,17 +373,21 @@ public RowId insert(BinaryRow row, UUID txId) throws
StorageException {
* @param txId Transaction id.
* @throws RocksDBException If write failed.
*/
- private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId)
throws RocksDBException {
+ private void writeUnversioned(byte[] keyArray, BinaryRow row, UUID txId,
UUID commitTableId, int commitPartitionId)
+ throws RocksDBException {
WriteBatchWithIndex writeBatch = requireWriteBatch();
//TODO IGNITE-16913 Add proper way to write row bytes into array
without allocations.
byte[] rowBytes = row.bytes();
- ByteBuffer value = ByteBuffer.allocate(rowBytes.length +
TX_ID_SIZE).order(LITTLE_ENDIAN);
+ ByteBuffer value = ByteBuffer.allocate(rowBytes.length +
VALUE_HEADER_SIZE);
+ byte[] array = value.array();
- putTransactionId(value.array(), 0, txId);
+ putUuid(array, TX_ID_OFFSET, txId);
+ putUuid(array, TABLE_ID_OFFSET, commitTableId);
+ putShort(array, PARTITION_ID_OFFSET, (short) commitPartitionId);
- value.position(TX_ID_SIZE).put(rowBytes);
+ value.position(VALUE_OFFSET).put(rowBytes);
// Write binary row data as a value.
writeBatch.put(cf, copyOf(keyArray, ROW_PREFIX_SIZE),
copyOf(value.array(), value.capacity()));
Review Comment:
Actually, I think that the second `copyOf` is not needed. Can you check?
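The premise is easy to check in isolation: a heap buffer from `ByteBuffer.allocate(n)` is backed by an array of exactly `n` bytes, so `copyOf(value.array(), value.capacity())` merely clones an array that is already the right length. Whether the clone can actually be dropped also depends on `WriteBatchWithIndex#put` not retaining a reference to the array, which is worth verifying against the RocksDB JNI behavior.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Quick check of the premise behind the review comment: the backing array of
// a heap ByteBuffer already has length == capacity, so copyOf only clones it.
class CopyOfCheck {
    static boolean copyIsRedundant(int n) {
        ByteBuffer value = ByteBuffer.allocate(n);

        byte[] backing = value.array();
        byte[] copied = Arrays.copyOf(backing, value.capacity());

        // Same length and contents; the copy adds nothing but an allocation.
        return backing.length == n && Arrays.equals(backing, copied);
    }
}
```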
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -859,19 +1020,60 @@ private static boolean invalid(RocksIterator it) {
* Converts raw byte array representation of the value into a binary row.
*
* @param valueBytes Value bytes as read from the storage.
- * @param valueHasTxId Whether the value has a transaction id prefix in it.
+ * @param valueHasTxData Whether the value has a transaction id prefix in
it.
* @return Binary row instance or {@code null} if value is a tombstone.
*/
- private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[]
valueBytes, boolean valueHasTxId) {
- if (isTombstone(valueBytes, valueHasTxId)) {
+ private static @Nullable BinaryRow wrapValueIntoBinaryRow(byte[]
valueBytes, boolean valueHasTxData) {
+ if (isTombstone(valueBytes, valueHasTxData)) {
return null;
}
- return valueHasTxId
- ? new
ByteBufferRow(ByteBuffer.wrap(valueBytes).position(TX_ID_SIZE).slice().order(LITTLE_ENDIAN))
+ return valueHasTxData
+ ? new
ByteBufferRow(ByteBuffer.wrap(valueBytes).position(VALUE_OFFSET).slice().order(LITTLE_ENDIAN))
: new ByteBufferRow(valueBytes);
}
+ /**
+ * Converts raw byte array representation of the write-intent value into a
read result adding newest commit timestamp if
+ * it is not {@code null}.
+ *
+ * @param valueBytes Value bytes as read from the storage.
+ * @param newestCommitTs Commit timestamp of the most recent committed
write of this value.
+ * @return Read result instance or {@code null} if value is a tombstone.
+ */
+ private static ReadResult wrapUncommittedValue(byte[] valueBytes,
@Nullable HybridTimestamp newestCommitTs) {
+ if (isTombstone(valueBytes, true)) {
+ return ReadResult.empty();
+ }
+
+ long txIdMsb = GridUnsafe.getLong(valueBytes, GridUnsafe.BYTE_ARR_OFF
+ TX_ID_OFFSET);
Review Comment:
You could extract `TxStateRocksDbStorage#bytesToUuid` into a utility class and add an offset parameter to it.
I believe there's similar code in the hash index storage for RocksDB as well.
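A sketch of what such a shared utility could look like. The class and method names here are illustrative (only `bytesToUuid` mirrors the existing `TxStateRocksDbStorage` method), and big-endian order is assumed; the real helper must match whatever byte order `putUuid`/`GridUnsafe` uses:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Sketch of a shared utility: read/write a UUID at a given offset in a byte
// array. Big-endian assumed; must match the writer's byte order in practice.
class UuidBytes {
    static UUID bytesToUuid(byte[] bytes, int offset) {
        ByteBuffer buf = ByteBuffer.wrap(bytes, offset, 2 * Long.BYTES);

        long msb = buf.getLong();
        long lsb = buf.getLong();

        return new UUID(msb, lsb);
    }

    static void putUuid(byte[] bytes, int offset, UUID uuid) {
        ByteBuffer.wrap(bytes, offset, 2 * Long.BYTES)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits());
    }
}
```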
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]