sashapolo commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1097186694


##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether

Review Comment:
   `to the partition storage`?



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage

Review Comment:
   Maybe we should put this document in a `tech-notes` folder, similar to 
`ignite-table` module



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or 
higher. If watermark is at 9, 

Review Comment:
   This "low watermark" term comes out of nowhere, please add a short 
description of what that is (for example "a timestamp that is guaranteed to 
have no transactions running")



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   

Review Comment:
   It would also be nice to specify what Garbage Collection actually is



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue

Review Comment:
   I would suggest to swap this section with the GC algorithm section



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or 
higher. If watermark is at 9, 
+then it means that there can still occur a transaction with a 9 timestamp, 
which means that the record number 1 
+is still needed.  
+This is why we only add a new entry into the GC queue if there is a previous 
version and that is 
+why the timestamp of the entry in the GC queue is of the next version.
+
+Let's review another example:  
+*Note that **Is tombstone** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1             | Foo    | 1         | False        |
+| 2             | Foo    | 10        | True         |
+| 3             | Foo    | 20        | False        |
+
+Everything said before stands for this example, however we can also remove the 
record number 2, because it is 
+a tombstone. So if the watermark is higher or equal to 10 and there is a 
transaction with timestamp higher than
+10, then we either get an empty value if timestamp is less than 20, or we get 
a newer version.
+
+So to sum up, the algorithm looks like this:
+1. Get an element from the GC queue, exiting if the queue is empty
+2. Add that element to the batch for removal from RocksDB
+3. Find an element in the data column family that corresponds to the element 
of GC queue. If a value doesn't exist, exit

Review Comment:
   `If a value doesn't exist, exit` - Please describe how this can happen
   
   UPD: I can see that you describe this situation below, maybe it's worth 
moving here



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or 
higher. If watermark is at 9, 
+then it means that there can still occur a transaction with a 9 timestamp, 
which means that the record number 1 
+is still needed.  
+This is why we only add a new entry into the GC queue if there is a previous 
version and that is 
+why the timestamp of the entry in the GC queue is of the next version.
+
+Let's review another example:  
+*Note that **Is tombstone** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1             | Foo    | 1         | False        |
+| 2             | Foo    | 10        | True         |
+| 3             | Foo    | 20        | False        |
+
+Everything said before stands for this example, however we can also remove the 
record number 2, because it is 
+a tombstone. So if the watermark is higher or equal to 10 and there is a 
transaction with timestamp higher than
+10, then we either get an empty value if timestamp is less than 20, or we get 
a newer version.
+
+So to sum up, the algorithm looks like this:
+1. Get an element from the GC queue, exiting if the queue is empty
+2. Add that element to the batch for removal from RocksDB

Review Comment:
   Do we always do that? Doesn't that depend on the watermark value?



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or 
higher. If watermark is at 9, 
+then it means that there can still occur a transaction with a 9 timestamp, 
which means that the record number 1 
+is still needed.  
+This is why we only add a new entry into the GC queue if there is a previous 
version and that is 
+why the timestamp of the entry in the GC queue is of the next version.
+
+Let's review another example:  
+*Note that **Is tombstone** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1             | Foo    | 1         | False        |
+| 2             | Foo    | 10        | True         |
+| 3             | Foo    | 20        | False        |
+
+Everything said before stands for this example, however we can also remove the 
record number 2, because it is 
+a tombstone. So if the watermark is higher or equal to 10 and there is a 
transaction with timestamp higher than
+10, then we either get an empty value if timestamp is less than 20, or we get 
a newer version.
+
+So to sum up, the algorithm looks like this:
+1. Get an element from the GC queue, exiting if the queue is empty
+2. Add that element to the batch for removal from RocksDB
+3. Find an element in the data column family that corresponds to the element 
of GC queue. If a value doesn't exist, exit
+4. Test if it is a tombstone, if yes, add it to the batch for removal

Review Comment:
   Isn't it already added to the batch for removal on step 2?



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or 
higher. If watermark is at 9, 
+then it means that there can still occur a transaction with a 9 timestamp, 
which means that the record number 1 
+is still needed.  
+This is why we only add a new entry into the GC queue if there is a previous 
version and that is 
+why the timestamp of the entry in the GC queue is of the next version.
+
+Let's review another example:  
+*Note that **Is tombstone** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1             | Foo    | 1         | False        |
+| 2             | Foo    | 10        | True         |
+| 3             | Foo    | 20        | False        |
+
+Everything said before stands for this example, however we can also remove the 
record number 2, because it is 
+a tombstone. So if the watermark is higher or equal to 10 and there is a 
transaction with timestamp higher than
+10, then we either get an empty value if timestamp is less than 20, or we get 
a newer version.
+
+So to sum up, the algorithm looks like this:
+1. Get an element from the GC queue, exiting if the queue is empty
+2. Add that element to the batch for removal from RocksDB
+3. Find an element in the data column family that corresponds to the element 
of GC queue. If a value doesn't exist, exit
+4. Test if it is a tombstone, if yes, add it to the batch for removal
+5. Seek for a previous version. If it doesn't exist, exit
+6. Add that previous version to the batch for removal
+
+You might notice that there are two cases when we can exit prematurely, apart 
from queue being empty.  
+We might have not found a value that triggered the addition to the GC queue 
and/or the value that needs to be
+garbage collected because GC can run in parallel. So if two parallel threads 
got the same element from the 
+queue, one of them might have already finished the GC and removed the elements.
+
+## Storage implications
+
+To save space we don't store consecutive tombstones.

Review Comment:
   Don't store where? In the Partition Storage?



##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store garbage collector's queue in the RocksDB column family in the 
following
+format. The key:
+
+| Partition id | Timestamp                                 | Row id         |
+|--------------|-------------------------------------------|----------------|
+| 2-byte       | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We can make row id the value,
+because for the ascending order processing of the queue we only need the 
timestamp,
+however, multiple row ids can have same timestamp, so making row id a value 
requires storing a list of
+row ids, hence the commit in this implementation of the storage becomes more 
sophisticated and, probably,
+less performant.
+
+Each time a row is being committed to the storage, we perform a check whether
+there is already a value for this row. If there is one and both it and new 
version are not tombstones, we put
+new commit's timestamp and row id into the GC queue. To understand why we only 
put new value's timestamp
+please refer to the Garbage Collection 
[algorithm](#garbage-collection-algorithm).  
+The queue is updated along with the data column family in a single batch and 
is destroyed when the storage
+is being cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage 
collection.   
+
+Consider the following example:  
+*Note that **Record number** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1             | Foo    | 1         |
+| 2             | Foo    | 10        |
+
+In this case, we can only remove record 1 if the low watermark is 10 or 
higher. If watermark is at 9, 
+then it means that there can still occur a transaction with a 9 timestamp, 
which means that the record number 1 
+is still needed.  
+This is why we only add a new entry into the GC queue if there is a previous 
version and that is 
+why the timestamp of the entry in the GC queue is of the next version.
+
+Let's review another example:  
+*Note that **Is tombstone** is a hypothetical value that helps referring to 
the specific entries, there
+is no such value in the storage.*
+
+| Record number | Row id | Timestamp | Is tombstone |
+|---------------|--------|-----------|--------------|
+| 1             | Foo    | 1         | False        |
+| 2             | Foo    | 10        | True         |
+| 3             | Foo    | 20        | False        |
+
+Everything said before stands for this example, however we can also remove the 
record number 2, because it is 
+a tombstone. So if the watermark is higher or equal to 10 and there is a 
transaction with timestamp higher than
+10, then we either get an empty value if timestamp is less than 20, or we get 
a newer version.
+
+So to sum up, the algorithm looks like this:
+1. Get an element from the GC queue, exiting if the queue is empty
+2. Add that element to the batch for removal from RocksDB
+3. Find an element in the data column family that corresponds to the element 
of GC queue. If a value doesn't exist, exit
+4. Test if it is a tombstone, if yes, add it to the batch for removal
+5. Seek for a previous version. If it doesn't exist, exit
+6. Add that previous version to the batch for removal
+
+You might notice that there are two cases when we can exit prematurely, apart 
from queue being empty.  
+We might have not found a value that triggered the addition to the GC queue 
and/or the value that needs to be
+garbage collected because GC can run in parallel. So if two parallel threads 
got the same element from the 
+queue, one of them might have already finished the GC and removed the elements.
+
+## Storage implications
+
+To save space we don't store consecutive tombstones.
+For example, if a user removes a certain row twice
+
+```
+storage.put(key,value);
+storage.delete(key);
+storage.delete(key);
+```
+
+There should be one row with a value and one row with a tombstone, the 
tombstone being
+the most recent one. This also simplifies the processing of the garbage 
collection queue.

Review Comment:
   How does this work? Imagine this exact situation:
   Operation Key Timestamp
   PUT "foo" 1
   DELETE "foo" 2
   DELETE "foo" 3
   
   If we only store the most recent tombstone (with timestamp 3 in this 
example), then what would a transaction with timestamp 2 see?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** UUID size in bytes. */
+    static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+    /** Position of row id inside the key. */
+    static final int ROW_ID_OFFSET = Short.BYTES;
+
+    /** Size of the key without timestamp. */
+    public static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;

Review Comment:
   ```suggestion
       static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;

Review Comment:
   A bunch of these constants are only used in `RocksDbMvPartitionStorage`, 
what's the point of having them here?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** UUID size in bytes. */
+    static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+    /** Position of row id inside the key. */
+    static final int ROW_ID_OFFSET = Short.BYTES;
+
+    /** Size of the key without timestamp. */
+    public static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Maximum size of the data key. */
+    static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+    /** Transaction id size (part of the transaction state). */
+    static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+    /** Commit table id size (part of the transaction state). */
+    static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+    /** Size of the value header (transaction state). */
+    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
+
+    /** Transaction id offset. */
+    static final int TX_ID_OFFSET = 0;
+
+    /** Commit table id offset. */
+    static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+    /** Commit partition id offset. */
+    static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+    /** Value offset (if transaction state is present). */
+    static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+    static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+    static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> 
allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Partition id. */
+    private final int partitionId;
+
+    /** Upper bound for scans and reads. */

Review Comment:
   Scans and reads are kinda the same thing)



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** UUID size in bytes. */
+    static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+    /** Position of row id inside the key. */
+    static final int ROW_ID_OFFSET = Short.BYTES;
+
+    /** Size of the key without timestamp. */
+    public static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Maximum size of the data key. */
+    static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+    /** Transaction id size (part of the transaction state). */
+    static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+    /** Commit table id size (part of the transaction state). */
+    static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+    /** Size of the value header (transaction state). */
+    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
+
+    /** Transaction id offset. */
+    static final int TX_ID_OFFSET = 0;
+
+    /** Commit table id offset. */
+    static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+    /** Commit partition id offset. */
+    static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+    /** Value offset (if transaction state is present). */
+    static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+    static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+    static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> 
allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Partition id. */
+    private final int partitionId;
+
+    /** Upper bound for scans and reads. */
+    private final Slice upperBound;
+
+    /** RocksDB instance. */
+    final RocksDB db;

Review Comment:
   This field is only used by `GarbageCollector`, why do you need it here?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.KEY_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final PartitionDataHelper helper;
+
+    GarbageCollector(PartitionDataHelper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle partCf = helper.partCf;
+
+        boolean newAndPrevTombstones = false;
+
+        // Try find previous value for the row id.
+        ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+        keyBuffer.clear();
+
+        helper.putDataKey(keyBuffer, rowId, timestamp);
+
+        try (RocksIterator it = db.newIterator(partCf, 
helper.upperBoundReadOpts)) {
+            it.seek(keyBuffer);
+
+            if (invalid(it)) {
+                return false;
+            }
+
+            keyBuffer.clear();
+
+            int keyLen = it.key(keyBuffer);
+
+            RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+            if (readRowId.equals(rowId)) {
+                // Found previous value.
+                assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+                if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous 
value was also a tombstone.
+                    int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+                    newAndPrevTombstones = valueSize == 0;
+                }
+
+                if (!newAndPrevTombstones) {
+                    keyBuffer.clear();
+
+                    helper.putGcKey(keyBuffer, rowId, timestamp);
+
+                    writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+                }
+            }
+        }
+
+        return newAndPrevTombstones;
+    }
+
+    /**
+     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, 
HybridTimestamp lowWatermark) throws RocksDBException {
+        RocksDB db = helper.db;
+        ColumnFamilyHandle gc = helper.gc;
+        ColumnFamilyHandle partCf = helper.partCf;
+
+        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
+        // However, the element that we need to garbage collect is the next 
(older one) element.
+        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want 
to garbage collect.
+        try (RocksIterator gcIt = db.newIterator(gc, 
helper.upperBoundReadOpts)) {
+            gcIt.seek(helper.partitionStartPrefix());
+
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
+
+            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+            gcKeyBuffer.clear();
+
+            gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+
+            // Delete element from the GC queue.
+            batch.delete(gc, gcKeyBuffer);
+
+            try (RocksIterator it = db.newIterator(partCf, 
helper.upperBoundReadOpts)) {
+                ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+                dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition 
to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // Process the element in data cf that should be garbage 
collected.
+                proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+                if (!proceed) {
+                    // No further processing required.
+                    return null;
+                }
+
+                // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
+                byte[] valueBytes = it.value();
+
+                var row = new 
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, 
gcElementRowId);
+
+                // Delete the row from the data cf.
+                batch.delete(partCf, dataKeyBuffer);
+
+                return retVal;
+            }
+        }
+    }
+
+    /**
+     * Processes the entry that triggered adding row id to garbage collector's 
queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can 
be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as 
no further processing is required.
+     * if there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is 
needed.
+     */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, 
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,

Review Comment:
   why do you need the `dataKeyBuffer` parameter if it's already a field?



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Slice;
+
+/** Helper for the partition data. */
+class PartitionDataHelper implements ManuallyCloseable {
+    /** Commit partition id size. */
+    static final int PARTITION_ID_SIZE = Short.BYTES;
+
+    /** UUID size in bytes. */
+    static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+    /** Position of row id inside the key. */
+    static final int ROW_ID_OFFSET = Short.BYTES;
+
+    /** Size of the key without timestamp. */
+    public static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Maximum size of the data key. */
+    static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+    /** Transaction id size (part of the transaction state). */
+    static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+    /** Commit table id size (part of the transaction state). */
+    static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+    /** Size of the value header (transaction state). */
+    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + 
PARTITION_ID_SIZE;
+
+    /** Transaction id offset. */
+    static final int TX_ID_OFFSET = 0;
+
+    /** Commit table id offset. */
+    static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+    /** Commit partition id offset. */
+    static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+    /** Value offset (if transaction state is present). */
+    static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+    static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+    static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> 
allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Partition id. */
+    private final int partitionId;
+
+    /** Upper bound for scans and reads. */
+    private final Slice upperBound;
+
+    /** RocksDB instance. */
+    final RocksDB db;
+
+    /** Partition data column family. */
+    final ColumnFamilyHandle partCf;
+
+    /** GC queue column family. */
+    final ColumnFamilyHandle gc;

Review Comment:
   Same question as above



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.KEY_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MAX_KEY_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.MV_KEY_BUFFER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.PARTITION_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_OFFSET;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.ROW_ID_SIZE;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.TABLE_ROW_BYTE_ORDER;
+import static 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper.readTimestampNatural;
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in 
this module.
+ */
+class GarbageCollector {
+    /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, 
so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or 
writing.
+     * In this instance it is only used for RocksDB to get the size of the 
entry without copying the entry into the buffer.
+     */
+    private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+    /** Garbage collector's queue key's timestamp offset. */
+    private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+    /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + 
HYBRID_TIMESTAMP_SIZE;
+
+    /** Garbage collector's queue key's size. */
+    private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+    /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = 
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+    /** Helper for the rocksdb partition. */
+    private final PartitionDataHelper helper;
+
+    GarbageCollector(PartitionDataHelper helper) {
+        this.helper = helper;
+    }
+
+    /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because 
we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both 
tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both 
tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, 
HybridTimestamp timestamp, boolean isNewValueTombstone)
+            throws RocksDBException {
+        RocksDB db = helper.db;

Review Comment:
   I think this stuff can simply be this class' fields



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to