sashapolo commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1095819933
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -570,17 +589,26 @@ public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageEx
byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
if (valueBytes == null) {
- //the chain doesn't contain an uncommitted write intent
+ // The chain doesn't contain an uncommitted write intent.
return null;
}
+ boolean isNewValueTombstone = valueBytes.length == VALUE_HEADER_SIZE;
+
+ // Both this and previous values for the row id are tombstones.
+ boolean newAndPrevTombstones = maybeAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone);
Review Comment:
I think `tryAddToGcQueue` is a better name for this method
##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test implementation for {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends AbstractMvPartitionStorageGcTest {
+ @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 16777216, writeBufferSize = 16777216}}")
Review Comment:
What's this configuration for?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ // We retrieve the first element of the GC queue and seeking for it in the data CF.
+ // However, the element that we need to garbage collect is the next (older one) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+ // If the next element is exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
Review Comment:
Why is `.setTotalOrderSeek(true)` needed? Please leave a comment explaining it.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -570,17 +589,26 @@ public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageEx
byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, readOpts, uncommittedKeyBytes);
if (valueBytes == null) {
- //the chain doesn't contain an uncommitted write intent
+ // The chain doesn't contain an uncommitted write intent.
return null;
}
+ boolean isNewValueTombstone = valueBytes.length == VALUE_HEADER_SIZE;
+
+ // Both this and previous values for the row id are tombstones.
+ boolean newAndPrevTombstones = maybeAddToGcQueue(writeBatch, rowId, timestamp, isNewValueTombstone);
+
// Delete pending write.
writeBatch.delete(cf, uncommittedKeyBytes);
- // Add timestamp to the key, and put the value back into the storage.
- putTimestamp(keyBuf, timestamp);
+ // We only write tombstone if the previous value for the same row id was not a tombstone.
+ // So there won't be consecutive tombstones for the same row id.
+ if (!newAndPrevTombstones) {
Review Comment:
Why wasn't this check present in the previous implementation? Was it a bug?
And when can we have two tombstones for the same key anyway?
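The diff comments state an invariant: "there won't be consecutive tombstones for the same row id". A minimal in-memory sketch can make that concrete. This is a hypothetical model, not the Ignite code. One row's committed versions are held in a `TreeMap` from a `long` commit timestamp to row bytes, `null` stands for a tombstone, and the GC queue and RocksDB are left out entirely. In this model the check fires when a delete is committed on top of a row whose latest committed version is already a tombstone.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of one row's committed version chain (not the Ignite implementation).
 * A null value represents a tombstone.
 */
final class VersionChainModel {
    // Commit timestamp -> row bytes; TreeMap permits null values, used here as tombstones.
    private final TreeMap<Long, byte[]> versions = new TreeMap<>();

    /**
     * Commits a version, mirroring the check in the diff: a tombstone is not written
     * if the previous (older) version is already a tombstone.
     *
     * @return true if the write was skipped because both values are tombstones.
     */
    boolean addWriteCommitted(byte[] row, long commitTs) {
        // Previous value: the newest version strictly older than the new one.
        Map.Entry<Long, byte[]> prev = versions.lowerEntry(commitTs);

        boolean newAndPrevTombstones = row == null && prev != null && prev.getValue() == null;

        if (!newAndPrevTombstones) {
            versions.put(commitTs, row);
        }

        return newAndPrevTombstones;
    }

    int size() {
        return versions.size();
    }
}
```

Here, committing a second delete after an existing tombstone returns `true`, and the chain keeps only the value and the first tombstone.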
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ // We retrieve the first element of the GC queue and seeking for it in the data CF.
Review Comment:
```suggestion
// We retrieve the first element of the GC queue and seek for it in the data CF.
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ // We retrieve the first element of the GC queue and seeking for it in the data CF.
+ // However, the element that we need to garbage collect is the next (older one) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+ // If the next element is exists, that should be the element that we want to garbage collect.
Review Comment:
```suggestion
// If the next element exists, that should be the element that we want to garbage collect.
```
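The steps described in these comments can be sketched with in-memory structures. This is a hypothetical model built on several assumptions. The GC queue is a `TreeSet` of `{timestamp, rowId}` pairs ordered by timestamp. The data column family is a per-row `TreeMap` from commit timestamp to value, with `null` as a tombstone. Timestamps and row ids are plain `long`s rather than `HybridTimestamp` and `RowId`. Instead of a `TableRowAndRowId`, the sketch returns the collected `(timestamp, value)` entry.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical in-memory model of the pollForVacuum steps described in the diff comments.
 * Timestamps and row ids are plain longs; a null value is a tombstone.
 */
final class VacuumModel {
    /** GC queue entries {timestamp, rowId}, oldest timestamp first. */
    final TreeSet<long[]> gcQueue = new TreeSet<>(
            Comparator.<long[]>comparingLong(e -> e[0]).thenComparingLong(e -> e[1]));

    /** Row id -> versions of that row (commit timestamp -> value). */
    final Map<Long, TreeMap<Long, byte[]>> data = new HashMap<>();

    /** Returns the collected (timestamp, value) entry, or null if nothing was collected. */
    Map.Entry<Long, byte[]> pollForVacuum(long lowWatermark) {
        if (gcQueue.isEmpty()) {
            return null; // GC queue is empty.
        }

        long[] head = gcQueue.first(); // Oldest queued entry.
        if (head[0] > lowWatermark) {
            return null; // Nothing is old enough to be collected yet.
        }
        gcQueue.pollFirst();

        long gcTs = head[0];
        TreeMap<Long, byte[]> versions = data.get(head[1]);
        if (versions == null || !versions.containsKey(gcTs)) {
            return null; // The queued version is gone (a simplification of this model).
        }

        // The version to collect is the next, older one.
        Map.Entry<Long, byte[]> older = versions.lowerEntry(gcTs);

        // A queued version that is a tombstone is removed as well.
        if (versions.get(gcTs) == null) {
            versions.remove(gcTs);
        }

        if (older != null) {
            versions.remove(older.getKey());
        }
        return older;
    }
}
```

In this sketch, the queued entry is always dequeued once its timestamp is at or below the watermark, even when nothing older exists to collect. The quoted diff does not show whether the real implementation behaves the same way.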
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ // We retrieve the first element of the GC queue and seeking for it in the data CF.
+ // However, the element that we need to garbage collect is the next (older one) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+ // If the next element is exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = allocateDirect(GC_KEY_SIZE);
+ gcKeyBuffer.clear();
+
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
Review Comment:
Why do we clear this buffer here? What if the `lowWatermark` is too small and we exit early?
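One possible restructuring is to decode the GC key and compare its timestamp with `lowWatermark` before preparing the data key buffer. That way the early `return null` does no extra work. Also, a freshly allocated `ByteBuffer` already has position 0 and limit equal to its capacity, so calling `clear()` right after `allocateDirect` has no effect. The sketch below rests on three assumptions: a hypothetical GC key layout of `[partitionId: short][timestamp: long][rowId: long]`, a heap `ThreadLocal` buffer standing in for `MV_KEY_BUFFER`, and a simplified timestamp encoding.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: decode and check the GC key before touching the data key buffer. */
final class EarlyExitSketch {
    // Hypothetical GC key layout: [partitionId: short][timestamp: long][rowId: long].
    static final int GC_KEY_TS_OFFSET = Short.BYTES;
    static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + Long.BYTES;
    static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + Long.BYTES;

    // Stand-in for the thread-local MV_KEY_BUFFER from the diff.
    static final ThreadLocal<ByteBuffer> DATA_KEY_BUFFER =
            ThreadLocal.withInitial(() -> ByteBuffer.allocate(64));

    // Counts how often the data key buffer was actually prepared.
    static int dataBufferPreparations = 0;

    /** Returns the data key to seek, or null if the GC head is above the low watermark. */
    static ByteBuffer prepareSeek(ByteBuffer gcKey, long lowWatermark) {
        // Absolute read: does not depend on the buffer's current position.
        long gcTs = gcKey.getLong(GC_KEY_TS_OFFSET);

        if (gcTs > lowWatermark) {
            return null; // Exit before preparing any other buffer.
        }

        // Only now is the data key buffer needed.
        ByteBuffer dataKey = DATA_KEY_BUFFER.get();
        dataKey.clear();
        dataBufferPreparations++;

        dataKey.putShort(gcKey.getShort(0));
        dataKey.putLong(gcKey.getLong(GC_KEY_ROW_ID_OFFSET));
        dataKey.putLong(gcTs); // Simplified: the real data key stores the timestamp in descending order.
        dataKey.flip();
        return dataKey;
    }
}
```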
##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java:
##########
@@ -181,7 +159,7 @@ protected static List<IgniteBiTuple<TestKey, TestValue>> drainToList(Cursor<Read
}
}
- protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow expectedRow) {
+ protected final void assertRowMatches(@Nullable TableRow rowUnderQuestion, TableRow expectedRow) {
Review Comment:
> Second, in the first line of method we assert it not to be null. So it is nullable

This contradicts the logic of the `@Nullable` annotation. Methods with `@Nullable` parameters should accept `null`s as valid values.
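To illustrate the point, here are the two consistent options. In the first, the parameter is not annotated and the method rejects `null` up front. In the second, the parameter is `@Nullable` and `null` is handled as a meaningful input. The local `@Nullable` annotation below is a hypothetical stand-in for the annotation used in the PR, so that the snippet compiles with the JDK alone. `String` stands in for `TableRow`, and `assertRowMatchesOrAbsent` is a hypothetical name.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

/** Hypothetical stand-in for the @Nullable annotation used in the PR. */
@Target(ElementType.PARAMETER)
@interface Nullable {
}

final class RowAssertions {
    /** Option 1: the parameter is not nullable, so null is rejected immediately. */
    static void assertRowMatches(String rowUnderQuestion, String expectedRow) {
        Objects.requireNonNull(rowUnderQuestion, "rowUnderQuestion");

        if (!rowUnderQuestion.equals(expectedRow)) {
            throw new AssertionError("Expected " + expectedRow + " but was " + rowUnderQuestion);
        }
    }

    /** Option 2: null is a valid input (meaning "no row") and is handled explicitly. */
    static void assertRowMatchesOrAbsent(@Nullable String rowUnderQuestion, @Nullable String expectedRow) {
        if (!Objects.equals(rowUnderQuestion, expectedRow)) {
            throw new AssertionError("Expected " + expectedRow + " but was " + rowUnderQuestion);
        }
    }
}
```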
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(), partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ // We retrieve the first element of the GC queue and seeking for it in the data CF.
+ // However, the element that we need to garbage collect is the next (older one) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+ // If the next element is exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = allocateDirect(GC_KEY_SIZE);
+ gcKeyBuffer.clear();
+
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+ HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId gcElementRowId = getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ ReadOptions opts = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
Review Comment:
It looks like these read options could be cached.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -595,21 +623,99 @@ public void addWriteCommitted(RowId rowId, @Nullable TableRow row, HybridTimesta
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch = requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- putTimestamp(keyBuf, commitTimestamp);
+ putTimestampDesc(keyBuf, commitTimestamp);
+
+ boolean isNewValueTombstone = row == null;
//TODO IGNITE-16913 Add proper way to write row bytes into array without allocations.
- byte[] rowBytes = rowBytes(row);
+ byte[] rowBytes = row != null ? rowBytes(row) : ArrayUtils.BYTE_EMPTY_ARRAY;
+ boolean newAndPrevTombstones; // Both this and previous values for the row id are tombstones.
try {
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
-
- return null;
+ newAndPrevTombstones = maybeAddToGcQueue(writeBatch, rowId, commitTimestamp, isNewValueTombstone);
} catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in storage", e);
+ throw new StorageException("Failed to add row to the GC queue: " + createStorageInfo(), e);
+ }
+
+ // We only write tombstone if the previous value for the same row id was not a tombstone.
+ // So there won't be consecutive tombstones for the same row id.
+ if (!newAndPrevTombstones) {
+ try {
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), rowBytes);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in storage: " + createStorageInfo(), e);
+ }
}
+
+ return null;
});
}
+ /**
+ * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+ * and previous row's timestamps are below the watermark.
+ * Returns {@code true} if new value and previous value are both tombstones.
+ *
+ * @param writeBatch Write batch.
+ * @param rowId Row id.
+ * @param timestamp New row's timestamp.
+ * @param isNewValueTombstone If new row is a tombstone.
+ * @return {@code true} if new value and previous value are both tombstones.
+ * @throws RocksDBException If failed.
+ */
+ private boolean maybeAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer key = allocateDirect(MAX_KEY_SIZE);
+ key.putShort((short) partitionId);
+ putRowId(key, rowId);
+ putTimestampDesc(key, timestamp);
+
+ try (
+ Slice upperBound = new Slice(partitionEndPrefix());
+ ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ key.flip();
+ it.seek(key);
+
+ if (!invalid(it)) {
Review Comment:
I would suggest inverting this condition.
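Inverting the check turns the body into guard clauses with early returns. Because `newAndPrevTombstones` starts as `false`, an invalid iterator can simply `return false`. Below is a hypothetical in-memory sketch of `maybeAddToGcQueue` written in that style, under several assumptions:

- The data column family is a `TreeMap` keyed by `{rowId, timestamp}`, with row ids ascending and timestamps descending, to match `putRowId` followed by `putTimestampDesc`.
- `ceilingEntry` plays the role of `it.seek(key)`.
- A row id check is added, on the assumption that the seek can land on a different row.
- The GC queue entry is `{newTimestamp, rowId}`, as the Javadoc describes.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of maybeAddToGcQueue written with guard clauses (not the Ignite code). */
final class GcQueueModel {
    /** Data CF order: row id ascending, then timestamp descending (newest version first). */
    static final Comparator<long[]> DATA_ORDER = (a, b) ->
            a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(b[1], a[1]);

    /** GC queue order: timestamp ascending, then row id. */
    static final Comparator<long[]> GC_ORDER = (a, b) ->
            a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]);

    /** Data "column family": {rowId, timestamp} -> value, where null is a tombstone. */
    final TreeMap<long[], byte[]> data = new TreeMap<>(DATA_ORDER);

    /** GC queue: {timestamp, rowId} entries. */
    final TreeSet<long[]> gcQueue = new TreeSet<>(GC_ORDER);

    /** Returns true if the new value and the previous value are both tombstones. */
    boolean maybeAddToGcQueue(long rowId, long timestamp, boolean isNewValueTombstone) {
        // Equivalent of it.seek(key): the first key >= {rowId, timestamp},
        // i.e. the newest version of the row that is not newer than timestamp.
        Map.Entry<long[], byte[]> prev = data.ceilingEntry(new long[] {rowId, timestamp});

        // Guard clause: no previous version of this row, nothing becomes garbage.
        if (prev == null || prev.getKey()[0] != rowId) {
            return false;
        }

        // Guard clause: both values are tombstones, so the caller skips the write.
        if (isNewValueTombstone && prev.getValue() == null) {
            return true;
        }

        // The previous version becomes collectable once the new timestamp is below the low watermark.
        gcQueue.add(new long[] {timestamp, rowId});
        return false;
    }
}
```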
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -171,6 +186,9 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage {
/** Meta column family. */
private final ColumnFamilyHandle meta;
+ /** GC queue column family. */
Review Comment:
Is there a description anywhere of what a GC queue is?
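The quoted diff does not describe the GC queue, so what follows is a hedged reading based only on the code above. GC keys are read with `readTimestampNatural`, while data keys use `putTimestampDesc` after `putRowId`. This suggests that the GC queue is ordered by commit timestamp ascending, so its head is the oldest candidate and is the one compared against `lowWatermark`. Data keys, by contrast, keep the newest version of a row first. The sketch below models both layouts as byte arrays compared in unsigned lexicographic order, which is how RocksDB's default bytewise comparator orders keys. The GC key field order, the field widths, the use of a single `long` for the timestamp and row id, and the complement-based descending encoding are all assumptions, not the Ignite layout.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical key layouts. Assumes RocksDB's default bytewise comparator
 * (unsigned lexicographic), modeled here with Arrays.compareUnsigned.
 */
final class GcKeyLayout {
    /** Hypothetical GC queue key: [partitionId][timestamp, natural order][rowId]. */
    static byte[] gcKey(short partitionId, long timestamp, long rowId) {
        return ByteBuffer.allocate(Short.BYTES + 2 * Long.BYTES) // Big-endian by default.
                .putShort(partitionId)
                .putLong(timestamp)
                .putLong(rowId)
                .array();
    }

    /** Hypothetical data key: [partitionId][rowId][timestamp, descending order]. */
    static byte[] dataKey(short partitionId, long rowId, long timestamp) {
        return ByteBuffer.allocate(Short.BYTES + 2 * Long.BYTES)
                .putShort(partitionId)
                .putLong(rowId)
                // Bitwise complement reverses the unsigned order of non-negative timestamps.
                .putLong(~timestamp)
                .array();
    }

    static int compare(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```

With this layout, seeking to the partition's start prefix in the GC column family lands on the oldest entry. In the data column family, seeking to a row id lands on that row's newest version.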
--
This is an automated message from the Apache Git Service.