rpuch commented on code in PR #1716:
URL: https://github.com/apache/ignite-3/pull/1716#discussion_r1119644744
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java:
##########
@@ -28,4 +29,8 @@ public class DataStorageConfigurationSchema {
/** Name of data storage. */
@PolymorphicId(hasDefault = true)
public String name =
UnknownDataStorageConfigurationSchema.UNKNOWN_DATA_STORAGE;
+
+ /** The number of entries in the storage to be garbage collected during
the storage update operation. */
Review Comment:
```suggestion
/** The number of entries in the storage to be garbage collected during
a storage update operation. */
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -150,6 +224,26 @@ public void handleUpdateAll(
});
}
+ private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+ if (newLwm != null) {
Review Comment:
How about using the Early Return pattern? That is, put
`if (newLwm == null) { return; }` at the very beginning of the method, so the
main method logic does not need to be indented.
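For illustration, a minimal sketch of the two shapes side by side. Everything here is hypothetical: `EarlyReturnSketch` is not a class from the PR, `Long` stands in for `HybridTimestamp`, and the list append stands in for the real GC logic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the handler; names are illustrative, not the PR's code. */
class EarlyReturnSketch {
    /** Records the watermarks for which the (stand-in) GC logic ran. */
    final List<Long> vacuumed = new ArrayList<>();

    /** Nested form, as in the diff: the whole body sits inside the null check. */
    void executeBatchGcNested(Long newLwm) {
        if (newLwm != null) {
            vacuumed.add(newLwm);
        }
    }

    /** Early-return form: the guard exits first, the main logic stays at one indent level. */
    void executeBatchGcEarlyReturn(Long newLwm) {
        if (newLwm == null) {
            return;
        }

        vacuumed.add(newLwm);
    }
}
```

Both methods behave the same; the second only removes one level of nesting from the main path.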
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -53,21 +56,46 @@ public class StorageUpdateHandler {
private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
+ /** Last recorded GC low watermark. */
+ private final AtomicReference<HybridTimestamp> lastRecordedLwm = new
AtomicReference<>();
+
+ /** The number of entries in the storage to be garbage collected during
the storage update operation. */
+ private final int gcOnUpdateBatchSize;
+
/**
* The constructor.
*
* @param partitionId Partition id.
* @param storage Partition data storage.
* @param indexes Indexes supplier.
*/
+ @TestOnly
public StorageUpdateHandler(int partitionId, PartitionDataStorage storage,
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes) {
+ this(partitionId, storage, indexes, 5);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param partitionId Partition id.
+ * @param storage Partition data storage.
+ * @param indexes Indexes supplier.
+ * @param gcOnUpdateBatchSize The number of entries in the storage to be
garbage collected during the storage update operation.
Review Comment:
```suggestion
* @param gcOnUpdateBatchSize The number of entries in the storage to be
garbage collected during a storage update operation.
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -263,26 +357,53 @@ private void tryRemoveFromIndexes(BinaryRow rowToRemove,
RowId rowId, Cursor<Rea
* @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
*/
public boolean vacuum(HybridTimestamp lowWatermark) {
- return storage.runConsistently(() -> {
- BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+ return storage.runConsistently(() -> internalVacuum(lowWatermark));
+ }
- if (vacuumed == null) {
- // Nothing was garbage collected.
- return false;
+ /**
+ * Tries removing {@code count} oldest stale entries and their indexes.
+ * If there's less entries that can be removed, then exits prematurely.
+ *
+ * @param lowWatermark Low watermark for the vacuum.
+ * @param count Count of entries to GC.
+ */
+ public void vacuumBatch(HybridTimestamp lowWatermark, int count) {
Review Comment:
This can be a private method. Do we really need it to be public?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1985,9 +1994,16 @@ public void onUpdate(WatchEvent evt) {
MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
+ DataStorageConfiguration dsCfg = tblCfg.dataStorage();
+ Integer gcOnUpdateBatchSize =
dsCfg.gcOnUpdateBatchSize().value();
Review Comment:
```suggestion
int gcOnUpdateBatchSize =
dsCfg.gcOnUpdateBatchSize().value();
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for concurrent cooperative GC (GC that is executed on write). */
+public class PartitionGcOnWriteConcurrentTest {
Review Comment:
Do we need concurrency tests here? RAFT commands get executed sequentially.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -81,8 +109,32 @@ public void handleUpdate(
TablePartitionId commitPartitionId,
@Nullable ByteBuffer rowBuffer,
@Nullable Consumer<RowId> onReplication
+ ) {
+ handleUpdate(txId, rowUuid, commitPartitionId, rowBuffer,
onReplication, null);
+ }
+
+ /**
+ * Handles single update.
+ *
+ * @param txId Transaction id.
+ * @param rowUuid Row UUID.
+ * @param commitPartitionId Commit partition id.
+ * @param rowBuffer Row buffer.
+ * @param onReplication Callback on replication.
+ * @param lowWatermark GC low watermark.
Review Comment:
What does it mean when `null` is passed as a watermark?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -53,21 +56,46 @@ public class StorageUpdateHandler {
private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
+ /** Last recorded GC low watermark. */
+ private final AtomicReference<HybridTimestamp> lastRecordedLwm = new
AtomicReference<>();
+
+ /** The number of entries in the storage to be garbage collected during
the storage update operation. */
Review Comment:
```suggestion
/** The number of entries in the storage to be garbage collected during
a storage update operation. */
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -150,6 +224,26 @@ public void handleUpdateAll(
});
}
+ private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+ if (newLwm != null) {
+ @Nullable HybridTimestamp oldLwm;
+ do {
+ oldLwm = lastRecordedLwm.get();
+
+ if (oldLwm != null && newLwm.compareTo(oldLwm) <= 0) {
+ break;
+ }
+ } while (!lastRecordedLwm.compareAndSet(oldLwm, newLwm));
+
+ if (oldLwm == null || newLwm.compareTo(oldLwm) > 0) {
+ // Iff the lwm we have is the new lwm.
+ // Otherwise our newLwm is either was smaller than last
recorded lwm or last recorded lwm has changed
Review Comment:
```suggestion
// Otherwise our newLwm is either smaller than last recorded
lwm or last recorded lwm has changed
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -741,8 +742,16 @@ private void
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
.thenApply(partitionStorages ->
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
internalTbl, partId));
+ DataStorageConfiguration dsCfg = tblCfg.dataStorage();
+ Integer gcOnUpdateBatchSize =
dsCfg.gcOnUpdateBatchSize().value();
Review Comment:
```suggestion
int gcOnUpdateBatchSize =
dsCfg.gcOnUpdateBatchSize().value();
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -81,8 +109,32 @@ public void handleUpdate(
TablePartitionId commitPartitionId,
@Nullable ByteBuffer rowBuffer,
@Nullable Consumer<RowId> onReplication
+ ) {
+ handleUpdate(txId, rowUuid, commitPartitionId, rowBuffer,
onReplication, null);
+ }
+
+ /**
+ * Handles single update.
+ *
+ * @param txId Transaction id.
+ * @param rowUuid Row UUID.
+ * @param commitPartitionId Commit partition id.
+ * @param rowBuffer Row buffer.
+ * @param onReplication Callback on replication.
+ * @param lowWatermark GC low watermark.
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-18909 Pass low
watermark.
+ public void handleUpdate(
+ UUID txId,
+ UUID rowUuid,
+ TablePartitionId commitPartitionId,
+ @Nullable ByteBuffer rowBuffer,
+ @Nullable Consumer<RowId> onReplication,
+ @Nullable HybridTimestamp lowWatermark
Review Comment:
Looks like LWM can be non-null only in tests. Is this code going to be
wired to production code later?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -150,6 +224,26 @@ public void handleUpdateAll(
});
}
+ private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+ if (newLwm != null) {
+ @Nullable HybridTimestamp oldLwm;
+ do {
+ oldLwm = lastRecordedLwm.get();
+
+ if (oldLwm != null && newLwm.compareTo(oldLwm) <= 0) {
+ break;
+ }
+ } while (!lastRecordedLwm.compareAndSet(oldLwm, newLwm));
Review Comment:
This method is called from RAFT listeners that serialize accesses. Do we
really need a CAS here?
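To make the question concrete, here is a minimal sketch of what a non-atomic version could look like. It assumes that calls really are serialized by the RAFT listener, which is the premise of this comment and not something the sketch verifies. `LwmTrackerSketch` and `tryAdvance` are hypothetical names, and `Long` stands in for `HybridTimestamp`.

```java
/** Hypothetical sketch: tracks the last recorded low watermark without atomics. */
class LwmTrackerSketch {
    /** Plain field; only safe under the assumption that all callers are serialized. */
    private Long lastRecordedLwm;

    /**
     * Records newLwm if it is strictly greater than the last recorded value.
     *
     * @return true if the watermark advanced, meaning the caller should run the batch GC.
     */
    boolean tryAdvance(Long newLwm) {
        if (newLwm == null) {
            return false;
        }

        if (lastRecordedLwm != null && newLwm.compareTo(lastRecordedLwm) <= 0) {
            // Not newer than what we already have: nothing to do.
            return false;
        }

        lastRecordedLwm = newLwm;

        return true;
    }
}
```

If the method can in fact be reached from several threads, this version is not safe and the CAS loop (or an equivalent atomic update) has to stay.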
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -150,6 +224,26 @@ public void handleUpdateAll(
});
}
+ private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+ if (newLwm != null) {
+ @Nullable HybridTimestamp oldLwm;
+ do {
+ oldLwm = lastRecordedLwm.get();
+
+ if (oldLwm != null && newLwm.compareTo(oldLwm) <= 0) {
+ break;
+ }
+ } while (!lastRecordedLwm.compareAndSet(oldLwm, newLwm));
+
+ if (oldLwm == null || newLwm.compareTo(oldLwm) > 0) {
+ // Iff the lwm we have is the new lwm.
+ // Otherwise our newLwm is either was smaller than last
recorded lwm or last recorded lwm has changed
+ // concurrently and it become greater. If that's the case,
another thread will perform the GC.
+ vacuumBatch(newLwm, gcOnUpdateBatchSize);
Review Comment:
The JIRA ticket says:
> To account for that, each update can be preceded with the manual GC of **that** row.

But here, a different approach is implemented (using the queue). Should this
be noted in the JIRA issue, so that it is easier to understand later what was
going on?