tkalkirill commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1096983649
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/TableRowAndRowId.java:
##########
@@ -24,7 +24,7 @@
 * Wrapper that holds both {@link TableRow} and {@link RowId}. {@link TableRow} is null for tombstones.
*/
public class TableRowAndRowId {
- /** Table row. */
+ /** Table row. {@code null} if tombstone. */
Review Comment:
It seems we discussed that such a statement is not entirely true: there may be a situation where this is not a tombstone. :)
I propose describing this in the documentation for the method.
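For illustration, the method documentation could read roughly like this (a sketch only; the accessor name and the exact wording are assumptions, not taken from the PR):

    /**
     * Returns the table row.
     *
     * @return Table row, or {@code null} if this version has no row attached (e.g. a tombstone);
     *         note that {@code null} alone does not guarantee that the version is a tombstone.
     */
    public @Nullable TableRow tableRow() {
        return tableRow;
    }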
##########
modules/storage-rocksdb/docs/garbage-collection.md:
##########
@@ -0,0 +1,83 @@
+# Garbage Collection in the RocksDB partition storage
+
+## Garbage Collection queue
+
+We store the garbage collector's queue in a RocksDB column family in the
+following format. The key:
+
+| Partition id | Timestamp | Row id |
+|--------------|-------------------------------------------|----------------|
+| 2-byte | 12-byte (8-byte physical, 4-byte logical) | 16-byte (uuid) |
+
+The value is not stored, as we only need the key. We could instead make the row id the value,
+because processing the queue in ascending order only requires the timestamp;
+however, multiple row ids can have the same timestamp, so making the row id a value
+would require storing a list of row ids, which would make the commit path of this
+storage implementation more sophisticated and, probably, less performant.
+
+Each time a row is committed to the storage, we check whether there is already
+a value for this row. If there is one, and it and the new version are not both
+tombstones, we put the new commit's timestamp and row id into the GC queue. To
+understand why we put only the new value's timestamp, please refer to the
+Garbage Collection [algorithm](#garbage-collection-algorithm).
+The queue is updated along with the data column family in a single batch and
+is destroyed when the storage is cleared or destroyed.
+
+## Garbage Collection algorithm
+
+It's important to understand when we actually need to perform garbage
collection.
+
+Consider the following example:
+*Note that **Record number** is a hypothetical value that helps refer to
+specific entries; there is no such value in the storage.*
+
+| Record number | Row id | Timestamp |
+|---------------|--------|-----------|
+| 1 | Foo | 1 |
+| 2 | Foo | 10 |
+
+In this case, we can only remove record 1 if the low watermark is 10 or higher.
+If the watermark is at 9, a transaction with timestamp 9 can still occur, and
+such a transaction must see record 1, so record 1 is still needed.
Review Comment:
I think that if the LW is 9 then we only need the row with 10, right?
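For reference, the rule the doc describes can be sketched like this (hypothetical names, not the storage code): an older version becomes garbage only once the newer version that shadows it is at or below the low watermark, because a reader at the watermark timestamp would otherwise still resolve to the older version.

    /** Minimal sketch of the rule from the example: record 1 (ts=1) vs record 2 (ts=10). */
    static boolean canRemoveOlderVersion(long newerVersionTimestamp, long lowWatermark) {
        // With the newer version at timestamp 10 and the watermark at 9, a read
        // at timestamp 9 still resolves to the older version, so it must be kept.
        return newerVersionTimestamp <= lowWatermark;
    }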
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Helper.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/** Helper for the partition data. */
+class Helper {
Review Comment:
The name is too generic for me. Maybe rename it to something like `PartitionDataHelper`?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Helper.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/** Helper for the partition data. */
+class Helper {
+ /** Commit partition id size. */
+ static final int PARTITION_ID_SIZE = Short.BYTES;
+
+ /** UUID size in bytes. */
+ static final int ROW_ID_SIZE = 2 * Long.BYTES;
+
+ /** Position of row id inside the key. */
+ static final int ROW_ID_OFFSET = Short.BYTES;
+
+ /** Size of the key without timestamp. */
+ static final int ROW_PREFIX_SIZE = ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Maximum size of the data key. */
+ static final int MAX_KEY_SIZE = ROW_PREFIX_SIZE + HYBRID_TIMESTAMP_SIZE;
+
+ /** Transaction id size (part of the transaction state). */
+ static final int TX_ID_SIZE = 2 * Long.BYTES;
+
+ /** Commit table id size (part of the transaction state). */
+ static final int TABLE_ID_SIZE = 2 * Long.BYTES;
+
+ /** Size of the value header (transaction state). */
+    static final int VALUE_HEADER_SIZE = TX_ID_SIZE + TABLE_ID_SIZE + PARTITION_ID_SIZE;
+
+ /** Transaction id offset. */
+ static final int TX_ID_OFFSET = 0;
+
+ /** Commit table id offset. */
+ static final int TABLE_ID_OFFSET = TX_ID_SIZE;
+
+ /** Commit partition id offset. */
+ static final int PARTITION_ID_OFFSET = TABLE_ID_OFFSET + TABLE_ID_SIZE;
+
+ /** Value offset (if transaction state is present). */
+ static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
+
+ static final ByteOrder TABLE_ROW_BYTE_ORDER = TableRow.ORDER;
+
+ static final ByteOrder KEY_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ final int partitionId;
+
+ final RocksDB db;
+
+ final ColumnFamilyHandle cf;
Review Comment:
Let's rename it to `partCf`.
##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java:
##########
@@ -181,7 +158,7 @@ protected static List<IgniteBiTuple<TestKey, TestValue>> drainToList(Cursor<Read
}
}
-    protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow expectedRow) {
+    protected final void assertRowMatches(@Nullable TableRow rowUnderQuestion, TableRow expectedRow) {
Review Comment:
Can we still remove the `@Nullable`? It is a bit confusing: it looks as if `null` were a valid value, and we would fail here.
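For illustration, the non-nullable variant might look like this (the body is assumed for the sketch, not taken from the PR):

    protected final void assertRowMatches(TableRow rowUnderQuestion, TableRow expectedRow) {
        assertThat(rowUnderQuestion, is(notNullValue()));

        // ... the existing row comparison stays unchanged ...
    }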
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+ /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
Review Comment:
Let's rename to `partCf`
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1494,7 +1473,7 @@ void startRebalance(WriteBatch writeBatch) {
*
 * @throws StorageRebalanceException If there was an error when aborting the rebalance.
*/
- void abortReblance(WriteBatch writeBatch) {
Review Comment:
Lol
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+ /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+            ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+ if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
Review Comment:
Let's rename to `partCf`
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+ /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+            ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+ if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
Review Comment:
Please also add some description.
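For example, something along these lines (wording is only a suggestion):

    // Look at the data CF around the GC element. The entry at the GC element's
    // (rowId, timestamp) is the newer version whose commit scheduled this GC entry;
    // if it is a tombstone, it is deleted right away. The next, older entry, if it
    // still belongs to the same row id, is the version that is actually vacuumed.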
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+ /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+            ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+ if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
+ var upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions opts = new ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, opts)
+ ) {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+                // Process the element in data cf that should be garbage collected.
+ proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+                // At this point there's definitely a value that needs to be garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+                var row = new TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(cf, dataKeyBuffer);
+
+ return retVal;
+ }
+ }
+ }
+
+ /**
+     * Processes the entry that triggered adding row id to garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * if there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is needed.
+ */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws RocksDBException {
+ ColumnFamilyHandle cf = helper.cf;
+
+ // Set up the data key.
+ helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+ it.seek(dataKeyBuffer);
+
+ boolean hasRowForGc = true;
+
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ } else {
+ dataKeyBuffer.clear();
+
+ it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ }
+ }
+
+ if (!hasRowForGc) {
+ return false;
+ }
+
+        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
+ int len = it.value(EMPTY_DIRECT_BUFFER);
+
+ if (len == 0) {
+ // This is a tombstone, we need to delete it.
+ batch.delete(cf, dataKeyBuffer);
+ }
+
+ return true;
Review Comment:
But what about the `hasRowForGc`? =)
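For illustration, the flag could be dropped in favor of early returns (a sketch with the same behavior):

    if (invalid(it)) {
        // There is no row for the GC queue element.
        return false;
    }

    dataKeyBuffer.clear();

    it.key(dataKeyBuffer);

    if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
        // The nearest row belongs to another row id.
        return false;
    }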
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+ /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+            ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+ if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions gcOpts = new ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+            HybridTimestamp gcElementTimestamp = readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
+ var upperBound = new Slice(helper.partitionEndPrefix());
+                ReadOptions opts = new ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, opts)
+ ) {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+                // Process the element in data cf that triggered the addition to the GC queue.
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+                // Process the element in data cf that should be garbage collected.
+ proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+                // At this point there's definitely a value that needs to be garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+                var row = new TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+                TableRowAndRowId retVal = new TableRowAndRowId(row, gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(cf, dataKeyBuffer);
+
+ return retVal;
+ }
+ }
+ }
+
+ /**
+     * Processes the entry that triggered adding row id to garbage collector's queue.
+     * <br>
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     * If there is no row in the data column family, returns {@code false} as no further processing is required.
+     * if there is a row and this entry is a tombstone, removes tombstone.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param batch Write batch.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is needed.
+ */
+    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws RocksDBException {
+ ColumnFamilyHandle cf = helper.cf;
+
+ // Set up the data key.
+ helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ // Seek to the row id and timestamp from the GC queue.
+        // Note that it doesn't mean that the element in this iterator has matching row id or even partition id.
+ it.seek(dataKeyBuffer);
+
+ boolean hasRowForGc = true;
+
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ } else {
+ dataKeyBuffer.clear();
+
+ it.key(dataKeyBuffer);
+
+            if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ }
+ }
+
+ if (!hasRowForGc) {
+ return false;
+ }
+
+        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
+ int len = it.value(EMPTY_DIRECT_BUFFER);
+
+ if (len == 0) {
+ // This is a tombstone, we need to delete it.
+ batch.delete(cf, dataKeyBuffer);
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks if there is a row for garbage collection.
+     * There might already be no row in the data column family, because GC can be run in parallel.
+     *
+     * @param it RocksDB data column family iterator.
+     * @param dataKeyBuffer Buffer for the data column family key.
+     * @param gcElementRowId Row id of the element from the GC queue/
+     * @return {@code true} if further processing by garbage collector is needed.
+ */
+    private boolean checkHasRowForGc(RocksIterator it, ByteBuffer dataKeyBuffer, RowId gcElementRowId) {
+ // Let's move to the element that was scheduled for GC.
+ it.next();
+
+ boolean hasRowForGc = true;
+
+ RowId gcRowId;
+
+ if (invalid(it)) {
+ hasRowForGc = false;
+ } else {
+ dataKeyBuffer.clear();
+
+ int keyLen = it.key(dataKeyBuffer);
+
+ if (keyLen != MAX_KEY_SIZE) {
+                // We moved to the next row id's write-intent, so there was no row to GC for the current row id.
+ hasRowForGc = false;
+ } else {
+ gcRowId = helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET);
+
+ if (!gcElementRowId.equals(gcRowId)) {
+                    // We moved to the next row id, so there was no row to GC for the current row id.
+ hasRowForGc = false;
Review Comment:
Can be returned immediately. =)
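E.g., a sketch of the early-return form (same behavior; the flag and the `gcRowId` local go away):

    it.next();

    if (invalid(it)) {
        return false;
    }

    dataKeyBuffer.clear();

    if (it.key(dataKeyBuffer) != MAX_KEY_SIZE) {
        // Moved to the next row id's write-intent, so there is no row to GC for the current row id.
        return false;
    }

    if (!gcElementRowId.equals(helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET))) {
        // Moved to the next row id, so there is no row to GC for the current row id.
        return false;
    }

    return true;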
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in this module.
+ */
+class GarbageCollector {
+ /**
+     * Empty direct byte buffer. Note that allocating memory of size 0 is UB, so java actually allocates
+     * a 1-byte space. Be sure not to use this buffer for actual reading or writing.
+     * In this instance it is only used for RocksDB to get the size of the entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+    private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+    private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER = withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+     * Tries adding a row to the GC queue. We put new row's timestamp, because we can remove previous row only if both this row's
+     * and previous row's timestamps are below the watermark.
+     * Returns {@code true} if new value and previous value are both tombstones.
+     *
+     * @param writeBatch Write batch.
+     * @param rowId Row id.
+     * @param timestamp New row's timestamp.
+     * @param isNewValueTombstone If new row is a tombstone.
+     * @return {@code true} if new value and previous value are both tombstones.
+     * @throws RocksDBException If failed.
+     */
+    boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+            ReadOptions readOpts = new ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Can not be write-intent.
+
+ if (isNewValueTombstone) {
+                    // If new value is a tombstone, lets check if previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+    @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+ HybridTimestamp gcElementTimestamp =
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId gcElementRowId = helper.getRowId(gcKeyBuffer,
GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
+ var upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, opts)
+ ) {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ // Process the element in data cf that triggered the addition
to the GC queue.
+ boolean proceed = checkHasNewerRowAndRemoveTombstone(it,
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // Process the element in data cf that should be garbage
collected.
+ proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // At this point there's definitely a value that needs to be
garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+ var row = new
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+ TableRowAndRowId retVal = new TableRowAndRowId(row,
gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(cf, dataKeyBuffer);
+
+ return retVal;
+ }
+ }
+ }
+
+ /**
+ * Processes the entry that triggered adding the row id to the garbage collector's queue.
+ * <br>
+ * There might already be no row in the data column family, because GC can
be run in parallel.
+ * If there is no row in the data column family, returns {@code false} as
no further processing is required.
+ * If there is a row and this entry is a tombstone, removes the tombstone.
+ *
+ * @param it RocksDB data column family iterator.
+ * @param batch Write batch.
+ * @param dataKeyBuffer Buffer for the data column family key.
+ * @param gcElementRowId Row id of the element from the GC queue.
+ * @param gcElementTimestamp Timestamp of the element from the GC queue.
+ * @return {@code true} if further processing by garbage collector is
needed.
+ */
+ private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it,
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+ RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws
RocksDBException {
+ ColumnFamilyHandle cf = helper.cf;
+
+ // Set up the data key.
+ helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ // Seek to the row id and timestamp from the GC queue.
+ // Note that it doesn't mean that the element in this iterator has
matching row id or even partition id.
+ it.seek(dataKeyBuffer);
+
+ boolean hasRowForGc = true;
+
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ } else {
+ dataKeyBuffer.clear();
+
+ it.key(dataKeyBuffer);
+
+ if (!helper.getRowId(dataKeyBuffer,
ROW_ID_OFFSET).equals(gcElementRowId)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ }
+ }
+
+ if (!hasRowForGc) {
+ return false;
+ }
+
+ // Check if the new element, whose insertion scheduled the GC, was a
tombstone.
+ int len = it.value(EMPTY_DIRECT_BUFFER);
+
+ if (len == 0) {
+ // This is a tombstone, we need to delete it.
+ batch.delete(cf, dataKeyBuffer);
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks if there is a row for garbage collection.
+ * There might already be no row in the data column family, because GC can
be run in parallel.
+ *
+ * @param it RocksDB data column family iterator.
+ * @param dataKeyBuffer Buffer for the data column family key.
+ * @param gcElementRowId Row id of the element from the GC queue.
+ * @return {@code true} if further processing by garbage collector is
needed.
+ */
+ private boolean checkHasRowForGc(RocksIterator it, ByteBuffer
dataKeyBuffer, RowId gcElementRowId) {
+ // Let's move to the element that was scheduled for GC.
+ it.next();
+
+ boolean hasRowForGc = true;
+
+ RowId gcRowId;
+
+ if (invalid(it)) {
+ hasRowForGc = false;
+ } else {
+ dataKeyBuffer.clear();
+
+ int keyLen = it.key(dataKeyBuffer);
+
+ if (keyLen != MAX_KEY_SIZE) {
+ // We moved to the next row id's write-intent, so there was no
row to GC for the current row id.
+ hasRowForGc = false;
Review Comment:
Can be returned immediately. =)
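For example, something like this (just a sketch; I'm assuming the tail of the method only compares the read row id with `gcElementRowId`):

    private boolean checkHasRowForGc(RocksIterator it, ByteBuffer dataKeyBuffer, RowId gcElementRowId) {
        // Let's move to the element that was scheduled for GC.
        it.next();

        if (invalid(it)) {
            // Iterator exhausted: there is no row to GC for the current row id.
            return false;
        }

        dataKeyBuffer.clear();

        if (it.key(dataKeyBuffer) != MAX_KEY_SIZE) {
            // We moved to the next row id's write-intent, so there was no row to GC for the current row id.
            return false;
        }

        // The row qualifies for GC only if it still belongs to the same row id.
        return helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId);
    }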
##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test implementation for {@link RocksDbStorageEngine}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageGcTest extends
AbstractMvPartitionStorageGcTest {
+ @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size =
16777216, writeBufferSize = 16777216}}")
Review Comment:
I will assume that this is for `RocksDbStorageEngine`.
But perhaps the real question is why we don't use the default configuration;
I have the same question for Semyon.
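If the defaults are enough, the mock string could presumably be dropped, e.g. (hypothetical; assuming a bare injection produces the default values):

    @InjectConfiguration
    private RocksDbStorageEngineConfiguration engineConfig;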
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -165,9 +135,15 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
/** RocksDb instance. */
private final RocksDB db;
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
/** Partitions column family. */
private final ColumnFamilyHandle cf;
Review Comment:
As I understand it, this is now stored in `Helper`, so it can be removed from here.
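I.e., drop the field here and read the handle through the helper at the use sites, something like (a sketch against the quoted code):

    // Field removed; use sites become:
    ColumnFamilyHandle cf = helper.cf;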
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in
this module.
+ */
+class GarbageCollector {
+ /**
+ * Empty direct byte buffer. Note that allocating memory of size 0 is UB,
so Java actually allocates
+ * a 1-byte space. Be sure not to use this buffer for actual reading or
writing.
+ * In this instance it is only used for RocksDB to get the size of the
entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+ private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET +
HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+ private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER =
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+ * Tries adding a row to the GC queue. We put the new row's timestamp, because we can remove the previous row
+ * only if both this row's and the previous row's timestamps are below the low watermark.
+ * Returns {@code true} if the new value and the previous value are both tombstones.
+ *
+ * @param writeBatch Write batch.
+ * @param rowId Row id.
+ * @param timestamp New row's timestamp.
+ * @param isNewValueTombstone If new row is a tombstone.
+ * @return {@code true} if new value and previous value are both
tombstones.
+ * @throws RocksDBException If failed.
+ */
+ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId,
HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try to find the previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions readOpts = new
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Cannot be a write-intent.
+
+ if (isNewValueTombstone) {
+ // If the new value is a tombstone, let's check if the previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+ * Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+ @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch,
HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ // We retrieve the first element of the GC queue and seek to it in the data CF.
+ // However, the element that we need to garbage collect is the next (older) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone, we remove it.
+ // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+ HybridTimestamp gcElementTimestamp =
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId gcElementRowId = helper.getRowId(gcKeyBuffer,
GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
+ var upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, opts)
+ ) {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ // Process the element in data cf that triggered the addition
to the GC queue.
+ boolean proceed = checkHasNewerRowAndRemoveTombstone(it,
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // Process the element in data cf that should be garbage
collected.
+ proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // At this point there's definitely a value that needs to be
garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+ var row = new
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+ TableRowAndRowId retVal = new TableRowAndRowId(row,
gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(cf, dataKeyBuffer);
+
+ return retVal;
+ }
+ }
+ }
+
+ /**
+ * Processes the entry that triggered adding the row id to the garbage collector's queue.
+ * <br>
+ * There might already be no row in the data column family, because GC can
be run in parallel.
+ * If there is no row in the data column family, returns {@code false} as
no further processing is required.
+ * If there is a row and this entry is a tombstone, removes the tombstone.
+ *
+ * @param it RocksDB data column family iterator.
+ * @param batch Write batch.
+ * @param dataKeyBuffer Buffer for the data column family key.
+ * @param gcElementRowId Row id of the element from the GC queue.
+ * @param gcElementTimestamp Timestamp of the element from the GC queue.
+ * @return {@code true} if further processing by garbage collector is
needed.
+ */
+ private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it,
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+ RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws
RocksDBException {
+ ColumnFamilyHandle cf = helper.cf;
+
+ // Set up the data key.
+ helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ // Seek to the row id and timestamp from the GC queue.
+ // Note that it doesn't mean that the element in this iterator has
matching row id or even partition id.
+ it.seek(dataKeyBuffer);
+
+ boolean hasRowForGc = true;
+
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
Review Comment:
Can be returned immediately. =)
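For example (a sketch of the early-return shape, based only on the quoted code):

    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
            RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws RocksDBException {
        // Set up the data key and seek to the row id and timestamp from the GC queue.
        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);

        it.seek(dataKeyBuffer);

        if (invalid(it)) {
            // There is no row for the GC queue element.
            return false;
        }

        dataKeyBuffer.clear();

        it.key(dataKeyBuffer);

        if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
            // There is no row for the GC queue element.
            return false;
        }

        // Check if the new element, whose insertion scheduled the GC, was a tombstone.
        if (it.value(EMPTY_DIRECT_BUFFER) == 0) {
            // This is a tombstone, we need to delete it.
            batch.delete(helper.cf, dataKeyBuffer);
        }

        return true;
    }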
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in
this module.
+ */
+class GarbageCollector {
+ /**
+ * Empty direct byte buffer. Note that allocating memory of size 0 is UB,
so Java actually allocates
+ * a 1-byte space. Be sure not to use this buffer for actual reading or
writing.
+ * In this instance it is only used for RocksDB to get the size of the
entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+ private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET +
HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+ private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER =
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+ * Tries adding a row to the GC queue. We put the new row's timestamp, because we can remove the previous row
+ * only if both this row's and the previous row's timestamps are below the low watermark.
+ * Returns {@code true} if the new value and the previous value are both tombstones.
+ *
+ * @param writeBatch Write batch.
+ * @param rowId Row id.
+ * @param timestamp New row's timestamp.
+ * @param isNewValueTombstone If new row is a tombstone.
+ * @return {@code true} if new value and previous value are both
tombstones.
+ * @throws RocksDBException If failed.
+ */
+ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId,
HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try to find the previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions readOpts = new
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Cannot be a write-intent.
+
+ if (isNewValueTombstone) {
+ // If the new value is a tombstone, let's check if the previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+ * Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+ @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch,
HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ // We retrieve the first element of the GC queue and seek to it in the data CF.
+ // However, the element that we need to garbage collect is the next (older) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone, we remove it.
+ // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+ HybridTimestamp gcElementTimestamp =
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId gcElementRowId = helper.getRowId(gcKeyBuffer,
GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
+ var upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, opts)
+ ) {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ // Process the element in data cf that triggered the addition
to the GC queue.
+ boolean proceed = checkHasNewerRowAndRemoveTombstone(it,
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // Process the element in data cf that should be garbage
collected.
+ proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // At this point there's definitely a value that needs to be
garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+ var row = new
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+ TableRowAndRowId retVal = new TableRowAndRowId(row,
gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(cf, dataKeyBuffer);
+
+ return retVal;
+ }
+ }
+ }
+
+ /**
+ * Processes the entry that triggered adding the row id to the garbage collector's queue.
+ * <br>
+ * There might already be no row in the data column family, because GC can
be run in parallel.
+ * If there is no row in the data column family, returns {@code false} as
no further processing is required.
+ * If there is a row and this entry is a tombstone, removes the tombstone.
+ *
+ * @param it RocksDB data column family iterator.
+ * @param batch Write batch.
+ * @param dataKeyBuffer Buffer for the data column family key.
+ * @param gcElementRowId Row id of the element from the GC queue.
+ * @param gcElementTimestamp Timestamp of the element from the GC queue.
+ * @return {@code true} if further processing by garbage collector is
needed.
+ */
+ private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it,
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+ RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws
RocksDBException {
+ ColumnFamilyHandle cf = helper.cf;
Review Comment:
Let's rename to `partCf`
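E.g. (assuming the corresponding field in `Helper` is renamed as well, which is hypothetical):

    ColumnFamilyHandle partCf = helper.partCf; // partition data column family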
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import static java.lang.ThreadLocal.withInitial;
+import static java.nio.ByteBuffer.allocateDirect;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.KEY_BYTE_ORDER;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MAX_KEY_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.MV_KEY_BUFFER;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.PARTITION_ID_SIZE;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_OFFSET;
+import static org.apache.ignite.internal.storage.rocksdb.Helper.ROW_ID_SIZE;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.TABLE_ROW_BYTE_ORDER;
+import static
org.apache.ignite.internal.storage.rocksdb.Helper.readTimestampNatural;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMvPartitionStorage.invalid;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TableRowAndRowId;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchWithIndex;
+
+/**
+ * Garbage collector.
+ *
+ * <p>Key:
+ * <pre>{@code
+ * | partId (2 bytes, BE) | timestamp (12 bytes, ASC) | rowId (16 bytes, BE) |
+ * }</pre>
+ * Value is an empty byte array.
+ *
+ * <p>For more information refer to the tech-notes/garbage-collection.md in
this module.
+ */
+class GarbageCollector {
+ /**
+ * Empty direct byte buffer. Note that allocating memory of size 0 is UB,
so Java actually allocates
+ * a 1-byte space. Be sure not to use this buffer for actual reading or
writing.
+ * In this instance it is only used for RocksDB to get the size of the
entry without copying the entry into the buffer.
+ */
+ private static final ByteBuffer EMPTY_DIRECT_BUFFER = allocateDirect(0);
+
+ /** Garbage collector's queue key's timestamp offset. */
+ private static final int GC_KEY_TS_OFFSET = PARTITION_ID_SIZE;
+
+ /** Garbage collector's queue key's row id offset. */
+ private static final int GC_KEY_ROW_ID_OFFSET = GC_KEY_TS_OFFSET +
HYBRID_TIMESTAMP_SIZE;
+
+ /** Garbage collector's queue key's size. */
+ private static final int GC_KEY_SIZE = GC_KEY_ROW_ID_OFFSET + ROW_ID_SIZE;
+
+ /** Thread-local direct buffer instance to read keys from RocksDB. */
+ private static final ThreadLocal<ByteBuffer> GC_KEY_BUFFER =
withInitial(() -> allocateDirect(GC_KEY_SIZE).order(KEY_BYTE_ORDER));
+
+ /** Helper for the rocksdb partition. */
+ private final Helper helper;
+
+ GarbageCollector(Helper helper) {
+ this.helper = helper;
+ }
+
+ /**
+ * Tries adding a row to the GC queue. We put the new row's timestamp, because we can remove the previous row
+ * only if both this row's and the previous row's timestamps are below the low watermark.
+ * Returns {@code true} if the new value and the previous value are both tombstones.
+ *
+ * @param writeBatch Write batch.
+ * @param rowId Row id.
+ * @param timestamp New row's timestamp.
+ * @param isNewValueTombstone If new row is a tombstone.
+ * @return {@code true} if new value and previous value are both
tombstones.
+ * @throws RocksDBException If failed.
+ */
+ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId,
HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ boolean newAndPrevTombstones = false;
+
+ // Try to find the previous value for the row id.
+ ByteBuffer keyBuffer = MV_KEY_BUFFER.get();
+ keyBuffer.clear();
+
+ helper.putDataKey(keyBuffer, rowId, timestamp);
+
+ try (
+ Slice upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions readOpts = new
ReadOptions().setTotalOrderSeek(true).setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, readOpts)
+ ) {
+ it.seek(keyBuffer);
+
+ if (invalid(it)) {
+ return false;
+ }
+
+ keyBuffer.clear();
+
+ int keyLen = it.key(keyBuffer);
+
+ RowId readRowId = helper.getRowId(keyBuffer, ROW_ID_OFFSET);
+
+ if (readRowId.equals(rowId)) {
+ // Found previous value.
+ assert keyLen == MAX_KEY_SIZE; // Cannot be a write-intent.
+
+ if (isNewValueTombstone) {
+ // If the new value is a tombstone, let's check if the previous value was also a tombstone.
+ int valueSize = it.value(EMPTY_DIRECT_BUFFER);
+
+ newAndPrevTombstones = valueSize == 0;
+ }
+
+ if (!newAndPrevTombstones) {
+ keyBuffer.clear();
+
+ helper.putGcKey(keyBuffer, rowId, timestamp);
+
+ writeBatch.put(gc, keyBuffer, EMPTY_DIRECT_BUFFER);
+ }
+ }
+ }
+
+ return newAndPrevTombstones;
+ }
+
+ /**
+ * Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+ @Nullable TableRowAndRowId pollForVacuum(WriteBatchWithIndex batch,
HybridTimestamp lowWatermark) throws RocksDBException {
+ RocksDB db = helper.db;
+ ColumnFamilyHandle gc = helper.gc;
+ ColumnFamilyHandle cf = helper.cf;
+
+ // We retrieve the first element of the GC queue and seek to it in the data CF.
+ // However, the element that we need to garbage collect is the next (older) element.
+ // First we check if there's anything to garbage collect. If the element is a tombstone, we remove it.
+ // If the next element exists, that should be the element that we want to garbage collect.
+ try (
+ var gcUpperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts)
+ ) {
+ gcIt.seek(helper.partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+ HybridTimestamp gcElementTimestamp =
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+
+ if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId gcElementRowId = helper.getRowId(gcKeyBuffer,
GC_KEY_ROW_ID_OFFSET);
+
+ // Delete element from the GC queue.
+ batch.delete(gc, gcKeyBuffer);
+
+ try (
+ var upperBound = new Slice(helper.partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = db.newIterator(cf, opts)
+ ) {
+ ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
+ dataKeyBuffer.clear();
+
+ // Process the element in data cf that triggered the addition
to the GC queue.
+ boolean proceed = checkHasNewerRowAndRemoveTombstone(it,
batch, dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // Process the element in data cf that should be garbage
collected.
+ proceed = checkHasRowForGc(it, dataKeyBuffer, gcElementRowId);
+
+ if (!proceed) {
+ // No further processing required.
+ return null;
+ }
+
+ // At this point there's definitely a value that needs to be
garbage collected in the iterator.
+ byte[] valueBytes = it.value();
+
+ var row = new
TableRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
+ TableRowAndRowId retVal = new TableRowAndRowId(row,
gcElementRowId);
+
+ // Delete the row from the data cf.
+ batch.delete(cf, dataKeyBuffer);
+
+ return retVal;
+ }
+ }
+ }
+
+ /**
+ * Processes the entry that triggered adding the row id to the garbage collector's queue.
+ * <br>
+ * There might already be no row in the data column family, because GC can
be run in parallel.
+ * If there is no row in the data column family, returns {@code false} as
no further processing is required.
+ * If there is a row and this entry is a tombstone, removes the tombstone.
+ *
+ * @param it RocksDB data column family iterator.
+ * @param batch Write batch.
+ * @param dataKeyBuffer Buffer for the data column family key.
+ * @param gcElementRowId Row id of the element from the GC queue.
+ * @param gcElementTimestamp Timestamp of the element from the GC queue.
+ * @return {@code true} if further processing by garbage collector is
needed.
+ */
+ private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it,
WriteBatchWithIndex batch, ByteBuffer dataKeyBuffer,
+ RowId gcElementRowId, HybridTimestamp gcElementTimestamp) throws
RocksDBException {
+ ColumnFamilyHandle cf = helper.cf;
+
+ // Set up the data key.
+ helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+
+ // Seek to the row id and timestamp from the GC queue.
+ // Note that it doesn't mean that the element in this iterator has
matching row id or even partition id.
+ it.seek(dataKeyBuffer);
+
+ boolean hasRowForGc = true;
+
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
+ } else {
+ dataKeyBuffer.clear();
+
+ it.key(dataKeyBuffer);
+
+ if (!helper.getRowId(dataKeyBuffer,
ROW_ID_OFFSET).equals(gcElementRowId)) {
+ // There is no row for the GC queue element.
+ hasRowForGc = false;
Review Comment:
Can be returned immediately. =)
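Same early-return reshaping as sketched above for this method, i.e. this branch could become:

    if (!helper.getRowId(dataKeyBuffer, ROW_ID_OFFSET).equals(gcElementRowId)) {
        // There is no row for the GC queue element.
        return false;
    }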
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]