rpuch commented on code in PR #2910:
URL: https://github.com/apache/ignite-3/pull/2910#discussion_r1413863468
##########
modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java:
##########
@@ -291,4 +295,19 @@ void testLast() {
assertEquals(1, last(List.of(1)));
assertEquals(2, last(List.of(1, 2)));
}
+
+ @Test
+ void testViewList() {
+ assertThat(view(List.of(), Function.identity()), empty());
+ assertThat(view(List.of(), Object::toString), empty());
+
+ assertThat(view(List.of(1, 2, 3), Function.identity()),
equalTo(List.of(1, 2, 3)));
+ assertThat(view(List.of(1, 2, 3), Integer::longValue),
equalTo(List.of(1L, 2L, 3L)));
+
+ List<Integer> view = view(List.of(1), Function.identity());
+
+ assertThrows(UnsupportedOperationException.class, () -> view.add(0));
+ assertThrows(UnsupportedOperationException.class, () -> view.set(0,
0));
+ assertThrows(UnsupportedOperationException.class, () ->
view.remove(0));
+ }
Review Comment:
It seems that it would be useful to also test that, after a view is created,
modifications of the original list are visible through the view
##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.verify;
+
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
+import
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/** Testing RW transactions and indexes. */
+public class ItRwTransactionAndIndexesTest extends
ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ private static final String INDEX_NAME = "TEST_INDEX";
+
+ private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
+ private static final String COLUMN_NAME = "SALARY";
+
+ private static final String ENGINE = "test";
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @AfterEach
+ void tearDown() {
+ sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ sql("DROP ZONE IF EXISTS " + zoneName(TABLE_NAME));
+
+ CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
+ }
+
+ @Test
+ void testCreateIndexInsideRwTransaction() {
+ TableImpl table = createTable(TABLE_NAME, 1, 1, ENGINE);
+
+ setAwaitIndexAvailability(false);
+ dropBuildAllIndex();
+
+ Transaction rwTx = beginRwTransaction();
+
+ createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+ IndexStorage[] indexStorages = indexStorages(table, PK_INDEX_NAME,
INDEX_NAME);
+ clearInvocations(indexStorages);
+
+ insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(1));
+
+ verifyPutIntoIndexes(indexStorages);
+
+ rwTx.commit();
+ }
+
+ @Test
+ void testDropIndexInsideRwTransaction() {
+ TableImpl table = createTable(TABLE_NAME, 1, 1, ENGINE);
+
+ createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+ Transaction rwTx = beginRwTransaction();
+
+ IndexStorage[] indexStorages = indexStorages(table, PK_INDEX_NAME,
INDEX_NAME);
+ clearInvocations(indexStorages);
+
+ dropIndex(INDEX_NAME);
+
+ insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(0));
+
+ verifyPutIntoIndexes(indexStorages);
+
+ rwTx.commit();
+ }
+
+ private static IgniteImpl node() {
+ return CLUSTER.node(0);
+ }
+
+ private static void dropBuildAllIndex() {
Review Comment:
```suggestion
private static void dropBuildAllIndexMessages() {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -98,8 +99,13 @@ public void handleUpdate(
boolean trackWriteIntent,
@Nullable Runnable onApplication,
@Nullable HybridTimestamp commitTs,
- @Nullable HybridTimestamp lastCommitTs
+ @Nullable HybridTimestamp lastCommitTs,
+ // TODO: IGNITE-18595 We need to know the indexes for a full
rebalance, i.e. null must go
+ @Nullable List<Integer> indexIds
Review Comment:
Would `IntList` be more suitable here?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandler.java:
##########
@@ -49,18 +51,36 @@ public IndexUpdateHandler(TableIndexStoragesSupplier
indexes) {
/**
* Adds a binary row to the indexes, if the tombstone then skips such
operation.
*
- * <p>Must be called inside a {@link
MvPartitionStorage#runConsistently(WriteClosure)} closure.
+ * <p>Must be called inside a {@link
MvPartitionStorage#runConsistently(WriteClosure)} closure.</p>
*
* @param binaryRow Binary row to insert.
* @param rowId Row ID.
+ * @param indexIds IDs of indexes that will need to be updated, {@code
null} for all indexes.
*/
- public void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
+ public void addToIndexes(
+ @Nullable BinaryRow binaryRow,
+ RowId rowId,
+ // TODO: IGNITE-18595 We need to know the indexes for a full
rebalance, i.e. null must go
+ @Nullable List<Integer> indexIds
+ ) {
+ assert indexIds == null || !indexIds.isEmpty() : indexIds;
+
if (binaryRow == null) { // skip removes
return;
}
- for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
- index.put(binaryRow, rowId);
+ Map<Integer, TableSchemaAwareIndexStorage> indexStorageByIndexId =
indexStorageByIndexId();
+
+ if (indexIds == null) {
+ indexStorageByIndexId.values().forEach(indexStorage ->
indexStorage.put(binaryRow, rowId));
Review Comment:
Will not this create a lambda instance per storage per write? This is on a
hot path.
##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.verify;
+
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
+import
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/** Testing RW transactions and indexes. */
+public class ItRwTransactionAndIndexesTest extends
ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ private static final String INDEX_NAME = "TEST_INDEX";
+
+ private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
+ private static final String COLUMN_NAME = "SALARY";
+
+ private static final String ENGINE = "test";
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @AfterEach
+ void tearDown() {
+ sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ sql("DROP ZONE IF EXISTS " + zoneName(TABLE_NAME));
+
+ CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
+ }
+
+ @Test
+ void testCreateIndexInsideRwTransaction() {
+ TableImpl table = createTable(TABLE_NAME, 1, 1, ENGINE);
+
+ setAwaitIndexAvailability(false);
+ dropBuildAllIndex();
+
+ Transaction rwTx = beginRwTransaction();
+
+ createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+ IndexStorage[] indexStorages = indexStorages(table, PK_INDEX_NAME,
INDEX_NAME);
+ clearInvocations(indexStorages);
+
+ insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(1));
+
+ verifyPutIntoIndexes(indexStorages);
+
+ rwTx.commit();
Review Comment:
Is this called to make sure that commit passes successfully? If yes, then
please wrap it with `assertDoesNotThrow()` to make the expectation explicit.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java:
##########
@@ -53,4 +54,18 @@ default Map<UUID, TimedBinaryRow> rowsToUpdate() {
return map;
}
+
+ /** Returns operation timestamp. */
+ @WithSetter
+ long operationTimestampLong();
+
+ /** Setter for the operationTimestamp field. */
+ default void operationTimestampLong(long operationTime) {
Review Comment:
Will not it work if we write a normal, non-default method here?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -274,6 +276,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private final Object commandProcessingLinearizationMutex = new Object();
+ /** Choose indexes for operations. */
Review Comment:
```suggestion
/** Chooses indexes for operations. */
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -591,4 +613,12 @@ private void markFinished(UUID txId, boolean commit,
@Nullable HybridTimestamp c
commit ? commitTimestamp : null
));
}
+
+ private List<Integer> indexIdsForRwUpdateOperation(int tableId,
HybridTimestamp opTs) {
+ return indexIdsForRwUpdateOperation(tableId,
catalogService.activeCatalogVersion(opTs.longValue()));
+ }
+
+ private List<Integer> indexIdsForRwUpdateOperation(int tableId, int
catalogVersion) {
+ return view(indexChooser.chooseForRwTxUpdateOperation(catalogVersion,
tableId), CatalogObjectDescriptor::id);
Review Comment:
This method can determine tableId by itself calling `storage.tableId()`, so
the parameter can be removed (cascading to removal of this parameter in the
previous method as well)
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java:
##########
@@ -147,7 +147,8 @@ private VacuumResult internalVacuum(HybridTimestamp
lowWatermark, Locker locker,
}
try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
- indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId,
cursor);
+ // TODO: IGNITE-21005 We need to choose only those indexes
that are not available for transactions
+ indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId,
cursor, null);
Review Comment:
Can a race between addition of an index (and writing to that index) and
removal of a row from that new index happen? Like the GC has not yet seen the
index, but someone has already written to the index, and the record the GC is
removing has already been put to the index?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -608,62 +612,70 @@ private CompletableFuture<Void>
validateSchemaMatch(ReplicaRequest request, @Nul
* {@link SchemaRegistry#schema(int)}.
*
* @param request Replica request corresponding to the operation.
- * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
+ * @param opTs Operation timestamp.
* @return Future completed when the validation is finished.
*/
- private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
- HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
- if (tsToWaitForSchema == null) {
- tsToWaitForSchema = opTsIfDirectRo;
- }
+ private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest
request, HybridTimestamp opTs) {
+ HybridTimestamp tsToWaitForSchema = getTsToWaitForSchema(request,
opTs);
return tsToWaitForSchema == null ? nullCompletedFuture() :
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchema);
}
/**
- * Returns timestamp of transaction start (for RW/timestamped RO requests)
or @{code null} for other requests.
+ * Returns the timestamp for which the schema will need to be awaited.
+ *
+ * <ul>
+ * <li>For RW - begin timestamp of transaction.</li>
+ * <li>For RO - read timestamp.</li>
+ * <li>For RO direct - operation timestamp.</li>
+ * <li>For others, {@code null}.</li>
+ * </ul>
*
* @param request Replica request corresponding to the operation.
+ * @param opTs Operation timestamp.
*/
- private static @Nullable HybridTimestamp
getTxStartTimestamp(ReplicaRequest request) {
- HybridTimestamp txStartTimestamp;
+ private static @Nullable HybridTimestamp
getTsToWaitForSchema(ReplicaRequest request, HybridTimestamp opTs) {
+ HybridTimestamp tsToWaitForSchema;
Review Comment:
The variable seems to be excessive, we can just `return` the value right
away in each branch of if/elseif
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -117,21 +126,27 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
* @param partitionDataStorage The storage.
* @param safeTime Safe time tracker.
* @param storageIndexTracker Storage index tracker.
+ * @param catalogService Catalog service.
+ * @param indexChooser Choose indexes for operations.
Review Comment:
```suggestion
* @param indexChooser Chooses indexes for operations.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]