tkalkirill commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1095678190
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
Review Comment:
What about rebalancing?
If we are in the process of rebalancing, should we return rows? I think we
should throw an exception, as in `RocksDbMvPartitionStorage#read`.
Since we clean up the storage at the beginning or on abort of a rebalance, I
think the queue should also be cleaned up.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -595,21 +619,97 @@ public void addWriteCommitted(RowId rowId, @Nullable
TableRow row, HybridTimesta
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch =
requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- putTimestamp(keyBuf, commitTimestamp);
+ putTimestampDesc(keyBuf, commitTimestamp);
+
+ boolean isNewValueTombstone = row == null;
//TODO IGNITE-16913 Add proper way to write row bytes into array
without allocations.
- byte[] rowBytes = rowBytes(row);
+ byte[] rowBytes = row != null ? rowBytes(row) : new byte[0];
+ boolean newAndPrevTombstones;
try {
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE),
rowBytes);
-
- return null;
+ newAndPrevTombstones = maybeAddToGcQueue(writeBatch, rowId,
commitTimestamp, isNewValueTombstone);
} catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in
storage", e);
+ throw new StorageException("Failed to add row to the GC
queue", e);
+ }
+
+ if (!newAndPrevTombstones) {
+ try {
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE),
rowBytes);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in
storage", e);
+ }
}
+
+ return null;
});
}
+ /**
+ * Tries adding a row to the GC queue. We put new row's timestamp, because
we can remove previous row only if both this row's
+ * and previous row's timestamps are below the watermark.
+ * Returns {@code true} if new value and previous value are both
tombstones.
+ *
+ * @param writeBatch Write batch.
+ * @param rowId Row id.
+ * @param timestamp New row's timestamp.
+ * @param isNewValueTombstone If new row is a tombstone.
+ * @return {@code true} if new value and previous value are both
tombstones.
+ * @throws RocksDBException If failed.
+ */
+ private boolean maybeAddToGcQueue(WriteBatchWithIndex writeBatch, RowId
rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer key = allocateDirect(MAX_KEY_SIZE);
Review Comment:
Can we rename it to `keyBuffer` and use something like
`RocksDbMvPartitionStorage#HEAP_KEY_BUFFER`?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
+ ByteBuffer dataKey = MV_KEY_BUFFER.get();
+
+ gcKey.clear();
+ gcIt.key(gcKey);
+
+ HybridTimestamp timestamp = readTimestampNatural(gcKey,
GC_KEY_TS_OFFSET);
+
+ if (timestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId readRowId = getRowId(gcKey, GC_KEY_ROW_ID_OFFSET);
+
+ try (
Review Comment:
Please describe what you are doing here.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
+ ByteBuffer dataKey = MV_KEY_BUFFER.get();
+
+ gcKey.clear();
+ gcIt.key(gcKey);
+
+ HybridTimestamp timestamp = readTimestampNatural(gcKey,
GC_KEY_TS_OFFSET);
+
+ if (timestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId readRowId = getRowId(gcKey, GC_KEY_ROW_ID_OFFSET);
+
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = db.newIterator(cf, opts);
+ ) {
+ dataKey.clear();
+
+ dataKey.putShort((short) partitionId);
+ putRowId(dataKey, readRowId);
+ putTimestampDesc(dataKey, timestamp);
+ dataKey.flip();
+
+ it.seek(dataKey);
+
+ boolean noRowForGcElement = false;
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ noRowForGcElement = true;
+ } else {
+ dataKey.clear();
+
+ it.key(dataKey);
+
+ if (!getRowId(dataKey).equals(readRowId)) {
+ // There is no row for the GC queue element.
+ noRowForGcElement = true;
+ }
+ }
+
+ if (noRowForGcElement) {
+ gcKey.rewind();
+
+ try {
+ batch.delete(gc, gcKey);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to collect
garbage", e);
Review Comment:
Please add `RocksDbMvPartitionStorage#createStorageInfo`
##########
gradle/libs.versions.toml:
##########
@@ -57,7 +57,7 @@ jsonpath = "2.4.0"
classgraph = "4.8.110"
javassist = "3.28.0-GA"
checker = "3.10.0"
-rocksdb = "7.3.1"
+rocksdb = "7.9.2"
Review Comment:
I think we should make a fix that is as close to the problem as possible.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
+ ByteBuffer dataKey = MV_KEY_BUFFER.get();
+
+ gcKey.clear();
+ gcIt.key(gcKey);
+
+ HybridTimestamp timestamp = readTimestampNatural(gcKey,
GC_KEY_TS_OFFSET);
Review Comment:
```suggestion
HybridTimestamp keyTimestamp = readTimestampNatural(gcKey,
GC_KEY_TS_OFFSET);
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -225,8 +225,9 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
* @param lowWatermark A time threshold for the row. Rows younger then the
watermark value will not be removed.
* @return A pair of table row and row id, where a timestamp of the row is
less than or equal to {@code lowWatermark}.
* {@code null} if there's no such value.
+ * @throws StorageException If failed to poll element for vacuum.
*/
- default @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ default @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) throws StorageException {
Review Comment:
You wrote "Every other", but that's not the case; I suggest not changing the signature.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
Review Comment:
Let's use the buffer too, shall we?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
+ ByteBuffer dataKey = MV_KEY_BUFFER.get();
+
+ gcKey.clear();
+ gcIt.key(gcKey);
+
+ HybridTimestamp timestamp = readTimestampNatural(gcKey,
GC_KEY_TS_OFFSET);
+
+ if (timestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId readRowId = getRowId(gcKey, GC_KEY_ROW_ID_OFFSET);
+
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = db.newIterator(cf, opts);
+ ) {
+ dataKey.clear();
+
+ dataKey.putShort((short) partitionId);
+ putRowId(dataKey, readRowId);
+ putTimestampDesc(dataKey, timestamp);
+ dataKey.flip();
+
+ it.seek(dataKey);
+
+ boolean noRowForGcElement = false;
+ if (invalid(it)) {
+ // There is no row for the GC queue element.
+ noRowForGcElement = true;
+ } else {
+ dataKey.clear();
+
+ it.key(dataKey);
+
+ if (!getRowId(dataKey).equals(readRowId)) {
+ // There is no row for the GC queue element.
+ noRowForGcElement = true;
+ }
+ }
+
+ if (noRowForGcElement) {
+ gcKey.rewind();
+
+ try {
+ batch.delete(gc, gcKey);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to collect
garbage", e);
+ }
+
+ return null;
+ }
+
+ // Check if the new element, whose insertion scheduled the
GC, was a tombstone.
+ int len = it.value(EMPTY_DIRECT_BUFFER);
Review Comment:
```suggestion
int len = it.value(EMPTY_DIRECT_BUFFER);
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
Review Comment:
Please describe what you are doing here.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
+ ByteBuffer dataKey = MV_KEY_BUFFER.get();
+
+ gcKey.clear();
+ gcIt.key(gcKey);
+
+ HybridTimestamp timestamp = readTimestampNatural(gcKey,
GC_KEY_TS_OFFSET);
+
+ if (timestamp.compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
+
+ RowId readRowId = getRowId(gcKey, GC_KEY_ROW_ID_OFFSET);
+
+ try (
+ var upperBound = new Slice(partitionEndPrefix());
+ ReadOptions opts = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = db.newIterator(cf, opts);
+ ) {
+ dataKey.clear();
+
+ dataKey.putShort((short) partitionId);
+ putRowId(dataKey, readRowId);
+ putTimestampDesc(dataKey, timestamp);
+ dataKey.flip();
+
+ it.seek(dataKey);
+
+ boolean noRowForGcElement = false;
Review Comment:
```suggestion
boolean noRowForGcElement = false;
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1015,6 +1117,149 @@ public void destroyData(WriteBatch writeBatch) throws
RocksDBException {
writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
}
+ @Override
+ public @Nullable TableRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ WriteBatchWithIndex batch = requireWriteBatch();
+
+ try (
+ var gcUpperBound = new Slice(partitionEndPrefix());
+ ReadOptions gcOpts = new
ReadOptions().setIterateUpperBound(gcUpperBound).setTotalOrderSeek(true);
+ RocksIterator gcIt = db.newIterator(gc, gcOpts);
+ ) {
+ gcIt.seek(partitionStartPrefix());
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ ByteBuffer gcKey = allocateDirect(GC_KEY_SIZE);
Review Comment:
```suggestion
ByteBuffer gcKeyBuffer = allocateDirect(GC_KEY_SIZE);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]