rpuch commented on code in PR #3414:
URL: https://github.com/apache/ignite-3/pull/3414#discussion_r1528587642
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -101,13 +115,17 @@ public void close() {
/**
* Collect indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow,
UUID, int, int, int)} (write intent).
*
+ * <p>NOTE: When updating a low watermark, the index storages that were
returned from the method may begin to be destroyed, such a
+ * situation should occur by the calling code.</p>
Review Comment:
```suggestion
* situation should be handled by the calling code.</p>
```
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -350,7 +350,7 @@ public class IgniteImpl implements Ignite {
private final ClockWaiter clockWaiter;
- private final LowWatermark lowWatermark;
+ private final LowWatermarkImpl lowWatermarkImpl;
Review Comment:
```suggestion
private final LowWatermarkImpl lowWatermark;
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandler.java:
##########
@@ -247,13 +249,52 @@ private TableSchemaAwareIndexStorage[]
indexesSnapshot(@Nullable List<Integer> i
* @throws StorageException If failed to set the row ID.
*/
public void setNextRowIdToBuildIndex(int indexId, @Nullable RowId rowId) {
+ TableSchemaAwareIndexStorage indexStorage =
indexStorageById().get(indexId);
+
+ // We assume that if the index is missing in indexStorageById, then it
has begun to be destroyed and we do not need it.
+ if (indexStorage != null) {
+ setNextRowIdToBuildToIndex(indexStorage, rowId);
+ }
+ }
+
+ private Map<Integer, TableSchemaAwareIndexStorage> indexStorageById() {
Map<Integer, TableSchemaAwareIndexStorage> indexStorageById =
indexes.get();
- TableSchemaAwareIndexStorage indexStorage =
indexStorageById.get(indexId);
+ assert !indexStorageById.isEmpty();
- if (indexStorage != null) {
- // TODO: IGNITE-21514 Handle index destruction
- indexStorage.storage().setNextRowIdToBuild(rowId);
+ return indexStorageById;
+ }
+
+ private static void putToIndex(TableSchemaAwareIndexStorage indexStorage,
BinaryRow binaryRow, RowId rowId) {
+ try {
+ indexStorage.put(binaryRow, rowId);
+ } catch (StorageDestroyedException ignore) {
+ // Index is in the process of being destroyed, which means there
is no need to write to it.
Review Comment:
How about logging this attempt to write to a destroyed index at debug()?
Might be useful
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -17,264 +17,29 @@
package org.apache.ignite.internal.table.distributed;
-import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.failure.FailureContext;
-import org.apache.ignite.internal.failure.FailureProcessor;
-import org.apache.ignite.internal.hlc.HybridClock;
+import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.manager.IgniteComponent;
-import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
import org.jetbrains.annotations.Nullable;
/**
- * Class to manage the low watermark.
- *
- * <p>Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
+ * Low watermark is the node's local time, which ensures that read-only
transactions have completed by this time, and new read-only
* transactions will only be created after this time, and we can safely delete
obsolete/garbage data such as: obsolete versions of table
* rows, remote indexes, remote tables, etc.
*
* @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
*/
-public class LowWatermark implements IgniteComponent {
- private static final IgniteLogger LOG =
Loggers.forClass(LowWatermark.class);
-
- static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
-
- private final LowWatermarkConfiguration lowWatermarkConfig;
-
- private final HybridClock clock;
-
- private final TxManager txManager;
-
- private final VaultManager vaultManager;
-
- private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
-
- private final ScheduledExecutorService scheduledThreadPool;
-
- private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
- private final AtomicBoolean closeGuard = new AtomicBoolean();
-
- private volatile @Nullable HybridTimestamp lowWatermark;
-
- private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
-
- private final FailureProcessor failureProcessor;
-
- /**
- * Constructor.
- *
- * @param nodeName Node name.
- * @param lowWatermarkConfig Low watermark configuration.
- * @param clock A hybrid logical clock.
- * @param txManager Transaction manager.
- * @param vaultManager Vault manager.
- * @param failureProcessor Failure processor tha is used to handle
critical errors.
- */
- public LowWatermark(
- String nodeName,
- LowWatermarkConfiguration lowWatermarkConfig,
- HybridClock clock,
- TxManager txManager,
- VaultManager vaultManager,
- FailureProcessor failureProcessor
- ) {
- this.lowWatermarkConfig = lowWatermarkConfig;
- this.clock = clock;
- this.txManager = txManager;
- this.vaultManager = vaultManager;
- this.failureProcessor = failureProcessor;
-
- scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
- NamedThreadFactory.create(nodeName, "low-watermark-updater",
LOG)
- );
- }
-
- /**
- * Starts the watermark manager.
- */
- @Override
- public CompletableFuture<Void> start() {
- inBusyLock(busyLock, () -> {
- lowWatermark = readLowWatermarkFromVault();
- });
-
- return nullCompletedFuture();
- }
+public interface LowWatermark {
+ /** Returns the current low watermark, {@code null} means no low watermark
has been assigned yet. */
+ @Nullable HybridTimestamp getLowWatermark();
/**
- * Schedule watermark updates.
+ * Gets a low watermark that will not change until the consumer is
executed, {@code null} means no low watermark has been assigned yet.
*/
- public void scheduleUpdates() {
- inBusyLock(busyLock, () -> {
- HybridTimestamp lowWatermarkCandidate = lowWatermark;
-
- if (lowWatermarkCandidate == null) {
- LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
-
- scheduleUpdateLowWatermarkBusy();
-
- return;
- }
-
- LOG.info("Low watermark has been scheduled to be updated: {}",
lowWatermarkCandidate);
-
- txManager.updateLowWatermark(lowWatermarkCandidate)
- .thenComposeAsync(unused -> inBusyLock(busyLock, () ->
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
- .whenComplete((unused, throwable) -> {
- if (throwable == null) {
- inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
- } else if (!(throwable instanceof
NodeStoppingException)) {
- LOG.error("Error during the Watermark manager
start", throwable);
-
- failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
-
- inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
- }
- });
- });
- }
-
- private @Nullable HybridTimestamp readLowWatermarkFromVault() {
- VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);
-
- return vaultEntry == null ? null :
ByteUtils.fromBytes(vaultEntry.value());
- }
-
- @Override
- public void stop() {
- if (!closeGuard.compareAndSet(false, true)) {
- return;
- }
-
- busyLock.block();
-
- ScheduledFuture<?> lastScheduledTaskFuture =
this.lastScheduledTaskFuture.get();
-
- if (lastScheduledTaskFuture != null) {
- lastScheduledTaskFuture.cancel(true);
- }
-
- IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10,
TimeUnit.SECONDS);
- }
-
- /**
- * Returns the current low watermark, {@code null} means no low watermark
has been assigned yet.
- */
- public @Nullable HybridTimestamp getLowWatermark() {
- return lowWatermark;
- }
-
- CompletableFuture<Void> updateLowWatermark() {
- return inBusyLock(busyLock, () -> {
- HybridTimestamp lowWatermarkCandidate =
createNewLowWatermarkCandidate();
-
- // Wait until all the read-only transactions are finished before
the new candidate, since no new RO transactions could be
- // created, then we can safely promote the candidate as a new low
watermark, store it in vault, and we can safely start cleaning
- // up the stale/junk data in the tables.
- return txManager.updateLowWatermark(lowWatermarkCandidate)
- .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
- vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermarkCandidate));
-
- lowWatermark = lowWatermarkCandidate;
-
- return notifyListeners(lowWatermarkCandidate);
- }), scheduledThreadPool)
- .whenComplete((unused, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof NodeStoppingException))
{
- LOG.error("Failed to update low watermark,
will schedule again: {}", throwable, lowWatermarkCandidate);
-
- inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
- }
- } else {
- LOG.info("Successful low watermark update: {}",
lowWatermarkCandidate);
-
- scheduleUpdateLowWatermarkBusy();
- }
- });
- });
- }
-
- public void addUpdateListener(LowWatermarkChangedListener listener) {
- updateListeners.add(listener);
- }
-
- public void removeUpdateListener(LowWatermarkChangedListener listener) {
- updateListeners.remove(listener);
- }
-
- private CompletableFuture<Void> notifyListeners(HybridTimestamp
lowWatermark) {
- if (updateListeners.isEmpty()) {
- return nullCompletedFuture();
- }
-
- ArrayList<CompletableFuture<?>> res = new ArrayList<>();
- for (LowWatermarkChangedListener updateListener : updateListeners) {
- res.add(updateListener.onLwmChanged(lowWatermark));
- }
-
- return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new));
- }
-
- private void scheduleUpdateLowWatermarkBusy() {
- ScheduledFuture<?> previousScheduledFuture =
this.lastScheduledTaskFuture.get();
-
- assert previousScheduledFuture == null ||
previousScheduledFuture.isDone() : "previous scheduled task has not finished";
-
- ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
- this::updateLowWatermark,
- lowWatermarkConfig.updateFrequency().value(),
- TimeUnit.MILLISECONDS
- );
-
- boolean casResult =
lastScheduledTaskFuture.compareAndSet(previousScheduledFuture,
newScheduledFuture);
-
- assert casResult : "only one scheduled task is expected";
- }
-
- HybridTimestamp createNewLowWatermarkCandidate() {
- HybridTimestamp now = clock.now();
-
- HybridTimestamp lowWatermarkCandidate = now.addPhysicalTime(
- -lowWatermarkConfig.dataAvailabilityTime().value() -
getMaxClockSkew()
- );
-
- HybridTimestamp lowWatermark = this.lowWatermark;
-
- assert lowWatermark == null ||
lowWatermarkCandidate.compareTo(lowWatermark) > 0 :
- "lowWatermark=" + lowWatermark + ", lowWatermarkCandidate=" +
lowWatermarkCandidate;
+ void getLowWatermarkSafe(Consumer<@Nullable HybridTimestamp> consumer);
Review Comment:
I think a better name can be invented. This `safe` is pretty vague. How
about `withPinnedLowWatermark(Consumer<...>)`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -101,13 +115,17 @@ public void close() {
/**
* Collect indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow,
UUID, int, int, int)} (write intent).
*
+ * <p>NOTE: When updating a low watermark, the index storages that were
returned from the method may begin to be destroyed, such a
+ * situation should occur by the calling code.</p>
+ *
* <p>Index selection algorithm:</p>
* <ul>
* <li>If the index in the snapshot catalog version is in status
{@link CatalogIndexStatus#BUILDING},
- * {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING}.</li>
+ * {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING} and not removed in latest catalog version.</li>
* <li>If the index in status {@link CatalogIndexStatus#REGISTERED}
and it is in this status on the active version of the catalog
- * for {@code beginTs}.</li>
- * <li>For a read-only index, if {@code beginTs} is strictly less than
the activation time of dropping the index.</li>
+ * for {@code beginTs} and not removed in latest catalog version.</li>
Review Comment:
```suggestion
* for {@code beginTs} and not removed in the latest catalog
version.</li>
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int INDEX_ID = 1;
+
@Test
void testBuildIndex() {
TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
- int indexId = 1;
-
- TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
-
- when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1),
row1.rowId().increment());
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1),
row1.rowId().increment());
verify(indexStorage).put(row0.binaryRow(), row0.rowId());
verify(indexStorage).put(row1.binaryRow(), row1.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
// Let's check one more batch - it will be the finishing one.
- clearInvocations(indexes, indexStorage);
+ clearInvocations(indexStorage);
BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
verify(indexStorage).put(row2.binaryRow(), row2.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(null);
}
+ @Test
+ void testAddToIndexesOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId,
List.of(INDEX_ID, INDEX_ID + 1)));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexesWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(
+ StorageException.class,
+ () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class),
new RowId(PARTITION_ID), List.of(INDEX_ID))
+ );
+ }
+
+ @Test
+ void testAddToIndexOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID));
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID + 1));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID),
INDEX_ID));
+ }
+
+ @Test
+ void testBuildIndexOnDestroyIndex() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+ var binaryRowAndRowId = new BinaryRowAndRowId(row, rowId);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID,
Stream.of(binaryRowAndRowId), rowId));
+ assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID + 1,
Stream.of(binaryRowAndRowId), rowId));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ verify(storage).setNextRowIdToBuild(eq(rowId));
+ }
+
+ @Test
+ void testBuildIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var rowId = new RowId(PARTITION_ID);
+ var binaryRowAndRowId = new BinaryRowAndRowId(mock(BinaryRow.class),
rowId);
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+
+ IndexStorage storage = indexStorage.storage();
+
+ doNothing().when(indexStorage).put(any(), any());
+
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+ }
+
+ @Test
+ void testGetNextRowIdToBuildIndexOnDestroyIndex() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+
doThrow(StorageDestroyedException.class).when(storage).getNextRowIdToBuild();
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+ assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID + 1));
+
+ verify(storage).getNextRowIdToBuild();
+ }
+
+ @Test
+ void testGetNextRowIdToBuildIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+ doThrow(StorageException.class).when(storage).getNextRowIdToBuild();
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+ }
+
+ @Test
+ void testSetNextRowIdToBuildIndexOnDestroyIndex() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+ var rowId = new RowId(PARTITION_ID);
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertDoesNotThrow(() ->
indexUpdateHandler.setNextRowIdToBuildIndex(INDEX_ID, rowId));
+ assertDoesNotThrow(() ->
indexUpdateHandler.setNextRowIdToBuildIndex(INDEX_ID + 1, rowId));
+
+ verify(storage).setNextRowIdToBuild(eq(rowId));
+ }
+
+ @Test
+ void testSetNextRowIdToBuildIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+ var rowId = new RowId(PARTITION_ID);
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.setNextRowIdToBuildIndex(INDEX_ID, rowId));
+ }
+
+ @Test
+ void testTryRemoveFromIndexesOnDestroyIndex() {
Review Comment:
```suggestion
void testTryRemoveFromIndexesOnDestroyedIndex() {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -238,21 +260,36 @@ private static List<Integer>
mergeWithoutDuplicates(List<Integer> l0, List<Integ
private void addListenersBusy() {
catalogService.listen(INDEX_CREATE,
fromConsumer(this::onIndexCreated));
catalogService.listen(INDEX_REMOVED,
fromConsumer(this::onIndexRemoved));
+
+ lowWatermark.addUpdateListener(this::onLwmChanged);
}
private void onIndexRemoved(RemoveIndexEventParameters parameters) {
inBusyLock(busyLock, () -> {
int indexId = parameters.indexId();
int catalogVersion = parameters.catalogVersion();
- CatalogIndexDescriptor index = indexBusy(indexId, catalogVersion -
1);
+ lowWatermark.getLowWatermarkSafe(lwm -> {
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
- if (index.status() == AVAILABLE) {
- // On drop table event.
- readOnlyIndexes.add(new ReadOnlyIndexInfo(index,
catalogActivationTimestampBusy(catalogVersion)));
- } else if (index.status() == STOPPING) {
- readOnlyIndexes.add(new ReadOnlyIndexInfo(index,
findStoppingActivationTsBusy(indexId, catalogVersion - 1)));
- }
+ if (catalogVersion <= lwmCatalogVersion) {
+ // There is no need to add a read-only indexes, since the
index should be destroyed under the updated low watermark.
+ tableVersionByIndexId.remove(indexId);
+ } else {
+ CatalogIndexDescriptor index = indexBusy(indexId,
catalogVersion - 1);
+
+ if (index.status() == AVAILABLE) {
+ // On drop table event.
+ readOnlyIndexes.add(new ReadOnlyIndexInfo(index,
catalogActivationTimestampBusy(catalogVersion), catalogVersion));
+ } else if (index.status() == STOPPING) {
+ readOnlyIndexes.add(
+ new ReadOnlyIndexInfo(index,
findStoppingActivationTsBusy(indexId, catalogVersion - 1), catalogVersion)
+ );
+ } else {
+ tableVersionByIndexId.remove(indexId);
Review Comment:
Please add a comment explaining what this case means (probably, it's about an
index that is dropped before even becoming available?)
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -138,11 +156,15 @@ public List<IndexIdAndTableVersion> chooseForAddWrite(int
catalogVersion, int ta
/**
* Collect indexes for {@link PartitionAccess#addWriteCommitted(RowId,
BinaryRow, HybridTimestamp, int)} (write committed only).
*
+ * <p>NOTE: When updating a low watermark, the index storages that were
returned from the method may begin to be destroyed, such a
+ * situation should occur by the calling code.</p>
Review Comment:
```suggestion
* situation should be handled by the calling code.</p>
```
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/distributed/TestLowWatermarkImpl.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/** Test implementation. */
+public class TestLowWatermarkImpl implements LowWatermark {
Review Comment:
```suggestion
public class TestLowWatermark implements LowWatermark {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -274,41 +311,56 @@ private long catalogActivationTimestampBusy(int
catalogVersion) {
return catalog.time();
}
+ // TODO: IGNITE-21771 Deal with catalog compaction
private void recoverStructuresBusy() {
int earliestCatalogVersion = catalogService.earliestCatalogVersion();
int latestCatalogVersion = catalogService.latestCatalogVersion();
+ int lwnCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
- var readOnlyIndexById = new HashMap<Integer, ReadOnlyIndexInfo>();
- var previousCatalogVersionTableIds = Set.<Integer>of();
- var tableVersionById = new HashMap<Integer, Integer>();
+ var tableVersionByIndexId = new HashMap<Integer, Integer>();
+ var readOnlyIndexes = new HashSet<ReadOnlyIndexInfo>();
+
+ var stoppingActivationTsByIndexId = new HashMap<Integer, Long>();
+ var previousCatalogVersionIndexIds = Set.<Integer>of();
- // TODO: IGNITE-21514 Deal with catalog compaction
for (int catalogVersion = earliestCatalogVersion; catalogVersion <=
latestCatalogVersion; catalogVersion++) {
- long activationTs = catalogActivationTimestampBusy(catalogVersion);
int finalCatalogVersion = catalogVersion;
- for (CatalogIndexDescriptor index :
catalogService.indexes(catalogVersion)) {
- tableVersionById.computeIfAbsent(index.id(), i ->
tableVersionBusy(index, finalCatalogVersion));
+ var indexIds = new HashSet<Integer>();
+
+ catalogService.indexes(finalCatalogVersion).forEach(index -> {
+ tableVersionByIndexId.computeIfAbsent(index.id(), i ->
tableVersionBusy(index, finalCatalogVersion));
if (index.status() == STOPPING) {
- readOnlyIndexById.computeIfAbsent(index.id(), i -> new
ReadOnlyIndexInfo(index, activationTs));
+ stoppingActivationTsByIndexId.computeIfAbsent(index.id(),
i -> catalogActivationTimestampBusy(finalCatalogVersion));
}
- }
- Set<Integer> currentCatalogVersionTableIds =
tableIds(catalogVersion);
-
- // Here we look for indices that transitioned directly from
AVAILABLE to [deleted] (corresponding to the logical READ_ONLY
- // state) as such transitions only happen when a table is dropped.
- difference(previousCatalogVersionTableIds,
currentCatalogVersionTableIds).stream()
- .flatMap(droppedTableId ->
catalogService.indexes(finalCatalogVersion - 1, droppedTableId).stream())
- .filter(index -> index.status() == AVAILABLE)
- .forEach(index ->
readOnlyIndexById.computeIfAbsent(index.id(), i -> new ReadOnlyIndexInfo(index,
activationTs)));
+ indexIds.add(index.id());
+ });
- previousCatalogVersionTableIds = currentCatalogVersionTableIds;
+ // We are looking for removed indexes.
+ difference(previousCatalogVersionIndexIds, indexIds).stream()
+ .map(index -> catalogService.index(index,
finalCatalogVersion - 1))
Review Comment:
```suggestion
.map(indexId -> catalogService.index(indexId,
finalCatalogVersion - 1))
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -274,41 +311,56 @@ private long catalogActivationTimestampBusy(int
catalogVersion) {
return catalog.time();
}
+ // TODO: IGNITE-21771 Deal with catalog compaction
private void recoverStructuresBusy() {
int earliestCatalogVersion = catalogService.earliestCatalogVersion();
int latestCatalogVersion = catalogService.latestCatalogVersion();
+ int lwnCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
Review Comment:
```suggestion
int lwmCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -138,11 +156,15 @@ public List<IndexIdAndTableVersion> chooseForAddWrite(int
catalogVersion, int ta
/**
* Collect indexes for {@link PartitionAccess#addWriteCommitted(RowId,
BinaryRow, HybridTimestamp, int)} (write committed only).
*
+ * <p>NOTE: When updating a low watermark, the index storages that were
returned from the method may begin to be destroyed, such a
+ * situation should occur by the calling code.</p>
+ *
* <p>Index selection algorithm:</p>
* <ul>
* <li>If the index in the snapshot catalog version is in status
{@link CatalogIndexStatus#BUILDING},
- * {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING}.</li>
- * <li>For a read-only index, if {@code commitTs} is strictly less
than the activation time of dropping the index.</li>
+ * {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING} and not removed in latest catalog version.</li>
Review Comment:
```suggestion
* {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING} and not removed in the latest catalog version.</li>
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -101,13 +115,17 @@ public void close() {
/**
* Collect indexes for {@link PartitionAccess#addWrite(RowId, BinaryRow,
UUID, int, int, int)} (write intent).
*
+ * <p>NOTE: When updating a low watermark, the index storages that were
returned from the method may begin to be destroyed, such a
+ * situation should occur by the calling code.</p>
+ *
* <p>Index selection algorithm:</p>
* <ul>
* <li>If the index in the snapshot catalog version is in status
{@link CatalogIndexStatus#BUILDING},
- * {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING}.</li>
+ * {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING} and not removed in latest catalog version.</li>
Review Comment:
```suggestion
* {@link CatalogIndexStatus#AVAILABLE} or {@link
CatalogIndexStatus#STOPPING} and not removed in the latest catalog version.</li>
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java:
##########
@@ -238,21 +260,36 @@ private static List<Integer>
mergeWithoutDuplicates(List<Integer> l0, List<Integ
private void addListenersBusy() {
catalogService.listen(INDEX_CREATE,
fromConsumer(this::onIndexCreated));
catalogService.listen(INDEX_REMOVED,
fromConsumer(this::onIndexRemoved));
+
+ lowWatermark.addUpdateListener(this::onLwmChanged);
}
private void onIndexRemoved(RemoveIndexEventParameters parameters) {
inBusyLock(busyLock, () -> {
int indexId = parameters.indexId();
int catalogVersion = parameters.catalogVersion();
- CatalogIndexDescriptor index = indexBusy(indexId, catalogVersion -
1);
+ lowWatermark.getLowWatermarkSafe(lwm -> {
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
- if (index.status() == AVAILABLE) {
- // On drop table event.
- readOnlyIndexes.add(new ReadOnlyIndexInfo(index,
catalogActivationTimestampBusy(catalogVersion)));
- } else if (index.status() == STOPPING) {
- readOnlyIndexes.add(new ReadOnlyIndexInfo(index,
findStoppingActivationTsBusy(indexId, catalogVersion - 1)));
- }
+ if (catalogVersion <= lwmCatalogVersion) {
Review Comment:
Here, an index is no longer tracked once the moment of its **removal** is
below the LWM. This seems correct, but we can stop tracking the index earlier:
when it's **dropped**. If it's dropped together with its table, that is the same
as removal; but if it's dropped individually, then the moment of its drop is the
moment when it becomes `STOPPING`.
The same applies to the recovery logic and the LWM listener.
I'm not sure whether this needs to be fixed now. Maybe just create a ticket and
add TODOs?
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int INDEX_ID = 1;
+
@Test
void testBuildIndex() {
TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
- int indexId = 1;
-
- TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
-
- when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1),
row1.rowId().increment());
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1),
row1.rowId().increment());
verify(indexStorage).put(row0.binaryRow(), row0.rowId());
verify(indexStorage).put(row1.binaryRow(), row1.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
// Let's check one more batch - it will be the finishing one.
- clearInvocations(indexes, indexStorage);
+ clearInvocations(indexStorage);
BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
verify(indexStorage).put(row2.binaryRow(), row2.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(null);
}
+ @Test
+ void testAddToIndexesOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId,
List.of(INDEX_ID, INDEX_ID + 1)));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexesWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(
+ StorageException.class,
+ () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class),
new RowId(PARTITION_ID), List.of(INDEX_ID))
+ );
+ }
+
+ @Test
+ void testAddToIndexOnDestroyIndexes() {
Review Comment:
```suggestion
void testAddToIndexOnDestroyedIndexes() {
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int INDEX_ID = 1;
+
@Test
void testBuildIndex() {
TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
- int indexId = 1;
-
- TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
-
- when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1),
row1.rowId().increment());
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1),
row1.rowId().increment());
verify(indexStorage).put(row0.binaryRow(), row0.rowId());
verify(indexStorage).put(row1.binaryRow(), row1.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
// Let's check one more batch - it will be the finishing one.
- clearInvocations(indexes, indexStorage);
+ clearInvocations(indexStorage);
BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
verify(indexStorage).put(row2.binaryRow(), row2.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(null);
}
+ @Test
+ void testAddToIndexesOnDestroyIndexes() {
Review Comment:
```suggestion
void testAddToIndexesOnDestroyedIndexes() {
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int INDEX_ID = 1;
+
@Test
void testBuildIndex() {
TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
- int indexId = 1;
-
- TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
-
- when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1),
row1.rowId().increment());
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1),
row1.rowId().increment());
verify(indexStorage).put(row0.binaryRow(), row0.rowId());
verify(indexStorage).put(row1.binaryRow(), row1.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
// Let's check one more batch - it will be the finishing one.
- clearInvocations(indexes, indexStorage);
+ clearInvocations(indexStorage);
BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
verify(indexStorage).put(row2.binaryRow(), row2.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(null);
}
+ @Test
+ void testAddToIndexesOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId,
List.of(INDEX_ID, INDEX_ID + 1)));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexesWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(
+ StorageException.class,
+ () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class),
new RowId(PARTITION_ID), List.of(INDEX_ID))
+ );
+ }
+
+ @Test
+ void testAddToIndexOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID));
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID + 1));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID),
INDEX_ID));
+ }
+
+ @Test
+ void testBuildIndexOnDestroyIndex() {
Review Comment:
```suggestion
void testBuildIndexOnDestroyedIndex() {
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int INDEX_ID = 1;
+
@Test
void testBuildIndex() {
TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
- int indexId = 1;
-
- TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
-
- when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1),
row1.rowId().increment());
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1),
row1.rowId().increment());
verify(indexStorage).put(row0.binaryRow(), row0.rowId());
verify(indexStorage).put(row1.binaryRow(), row1.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
// Let's check one more batch - it will be the finishing one.
- clearInvocations(indexes, indexStorage);
+ clearInvocations(indexStorage);
BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
verify(indexStorage).put(row2.binaryRow(), row2.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(null);
}
+ @Test
+ void testAddToIndexesOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId,
List.of(INDEX_ID, INDEX_ID + 1)));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexesWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(
+ StorageException.class,
+ () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class),
new RowId(PARTITION_ID), List.of(INDEX_ID))
+ );
+ }
+
+ @Test
+ void testAddToIndexOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID));
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID + 1));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID),
INDEX_ID));
+ }
+
+ @Test
+ void testBuildIndexOnDestroyIndex() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+ var binaryRowAndRowId = new BinaryRowAndRowId(row, rowId);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID,
Stream.of(binaryRowAndRowId), rowId));
+ assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID + 1,
Stream.of(binaryRowAndRowId), rowId));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ verify(storage).setNextRowIdToBuild(eq(rowId));
+ }
+
+ @Test
+ void testBuildIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var rowId = new RowId(PARTITION_ID);
+ var binaryRowAndRowId = new BinaryRowAndRowId(mock(BinaryRow.class),
rowId);
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+
+ IndexStorage storage = indexStorage.storage();
+
+ doNothing().when(indexStorage).put(any(), any());
+
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+ }
+
+ @Test
+ void testGetNextRowIdToBuildIndexOnDestroyIndex() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+
doThrow(StorageDestroyedException.class).when(storage).getNextRowIdToBuild();
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+ assertNull(indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID + 1));
+
+ verify(storage).getNextRowIdToBuild();
+ }
+
+ @Test
+ void testGetNextRowIdToBuildIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+ doThrow(StorageException.class).when(storage).getNextRowIdToBuild();
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.getNextRowIdToBuildIndex(INDEX_ID));
+ }
+
+ @Test
+ void testSetNextRowIdToBuildIndexOnDestroyIndex() {
Review Comment:
```suggestion
void testSetNextRowIdToBuildIndexOnDestroyedIndex() {
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandlerTest.java:
##########
@@ -37,47 +50,246 @@
public class IndexUpdateHandlerTest extends BaseIgniteAbstractTest {
private static final int PARTITION_ID = 0;
+ private static final int INDEX_ID = 1;
+
@Test
void testBuildIndex() {
TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
- int indexId = 1;
-
- TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
-
- when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
-
- IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexes);
+ IndexUpdateHandler indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
BinaryRowAndRowId row0 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
BinaryRowAndRowId row1 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row0, row1),
row1.rowId().increment());
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row0, row1),
row1.rowId().increment());
verify(indexStorage).put(row0.binaryRow(), row0.rowId());
verify(indexStorage).put(row1.binaryRow(), row1.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(row1.rowId().increment());
// Let's check one more batch - it will be the finishing one.
- clearInvocations(indexes, indexStorage);
+ clearInvocations(indexStorage);
BinaryRowAndRowId row2 = new BinaryRowAndRowId(mock(BinaryRow.class),
new RowId(PARTITION_ID));
- indexUpdateHandler.buildIndex(indexId, Stream.of(row2), null);
+ indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(row2), null);
verify(indexStorage).put(row2.binaryRow(), row2.rowId());
verify(indexStorage.storage()).setNextRowIdToBuild(null);
}
+ @Test
+ void testAddToIndexesOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndexes(row, rowId,
List.of(INDEX_ID, INDEX_ID + 1)));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexesWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(
+ StorageException.class,
+ () -> indexUpdateHandler.addToIndexes(mock(BinaryRow.class),
new RowId(PARTITION_ID), List.of(INDEX_ID))
+ );
+ }
+
+ @Test
+ void testAddToIndexOnDestroyIndexes() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID));
+ assertDoesNotThrow(() -> indexUpdateHandler.addToIndex(row, rowId,
INDEX_ID + 1));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ }
+
+ @Test
+ void testAddToIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.addToIndex(mock(BinaryRow.class), new RowId(PARTITION_ID),
INDEX_ID));
+ }
+
+ @Test
+ void testBuildIndexOnDestroyIndex() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+ IndexStorage storage = indexStorage.storage();
+
+ doThrow(StorageDestroyedException.class).when(indexStorage).put(any(),
any());
+
doThrow(StorageDestroyedException.class).when(storage).setNextRowIdToBuild(any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var row = mock(BinaryRow.class);
+ var rowId = new RowId(PARTITION_ID);
+ var binaryRowAndRowId = new BinaryRowAndRowId(row, rowId);
+
+ assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID,
Stream.of(binaryRowAndRowId), rowId));
+ assertDoesNotThrow(() -> indexUpdateHandler.buildIndex(INDEX_ID + 1,
Stream.of(binaryRowAndRowId), rowId));
+
+ verify(indexStorage).put(eq(row), eq(rowId));
+ verify(storage).setNextRowIdToBuild(eq(rowId));
+ }
+
+ @Test
+ void testBuildIndexWithStorageException() {
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ doThrow(StorageException.class).when(indexStorage).put(any(), any());
+
+ var indexUpdateHandler = new
IndexUpdateHandler(indexStoragesSupplier(Map.of(INDEX_ID, indexStorage)));
+
+ var rowId = new RowId(PARTITION_ID);
+ var binaryRowAndRowId = new BinaryRowAndRowId(mock(BinaryRow.class),
rowId);
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+
+ IndexStorage storage = indexStorage.storage();
+
+ doNothing().when(indexStorage).put(any(), any());
+
doThrow(StorageException.class).when(storage).setNextRowIdToBuild(any());
+
+ assertThrows(StorageException.class, () ->
indexUpdateHandler.buildIndex(INDEX_ID, Stream.of(binaryRowAndRowId), rowId));
+ }
+
+ @Test
+ void testGetNextRowIdToBuildIndexOnDestroyIndex() {
Review Comment:
```suggestion
void testGetNextRowIdToBuildIndexOnDestroyedIndex() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]