sashapolo commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1095819933


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -570,17 +589,26 @@ public void commitWrite(RowId rowId, HybridTimestamp 
timestamp) throws StorageEx
                 byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, 
readOpts, uncommittedKeyBytes);
 
                 if (valueBytes == null) {
-                    //the chain doesn't contain an uncommitted write intent
+                    // The chain doesn't contain an uncommitted write intent.
                     return null;
                 }
 
+                boolean isNewValueTombstone = valueBytes.length == 
VALUE_HEADER_SIZE;
+
+                // Both this and previous values for the row id are tombstones.
+                boolean newAndPrevTombstones = maybeAddToGcQueue(writeBatch, 
rowId, timestamp, isNewValueTombstone);

Review Comment:
   I think `tryAddToGcQueue` is a better name for this method



##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test implementation for {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends 
AbstractMvPartitionStorageGcTest {
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")

Review Comment:
   What's this configuration for?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws 
RocksDBException {
         writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
     }
 
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            WriteBatchWithIndex batch = requireWriteBatch();
+
+            // We retrieve the first element of the GC queue and seeking for 
it in the data CF.
+            // However, the element that we need to garbage collect is the 
next (older one) element.
+            // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+            // If the next element is exists, that should be the element that 
we want to garbage collect.
+            try (
+                    var gcUpperBound = new Slice(partitionEndPrefix());
+                    ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);

Review Comment:
   Why is `.setTotalOrderSeek(true)` needed? Please leave a comment



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -570,17 +589,26 @@ public void commitWrite(RowId rowId, HybridTimestamp 
timestamp) throws StorageEx
                 byte[] valueBytes = writeBatch.getFromBatchAndDB(db, cf, 
readOpts, uncommittedKeyBytes);
 
                 if (valueBytes == null) {
-                    //the chain doesn't contain an uncommitted write intent
+                    // The chain doesn't contain an uncommitted write intent.
                     return null;
                 }
 
+                boolean isNewValueTombstone = valueBytes.length == 
VALUE_HEADER_SIZE;
+
+                // Both this and previous values for the row id are tombstones.
+                boolean newAndPrevTombstones = maybeAddToGcQueue(writeBatch, 
rowId, timestamp, isNewValueTombstone);
+
                 // Delete pending write.
                 writeBatch.delete(cf, uncommittedKeyBytes);
 
-                // Add timestamp to the key, and put the value back into the 
storage.
-                putTimestamp(keyBuf, timestamp);
+                // We only write tombstone if the previous value for the same 
row id was not a tombstone.
+                // So there won't be consecutive tombstones for the same row 
id.
+                if (!newAndPrevTombstones) {

Review Comment:
   Why wasn't this check present in the previous implementation? Was it a bug? 
And when can we have two tombstones for the same key anyway?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws 
RocksDBException {
         writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
     }
 
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            WriteBatchWithIndex batch = requireWriteBatch();
+
+            // We retrieve the first element of the GC queue and seeking for 
it in the data CF.

Review Comment:
   ```suggestion
               // We retrieve the first element of the GC queue and seek for it 
in the data CF.
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws 
RocksDBException {
         writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
     }
 
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            WriteBatchWithIndex batch = requireWriteBatch();
+
+            // We retrieve the first element of the GC queue and seeking for 
it in the data CF.
+            // However, the element that we need to garbage collect is the 
next (older one) element.
+            // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+            // If the next element is exists, that should be the element that 
we want to garbage collect.

Review Comment:
   ```suggestion
               // If the next element exists, that should be the element that 
we want to garbage collect.
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws 
RocksDBException {
         writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
     }
 
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            WriteBatchWithIndex batch = requireWriteBatch();
+
+            // We retrieve the first element of the GC queue and seeking for 
it in the data CF.
+            // However, the element that we need to garbage collect is the 
next (older one) element.
+            // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+            // If the next element is exists, that should be the element that 
we want to garbage collect.
+            try (
+                    var gcUpperBound = new Slice(partitionEndPrefix());
+                    ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+                    RocksIterator gcIt = db.newIterator(gc, gcOpts);
+            ) {
+                gcIt.seek(partitionStartPrefix());
+
+                if (invalid(gcIt)) {
+                    // GC queue is empty.
+                    return null;
+                }
+
+                ByteBuffer gcKeyBuffer = allocateDirect(GC_KEY_SIZE);
+                gcKeyBuffer.clear();
+
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();

Review Comment:
   Why do we clear this buffer here? What if the lowWatermark is too small and 
we exit early?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java:
##########
@@ -181,7 +159,7 @@ protected static List<IgniteBiTuple<TestKey, TestValue>> 
drainToList(Cursor<Read
         }
     }
 
-    protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow 
expectedRow) {
+    protected final void assertRowMatches(@Nullable TableRow rowUnderQuestion, 
TableRow expectedRow) {

Review Comment:
   > Second, in the first line of method we assert it not to be null. So it is 
nullable
   
   This contradicts the logic of `@Nullable` annotation. Methods with 
`@Nullable` parameters should accept `null`s as valid values



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1123,218 @@ public void destroyData(WriteBatch writeBatch) throws 
RocksDBException {
         writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
     }
 
+    @Override
+    public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            WriteBatchWithIndex batch = requireWriteBatch();
+
+            // We retrieve the first element of the GC queue and seeking for 
it in the data CF.
+            // However, the element that we need to garbage collect is the 
next (older one) element.
+            // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+            // If the next element is exists, that should be the element that 
we want to garbage collect.
+            try (
+                    var gcUpperBound = new Slice(partitionEndPrefix());
+                    ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+                    RocksIterator gcIt = db.newIterator(gc, gcOpts);
+            ) {
+                gcIt.seek(partitionStartPrefix());
+
+                if (invalid(gcIt)) {
+                    // GC queue is empty.
+                    return null;
+                }
+
+                ByteBuffer gcKeyBuffer = allocateDirect(GC_KEY_SIZE);
+                gcKeyBuffer.clear();
+
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                gcIt.key(gcKeyBuffer);
+
+                HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+                if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                    // No elements to garbage collect.
+                    return null;
+                }
+
+                RowId gcElementRowId = getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+                try (
+                        var upperBound = new Slice(partitionEndPrefix());
+                        ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);

Review Comment:
   Looks like these read options may be cached



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -595,21 +623,99 @@ public void addWriteCommitted(RowId rowId, @Nullable 
TableRow row, HybridTimesta
             @SuppressWarnings("resource") WriteBatchWithIndex writeBatch = 
requireWriteBatch();
 
             ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
-            putTimestamp(keyBuf, commitTimestamp);
+            putTimestampDesc(keyBuf, commitTimestamp);
+
+            boolean isNewValueTombstone = row == null;
 
             //TODO IGNITE-16913 Add proper way to write row bytes into array 
without allocations.
-            byte[] rowBytes = rowBytes(row);
+            byte[] rowBytes = row != null ? rowBytes(row) : 
ArrayUtils.BYTE_EMPTY_ARRAY;
 
+            boolean newAndPrevTombstones; // Both this and previous values for 
the row id are tombstones.
             try {
-                writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), 
rowBytes);
-
-                return null;
+                newAndPrevTombstones = maybeAddToGcQueue(writeBatch, rowId, 
commitTimestamp, isNewValueTombstone);
             } catch (RocksDBException e) {
-                throw new StorageException("Failed to update a row in 
storage", e);
+                throw new StorageException("Failed to add row to the GC queue: 
" + createStorageInfo(), e);
+            }
+
+            // We only write tombstone if the previous value for the same row 
id was not a tombstone.
+            // So there won't be consecutive tombstones for the same row id.
+            if (!newAndPrevTombstones) {
+                try {
+                    writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE), 
rowBytes);
+                } catch (RocksDBException e) {
+                    throw new StorageException("Failed to update a row in 
storage: " + createStorageInfo(), e);
+                }
             }
+
+            return null;
         });
     }
 
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    private boolean maybeAddToGcQueue(WriteBatchWithIndex writeBatch, RowId 
rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer key = allocateDirect(MAX_KEY_SIZE);
+        key.putShort((short) partitionId);
+        putRowId(key, rowId);
+        putTimestampDesc(key, timestamp);
+
+        try (
+                Slice upperBound = new Slice(partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            key.flip();
+            it.seek(key);
+
+            if (!invalid(it)) {

Review Comment:
   I would suggest to invert this condition



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -171,6 +186,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /** Meta column family. */
     private final ColumnFamilyHandle meta;
 
+    /** GC queue column family. */

Review Comment:
   Is there a description anywhere about what is a GC queue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to