ibessonov commented on code in PR #1673:
URL: https://github.com/apache/ignite-3/pull/1673#discussion_r1109865778
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java:
##########
@@ -228,24 +228,24 @@ public long getIndexTreeMetaPageId(long pageAddr) {
}
/**
- * Sets an garbage collection tree meta page id.
+ * Sets an garbage collection queue meta page id.
Review Comment:
```suggestion
* Sets a garbage collection queue meta page id.
```
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java:
##########
@@ -254,22 +254,22 @@ public void indexTreeMetaPageId(@Nullable UUID
checkpointId, long indexTreeMetaP
}
/**
- * Returns garbage collection tree meta page id.
+ * Returns garbage collection queue meta page id.
*/
- public long garbageCollectionTreeMetaPageId() {
- return garbageCollectionTreeMetaPageId;
+ public long gcQueueMetaPageId() {
+ return gcQueueMetaPageId;
}
/**
- * Sets an garbage collection tree meta page id.
+ * Sets an garbage collection queue meta page id.
Review Comment:
```suggestion
* Sets a garbage collection queue meta page id.
```
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -113,26 +113,27 @@ public void finish() {
/**
* Row version filter in the version chain.
*/
+ @FunctionalInterface
interface RowVersionFilter {
/**
* Returns {@code true} if the version matches.
*
* @param rowVersionLink Row version link;
- * @param rowVersionAdder Address to row version (including page
address + offset within it).
+ * @param rowVersionAddr Address of row version (including page
address + offset within it).
*/
- boolean apply(long rowVersionLink, long rowVersionAdder);
+ boolean apply(long rowVersionLink, long rowVersionAddr);
static RowVersionFilter equalsByTimestamp(HybridTimestamp timestamp) {
- return (rowVersionLink, rowVersionAdder) -> {
- HybridTimestamp readTimestamp =
HybridTimestamps.readTimestamp(rowVersionAdder, RowVersion.TIMESTAMP_OFFSET);
+ return (rowVersionLink, rowVersionAddr) -> {
+ HybridTimestamp readTimestamp =
HybridTimestamps.readTimestamp(rowVersionAddr, RowVersion.TIMESTAMP_OFFSET);
return readTimestamp != null &&
readTimestamp.compareTo(timestamp) == 0;
Review Comment:
Can this be `Objects.equals(readTimestamp, timestamp)`?
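A minimal sketch of the two variants, for comparison. `Ts` is a hypothetical stand-in for `HybridTimestamp`, and the sketch assumes its `equals` agrees with `compareTo`; that assumption would need to hold for the real class before making the swap. The variants differ only when both arguments are `null`: `Objects.equals` returns `true`, while the explicit null check returns `false`.

```java
import java.util.Objects;

// Hypothetical stand-in for HybridTimestamp (assumption: equals() agrees with compareTo()).
final class Ts implements Comparable<Ts> {
    private final long physical;
    private final int logical;

    Ts(long physical, int logical) {
        this.physical = physical;
        this.logical = logical;
    }

    @Override
    public int compareTo(Ts other) {
        int cmp = Long.compare(physical, other.physical);
        return cmp != 0 ? cmp : Integer.compare(logical, other.logical);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Ts)) {
            return false;
        }
        Ts that = (Ts) o;
        return physical == that.physical && logical == that.logical;
    }

    @Override
    public int hashCode() {
        return Objects.hash(physical, logical);
    }
}

final class TimestampFilterSketch {
    // Current form in the diff: explicit null check plus compareTo.
    static boolean matchesWithCompareTo(Ts read, Ts expected) {
        return read != null && read.compareTo(expected) == 0;
    }

    // Suggested form: null-safe equality in one call.
    static boolean matchesWithEquals(Ts read, Ts expected) {
        return Objects.equals(read, expected);
    }
}
```

If the filter's `timestamp` argument is never `null`, the two forms should behave identically under the stated assumption.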
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java:
##########
@@ -53,7 +54,11 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
private @Nullable VersionChain newRow;
- private boolean addToGc;
+ /**
+ * Link to the row version for which the commit occurred. It will be a
{@link PageIdUtils#NULL_LINK} if the current and the previous
+ * row versions are tombstones or have only one row version in the version
chain.
Review Comment:
This comment is very confusing. Can you please elaborate on it?
Maybe a rename would help. It's a link that would be added to the GC queue
after the closure execution, right? That is hard to decipher from the name,
because it reads more like "rowLink that will be processed by GC".
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java:
##########
@@ -922,4 +960,58 @@ protected <T> T inUpdateVersionChainLock(RowId rowId,
Supplier<T> supplier) {
});
}
}
+
+ @Override
+ public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ GarbageCollectionRowVersion first = gcQueue.findFirstFromGc();
+
+ // Garbage collection queue is empty.
+ if (first == null) {
+ return null;
+ }
+
+ HybridTimestamp rowTimestamp = first.getTimestamp();
+
+ // There are no versions in the garbage collection queue before
watermark.
+ if (rowTimestamp.compareTo(lowWatermark) > 0) {
+ return null;
+ }
+
+ RowId rowId = first.getRowId();
+
+ return inUpdateVersionChainLock(rowId, () -> {
+ // Someone processed the element in parallel.
+ if (!gcQueue.removeFromGc(rowId, rowTimestamp)) {
+ return null;
Review Comment:
I see no TODO here. This place should be fixed; otherwise it will mess up the
GC process in the future. It should also be fixed in the RocksDB-based implementation.
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcIo.java:
##########
@@ -118,15 +131,16 @@ default int compare(long pageAddr, int idx,
GarbageCollectionRowVersion rowVersi
* @param idx Element's index.
* @param partitionId Partition id to enrich read partitionless links.
*/
- default GarbageCollectionRowVersion getRow(long pageAddr, int idx, int
partitionId) {
+ default GcRowVersion getRow(long pageAddr, int idx, int partitionId) {
int offset = offset(idx);
long rowIdMsb = getLong(pageAddr, offset + ROW_ID_MSB_OFFSET);
long rowIdLsb = getLong(pageAddr, offset + ROW_ID_LSB_OFFSET);
- return new GarbageCollectionRowVersion(
+ return new GcRowVersion(
new RowId(partitionId, rowIdMsb, rowIdLsb),
- HybridTimestamps.readTimestamp(pageAddr, offset +
ROW_TIMESTAMP_OFFSET)
+ requireNonNull(HybridTimestamps.readTimestamp(pageAddr, offset
+ ROW_TIMESTAMP_OFFSET)),
Review Comment:
What's up with the `requireNonNull`? We could validate it during insertion,
for example, rather than during the read.
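A rough sketch of what validating on insertion could look like. `ValidatedQueueSketch` and its methods are hypothetical and are not the actual `GcIo` or queue API; timestamps are modelled as plain `Long` values for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical container, not the real GC queue: null timestamps are rejected on write,
// so the read path can rely on the invariant without re-checking it.
final class ValidatedQueueSketch {
    private final List<Long> timestamps = new ArrayList<>();

    // Validation happens once, at insertion time.
    void add(Long timestamp) {
        timestamps.add(Objects.requireNonNull(timestamp, "timestamp"));
    }

    // No null check needed here: add() guarantees every stored element is non-null.
    long first() {
        return timestamps.get(0);
    }
}
```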
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java:
##########
@@ -58,35 +58,35 @@ class FindRowVersion implements
PageMemoryTraversal<RowVersionFilter> {
}
@Override
- public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter arg) {
- if (!rowVersionFounded) {
- long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
-
- if (arg.apply(link, pageAddr + payload.offset())) {
- rowVersionFounded = true;
-
- rowLink = link;
- rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
- rowNextLink = nextLink;
-
- if (!loadValueBytes) {
- rowValueSize = PageUtils.getInt(pageAddr, payload.offset()
+ RowVersion.VALUE_SIZE_OFFSET);
-
- return STOP_TRAVERSAL;
- } else {
- return readRowVersionValue.consumePagePayload(link,
pageAddr, payload, null);
- }
- } else {
- return nextLink;
- }
- } else {
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter filter) {
+ if (rowVersionFound) {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ }
+
+ long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+ if (!filter.apply(link, pageAddr + payload.offset())) {
+ return nextLink;
+ }
+
+ rowVersionFound = true;
+
+ rowLink = link;
+ rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ rowNextLink = nextLink;
+
+ if (loadValueBytes) {
return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
}
+
+ rowValueSize = PageUtils.getInt(pageAddr, payload.offset() +
RowVersion.VALUE_SIZE_OFFSET);
Review Comment:
By the way, the size is not read when the value needs to be loaded. What's the
reason for this behavior? Don't we need a valid value for it in that case too?