tkalkirill commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1096983649


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java:
##########
@@ -24,7 +24,7 @@
  * Wrapper that holds both {@link TableRow} and {@link RowId}. {@link TableRow} is null for tombstones.
  */
 public class TableRowAndRowId {
-    /** Table row. */
+    /** Table row. {@code null} if tombstone. */

Review Comment:
   It seems we discussed that this statement is not entirely accurate: the row 
may be `null` even when this is not a tombstone.
   I propose describing this in the method's documentation.



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the timestamp,
+however, multiple row ids can have same timestamp, so making row id a value requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only put new value's timestamp
+please refer to the Garbage Collection [algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or higher. If watermark is at 9, 
+then it means that there can still occur a transaction with a 9 timestamp, which means that the record number 1 
+is still needed.  

Review Comment:
   I think that if the low watermark (LW) is 9, then we only need the row with timestamp 10, right?
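
   To make the question concrete, here is a minimal sketch of the rule as the doc states it. It assumes that a read at timestamp T sees the newest version committed at or before T, and that reads at the watermark itself can still occur. The class and method names are hypothetical and not part of the PR.

```java
import java.util.List;

/** Hypothetical model of the example: row "Foo" has versions committed at timestamps 1 and 10. */
final class LowWatermarkSketch {
    /** Returns the commit timestamp of the version visible to a read at readTs, or -1 if none. */
    static long visibleVersion(List<Long> commitTimestamps, long readTs) {
        long visible = -1;
        for (long ts : commitTimestamps) {
            if (ts <= readTs && ts > visible) {
                visible = ts;
            }
        }
        return visible;
    }

    /** The older version may go once the version that replaced it is at or below the low watermark. */
    static boolean canRemoveOlder(long newerTs, long lowWatermark) {
        return newerTs <= lowWatermark;
    }

    public static void main(String[] args) {
        List<Long> foo = List.of(1L, 10L);
        System.out.println(visibleVersion(foo, 9));  // 1: a read at 9 still resolves to record 1
        System.out.println(canRemoveOlder(10, 9));   // false
        System.out.println(canRemoveOlder(10, 10));  // true
    }
}
```

   Under these assumptions a read at timestamp 9 resolves to record 1, which is why the doc keeps it until the watermark reaches 10.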



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Helper.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/** Helper for the partition data. */
+class Helper {

Review Comment:
   The name is too generic for me. Maybe rename it to something like `PartitionDataHelper`?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Helper.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/** Helper for the partition data. */
+class Helper {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** UUID size in bytes. */
+    static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+    /** Position of row id inside the key. */
+    static final int ROW_ID_OFFSET = Short.BYTES;
+
+    /** Size of the key without timestamp. */
+    static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Maximum size of the data key. */
+    static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+    /** Transaction id size (part of the transaction state). */
+    static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+    /** Commit table id size (part of the transaction state). */
+    static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+    /** Size of the value header (transaction state). */
+    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
+
+    /** Transaction id offset. */
+    static final int TX_ID_OFFSET = 0;
+
+    /** Commit table id offset. */
+    static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+    /** Commit partition id offset. */
+    static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+    /** Value offset (if transaction state is present). */
+    static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+    static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+    static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    final int partitionId;
+
+    final RocksDB db;
+
+    final ColumnFamilyHandle cf;

Review Comment:
   Let's rename it to `partCf`.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java:
##########
@@ -181,7 +158,7 @@ protected static List<IgniteBiTuple<TestKey, TestValue>> drainToList(Cursor<Read
         }
     }
 
-    protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow expectedRow) {
+    protected final void assertRowMatches(@Nullable TableRow rowUnderQuestion, TableRow expectedRow) {

Review Comment:
   Can we still remove the `@Nullable`? It is a little confusing: it suggests 
that `null` is a valid value, yet we would fail here.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;

Review Comment:
   Let's rename to `partCf`
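
   For readers of this thread, a minimal sketch of the GC queue key layout from the Javadoc above (2-byte partition id, 12-byte timestamp, 16-byte row id, big-endian). The class name, the `encode` signature, and the way the timestamp is split into an 8-byte physical and a 4-byte logical part are assumptions for illustration; the PR builds the key through its own helper.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.UUID;

/** Hypothetical encoder for the GC queue key: | partId (2) | timestamp (12) | rowId (16) |. */
final class GcKeySketch {
    static final int PARTITION_ID_SIZE = Short.BYTES;
    static final int TIMESTAMP_SIZE = Long.BYTES + Integer.BYTES;
    static final int ROW_ID_SIZE = 2 * Long.BYTES;
    static final int GC_KEY_SIZE = PARTITION_ID_SIZE + TIMESTAMP_SIZE + ROW_ID_SIZE;

    /** Writes all fields big-endian so that a bytewise key order follows the timestamp order. */
    static byte[] encode(int partitionId, long physical, int logical, UUID rowId) {
        ByteBuffer buf = ByteBuffer.allocate(GC_KEY_SIZE).order(ByteOrder.BIG_ENDIAN);
        buf.putShort((short) partitionId);
        buf.putLong(physical);
        buf.putInt(logical);
        buf.putLong(rowId.getMostSignificantBits());
        buf.putLong(rowId.getLeastSignificantBits());
        return buf.array();
    }

    public static void main(String[] args) {
        UUID id = new UUID(1L, 2L);
        byte[] older = encode(3, 5L, 0, id);
        byte[] newer = encode(3, 10L, 0, id);
        System.out.println(older.length);                             // 30
        System.out.println(Arrays.compareUnsigned(older, newer) < 0); // true
    }
}
```

   Within one partition, an unsigned bytewise comparison of such keys orders entries by timestamp first (for non-negative timestamp parts), which is what lets the collector read the oldest entry from the head of the queue.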



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1494,7 +1473,7 @@ void startRebalance(WriteBatch writeBatch) {
      *
      * @throws StorageRebalanceException If there was an error when aborting the rebalance.
      */
-    void abortReblance(WriteBatch writeBatch) {

Review Comment:
   Lol



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;

Review Comment:
   Let's rename to `partCf`



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (

Review Comment:
   Please also add some description.
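
   As a starting point for such a description, here is an in-memory sketch of the flow that the comments at the top of the method describe. It is only a model under stated assumptions: sorted maps stand in for the two column families, an empty string stands for a tombstone, commit timestamps are unique across rows, and the "both tombstones" case from `tryAddToGcQueue` is left out. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of the GC queue and the data column family. */
final class PollForVacuumSketch {
    /** GC queue: commit timestamp of the newer version -> row id. */
    final NavigableMap<Long, String> gcQueue = new TreeMap<>();

    /** Data: row id -> (commit timestamp -> value); an empty string models a tombstone. */
    final Map<String, NavigableMap<Long, String>> data = new HashMap<>();

    /** Commits a version; if the row already has one, the NEW timestamp is enqueued. */
    void commit(String rowId, long ts, String value) {
        NavigableMap<Long, String> versions = data.computeIfAbsent(rowId, k -> new TreeMap<>());
        if (!versions.isEmpty()) {
            gcQueue.put(ts, rowId);
        }
        versions.put(ts, value);
    }

    /** Returns the removed older value, or null if nothing can be collected yet. */
    String pollForVacuum(long lowWatermark) {
        Map.Entry<Long, String> head = gcQueue.firstEntry();
        if (head == null || head.getKey() > lowWatermark) {
            return null; // Queue is empty or its head is above the watermark.
        }
        long ts = head.getKey();
        gcQueue.remove(ts);

        NavigableMap<Long, String> versions = data.get(head.getValue());
        String current = versions.get(ts);
        Map.Entry<Long, String> older = versions.lowerEntry(ts);

        if (current != null && current.isEmpty()) {
            versions.remove(ts); // The queued version is a tombstone, drop it too.
        }
        if (older == null) {
            return null;
        }
        versions.remove(older.getKey()); // The next (older) version is the garbage.
        return older.getValue();
    }

    public static void main(String[] args) {
        PollForVacuumSketch s = new PollForVacuumSketch();
        s.commit("Foo", 1, "v1");
        s.commit("Foo", 10, "v2");
        System.out.println(s.pollForVacuum(9));  // null
        System.out.println(s.pollForVacuum(10)); // v1
    }
}
```

   The point the description should probably make explicit is that the queue entry carries the newer version's timestamp while the version being removed is the one right below it.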



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
+        // However, the element that we need to garbage collect is the next 
(older one) element.
+        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want 
to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (
+                    var upperBound = new Slice(helper.partitionEndPrefix());
+                    ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, opts)
+            ) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition 
to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage 
collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(cf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * If there is a row and this entry is a tombstone, removes the tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return {@code true} if further processing by garbage collector is needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws RocksDBException {
+        ColumnFamilyHandle cf = helper.cf;
+
+        // Set up the data key.
+        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+        // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+        it.seek(dataKeyBuffer);
+
+        boolean hasRowForGc = true;
+
+        if (invalid(it)) {
+            // There is no row for the GC queue element.
+            hasRowForGc = false;
+        } else {
+            dataKeyBuffer.clear();
+
+            it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+                // There is no row for the GC queue element.
+                hasRowForGc = false;
+            }
+        }
+
+        if (!hasRowForGc) {
+            return false;
+        }
+
+        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
+        int len = it.value(EMPTY_DIRECT_BUFFER);
+
+        if (len == 0) {
+            // This is a tombstone, we need to delete it.
+            batch.delete(cf, dataKeyBuffer);
+        }
+
+        return true;

Review Comment:
   But what about `hasRowForGc`? =)
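
One possible reading of this remark: both `hasRowForGc = false` branches lead straight to `return false`, so the flag may be unnecessary. Below is a minimal, self-contained sketch of that shape. It is not the PR's code: `EarlyReturnSketch`, the `TreeMap` standing in for the data column family, and the `"rowId:timestamp"` string keys are all assumptions made for illustration, with `ceilingEntry` playing the role of `it.seek(...)` and a zero-length value playing the role of a tombstone.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the data column family; keys are "rowId:timestamp" strings. */
final class EarlyReturnSketch {
    /**
     * Returns false if no entry for rowId is found at or after seekKey.
     * Otherwise removes the entry if it is a tombstone (empty value) and returns true.
     */
    static boolean hasRowAndDropTombstone(TreeMap<String, byte[]> data, String rowId, String seekKey) {
        // Analogue of it.seek(dataKeyBuffer): first entry whose key is >= seekKey.
        Map.Entry<String, byte[]> entry = data.ceilingEntry(seekKey);

        if (entry == null) {
            // Analogue of invalid(it): there is no row for the GC queue element.
            return false;
        }

        if (!entry.getKey().startsWith(rowId + ":")) {
            // The seek landed on another row id, so there is no row for the GC queue element.
            return false;
        }

        if (entry.getValue().length == 0) {
            // This is a tombstone, delete it.
            data.remove(entry.getKey());
        }

        return true;
    }
}
```

With this shape, the final `return true` no longer needs a flag to justify it, because every negative case has already left the method.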



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
+        // However, the element that we need to garbage collect is the next 
(older one) element.
+        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want 
to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (
+                    var upperBound = new Slice(helper.partitionEndPrefix());
+                    ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, opts)
+            ) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition 
to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage 
collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(cf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * If there is a row and this entry is a tombstone, removes the tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return {@code true} if further processing by garbage collector is needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws RocksDBException {
+        ColumnFamilyHandle cf = helper.cf;
+
+        // Set up the data key.
+        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+        // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+        it.seek(dataKeyBuffer);
+
+        boolean hasRowForGc = true;
+
+        if (invalid(it)) {
+            // There is no row for the GC queue element.
+            hasRowForGc = false;
+        } else {
+            dataKeyBuffer.clear();
+
+            it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+                // There is no row for the GC queue element.
+                hasRowForGc = false;
+            }
+        }
+
+        if (!hasRowForGc) {
+            return false;
+        }
+
+        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
+        int len = it.value(EMPTY_DIRECT_BUFFER);
+
+        if (len == 0) {
+            // This is a tombstone, we need to delete it.
+            batch.delete(cf, dataKeyBuffer);
+        }
+
+        return true;
+    }
+
+    /**
+     * Checks if there is a row for garbage collection.
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return {@code true} if further processing by garbage collector is needed.
+     */
+    private boolean checkHasRowForGc(RocksIterator it, ByteBuffer dataKeyBuffer, RowId gcElementRowId) {
+        // Let's move to the element that was scheduled for GC.
+        it.next();
+
+        boolean hasRowForGc = true;
+
+        RowId gcRowId;
+
+        if (invalid(it)) {
+            hasRowForGc = false;
+        } else {
+            dataKeyBuffer.clear();
+
+            int keyLen = it.key(dataKeyBuffer);
+
+            if (keyLen != MAX_KEY_SIZE) {
+                // We moved to the next row id's write-intent, so there was no row to GC for the current row id.
+                hasRowForGc = false;
+            } else {
+                gcRowId = helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET);
+
+                if (!gcElementRowId.equals(gcRowId)) {
+                    // We moved to the next row id, so there was no row to GC for the current row id.
+                    hasRowForGc = false;

Review Comment:
   `false` can be returned immediately here instead of setting `hasRowForGc`. =)
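
A rough sketch of how the nested `if`/`else` chain in `checkHasRowForGc` might flatten once each negative case returns on the spot. Everything here is assumed for illustration and is not taken from the PR: `NextRowSketch`, the `TreeMap` standing in for the data column family, and a toy key layout where a committed version is `"rowId:timestamp"` and a write-intent is the bare `"rowId"` (the analogue of `keyLen != MAX_KEY_SIZE`). `higherKey` plays the role of `it.next()`, and keys are ordered lexicographically, which is simpler than the real key ordering.

```java
import java.util.TreeMap;

/** Hypothetical model of stepping to the next entry and validating it with early returns. */
final class NextRowSketch {
    /** Returns the key of the next committed version of rowId after currentKey, or null if there is none. */
    static String nextVersionKey(TreeMap<String, byte[]> data, String rowId, String currentKey) {
        // Analogue of it.next(): move to the entry that follows the current one.
        String next = data.higherKey(currentKey);

        if (next == null) {
            // Analogue of invalid(it): the iterator ran out of entries.
            return null;
        }

        if (!next.contains(":")) {
            // Moved to the next row id's write-intent (no timestamp part in this toy layout).
            return null;
        }

        if (!next.startsWith(rowId + ":")) {
            // Moved to the next row id, so there is no row to GC for the current row id.
            return null;
        }

        return next;
    }
}
```

Each guard maps to one of the branches that currently assigns `hasRowForGc = false`, so the local variable and the trailing `else` nesting both disappear.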



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
+        // However, the element that we need to garbage collect is the next 
(older one) element.
+        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want 
to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (
+                    var upperBound = new Slice(helper.partitionEndPrefix());
+                    ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, opts)
+            ) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition 
to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage 
collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(cf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's 
queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can 
be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as 
no further processing is required.
+     * if there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is 
needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, 
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws 
RocksDBException {
+        ColumnFamilyHandle cf = helper.cf;
+
+        // Set up the data key.
+        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+        // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has 
matching row id or even partition id.
+        it.seek(dataKeyBuffer);
+
+        boolean hasRowForGc = true;
+
+        if (invalid(it)) {
+            // There is no row for the GC queue element.
+            hasRowForGc = false;
+        } else {
+            dataKeyBuffer.clear();
+
+            it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, 
ROW_ID_OFFSET).equals(gcElementRowId)) {
+                // There is no row for the GC queue element.
+                hasRowForGc = false;
+            }
+        }
+
+        if (!hasRowForGc) {
+            return false;
+        }
+
+        // Check if the new element, whose insertion scheduled the GC, was a 
tombstone.
+        int len = it.value(EMPTY_DIRECT_BUFFER);
+
+        if (len == 0) {
+            // This is a tombstone, we need to delete it.
+            batch.delete(cf, dataKeyBuffer);
+        }
+
+        return true;
+    }
+
+    /**
+     * Checks if there is a row for garbage collection.
+     * There might already be no row in the data column family, because GC can 
be run in parallel.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is 
needed.
+     */
+    private boolean checkHasRowForGc(RocksIterator it, ByteBuffer 
dataKeyBuffer, RowId gcElementRowId) {
+        // Let's move to the element that was scheduled for GC.
+        it.next();
+
+        boolean hasRowForGc = true;
+
+        RowId gcRowId;
+
+        if (invalid(it)) {
+            hasRowForGc = false;
+        } else {
+            dataKeyBuffer.clear();
+
+            int keyLen = it.key(dataKeyBuffer);
+
+            if (keyLen != MAX_KEY_SIZE) {
+                // We moved to the next row id's write-intent, so there was no 
row to GC for the current row id.
+                hasRowForGc = false;

Review Comment:
   You can return `false` immediately here instead of setting the flag. =)



##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test implementation for {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends 
AbstractMvPartitionStorageGcTest {
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")

Review Comment:
   I assume this configuration is for `RocksDbStorageEngine`.
   Still, why not use the default configuration? I have the same question
   for Semyon.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -165,9 +135,15 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     /** RocksDb instance. */
     private final RocksDB db;
 
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
     /** Partitions column family. */
     private final ColumnFamilyHandle cf;

Review Comment:
   As I understand it, this is now stored in `Helper` and can be removed from here.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
+        // However, the element that we need to garbage collect is the next 
(older one) element.
+        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want 
to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (
+                    var upperBound = new Slice(helper.partitionEndPrefix());
+                    ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, opts)
+            ) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition 
to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage 
collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(cf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's 
queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can 
be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as 
no further processing is required.
+     * if there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is 
needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, 
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws 
RocksDBException {
+        ColumnFamilyHandle cf = helper.cf;
+
+        // Set up the data key.
+        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+        // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has 
matching row id or even partition id.
+        it.seek(dataKeyBuffer);
+
+        boolean hasRowForGc = true;
+
+        if (invalid(it)) {
+            // There is no row for the GC queue element.
+            hasRowForGc = false;

Review Comment:
   You can return `false` immediately here instead of setting the flag. =)
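
   For illustration, here is a minimal sketch of the early-return shape I have in
   mind. It is not code from this PR: `EarlyReturnSketch` and its method are
   hypothetical, a `TreeMap` stands in for the data column family (`ceilingEntry`
   plays the role of `it.seek(...)`), and the key is reduced to the row id alone,
   so timestamps are left out.

   ```java
   import java.util.Map;
   import java.util.NavigableMap;

   /** Hypothetical stand-in for the seek-and-check step; not the PR's actual code. */
   class EarlyReturnSketch {
       /**
        * Returns {@code true} if an entry for the given row id exists, removing it
        * first when it is a tombstone (an empty value).
        */
       static boolean checkHasRowAndRemoveTombstone(NavigableMap<Long, byte[]> data, long gcRowId) {
           // Analogue of a seek: the first entry at or after the requested key.
           Map.Entry<Long, byte[]> entry = data.ceilingEntry(gcRowId);

           if (entry == null) {
               // Analogue of an invalid iterator: nothing to process, so return at once.
               return false;
           }

           if (entry.getKey().longValue() != gcRowId) {
               // The seek landed on another row id, so there is no row for this element.
               return false;
           }

           if (entry.getValue().length == 0) {
               // An empty value marks a tombstone; delete it.
               data.remove(entry.getKey());
           }

           return true;
       }
   }
   ```

   With two early returns the `hasRowForGc` flag and the trailing
   `if (!hasRowForGc)` block are no longer needed, and the rest of the method
   only deals with the case where the row was found.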



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new 
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
+        // However, the element that we need to garbage collect is the next 
(older one) element.
+        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want 
to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new 
ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (
+                    var upperBound = new Slice(helper.partitionEndPrefix());
+                    ReadOptions opts = new 
ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, opts)
+            ) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition 
to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage 
collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(cf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's 
queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can 
be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as 
no further processing is required.
+     * if there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is 
needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, 
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws 
RocksDBException {
+        ColumnFamilyHandle cf = helper.cf;

Review Comment:
   Let's rename to `partCf`



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the docs/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so Java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final Helper helper;
+
+    GarbageCollector(Helper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try to find the previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (
+                Slice upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+                RocksIterator it = db.newIterator(cf, readOpts)
+        ) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, let's check if previous value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+        try (
+                var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound);
+                RocksIterator gcIt = db.newIterator(gc, gcOpts)
+        ) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (
+                    var upperBound = new Slice(helper.partitionEndPrefix());
+                    ReadOptions opts = new ReadOptions().setIterateUpperBound(upperBound);
+                    RocksIterator it = db.newIterator(cf, opts)
+            ) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(cf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * If there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue.
+     * @return {@code true} if further processing by garbage collector is needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws RocksDBException {
+        ColumnFamilyHandle cf = helper.cf;
+
+        // Set up the data key.
+        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+        // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+        it.seek(dataKeyBuffer);
+
+        boolean hasRowForGc = true;
+
+        if (invalid(it)) {
+            // There is no row for the GC queue element.
+            hasRowForGc = false;
+        } else {
+            dataKeyBuffer.clear();
+
+            it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+                // There is no row for the GC queue element.
+                hasRowForGc = false;

Review Comment:
   `false` can be returned immediately here. =)
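
   To illustrate the early-return shape, here is a minimal sketch, not the PR's code. It assumes a `TreeMap` standing in for the data column family, with `ceilingEntry` playing the role of `RocksIterator#seek`. The `EarlyReturnSketch` and `DataKey` names, the `String` row ids and the `long` timestamps are all hypothetical simplifications, and the descending timestamp order within a row id is an assumption based on the "next (older one) element" comment in `pollForVacuum`.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for the data column family check; not the PR's code. */
final class EarlyReturnSketch {
    /** Simplified data key: sorted by row id, then by timestamp in descending order (assumption). */
    static final class DataKey implements Comparable<DataKey> {
        final String rowId;
        final long timestamp;

        DataKey(String rowId, long timestamp) {
            this.rowId = rowId;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(DataKey other) {
            int cmp = rowId.compareTo(other.rowId);

            // Newer timestamps sort first within the same row id.
            return cmp != 0 ? cmp : Long.compare(other.timestamp, timestamp);
        }
    }

    /** Returns {@code true} if further processing is needed; removes the entry if it is a tombstone. */
    static boolean checkHasNewerRowAndRemoveTombstone(TreeMap<DataKey, byte[]> data, String gcRowId, long gcTimestamp) {
        // Stand-in for it.seek(dataKeyBuffer): the first entry at or after the key.
        Map.Entry<DataKey, byte[]> entry = data.ceilingEntry(new DataKey(gcRowId, gcTimestamp));

        if (entry == null) {
            // Stand-in for invalid(it): there is no row for the GC queue element.
            return false;
        }

        if (!entry.getKey().rowId.equals(gcRowId)) {
            // The seek landed on another row id: return immediately, no flag needed.
            return false;
        }

        if (entry.getValue().length == 0) {
            // An empty value models a tombstone; remove it.
            data.remove(entry.getKey());
        }

        return true;
    }
}
```

   With both negative cases returning right away, the `hasRowForGc` flag is no longer needed and the rest of the method only deals with the case where the row exists.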



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

