tkalkirill commented on code in PR #1619:
URL: https://github.com/apache/ignite-3/pull/1619#discussion_r1095714606
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -595,21 +619,97 @@ public void addWriteCommitted(RowId rowId, @Nullable
TableRow row, HybridTimesta
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch =
requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
- putTimestamp(keyBuf, commitTimestamp);
+ putTimestampDesc(keyBuf, commitTimestamp);
+
+ boolean isNewValueTombstone = row == null;
//TODO IGNITE-16913 Add proper way to write row bytes into array
without allocations.
- byte[] rowBytes = rowBytes(row);
+ byte[] rowBytes = row != null ? rowBytes(row) : new byte[0];
+ boolean newAndPrevTombstones;
try {
- writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE),
rowBytes);
-
- return null;
+ newAndPrevTombstones = maybeAddToGcQueue(writeBatch, rowId,
commitTimestamp, isNewValueTombstone);
} catch (RocksDBException e) {
- throw new StorageException("Failed to update a row in
storage", e);
+ throw new StorageException("Failed to add row to the GC
queue", e);
+ }
+
+ if (!newAndPrevTombstones) {
+ try {
+ writeBatch.put(cf, copyOf(keyBuf.array(), MAX_KEY_SIZE),
rowBytes);
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to update a row in
storage", e);
+ }
}
+
+ return null;
});
}
+ /**
+ * Tries adding a row to the GC queue. We put new row's timestamp, because
we can remove previous row only if both this row's
+ * and previous row's timestamps are below the watermark.
+ * Returns {@code true} if new value and previous value are both
tombstones.
+ *
+ * @param writeBatch Write batch.
+ * @param rowId Row id.
+ * @param timestamp New row's timestamp.
+ * @param isNewValueTombstone If new row is a tombstone.
+ * @return {@code true} if new value and previous value are both
tombstones.
+ * @throws RocksDBException If failed.
+ */
+ private boolean maybeAddToGcQueue(WriteBatchWithIndex writeBatch, RowId
rowId, HybridTimestamp timestamp, boolean isNewValueTombstone)
+ throws RocksDBException {
+ boolean newAndPrevTombstones = false;
+
+ // Try find previous value for the row id.
+ ByteBuffer key = allocateDirect(MAX_KEY_SIZE);
Review Comment:
My fault, I meant that we can use `ThreadLocal` so as not to allocate the
buffer each time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]