rpuch commented on code in PR #2910:
URL: https://github.com/apache/ignite-3/pull/2910#discussion_r1413863468


##########
modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java:
##########
@@ -291,4 +295,19 @@ void testLast() {
         assertEquals(1, last(List.of(1)));
         assertEquals(2, last(List.of(1, 2)));
     }
+
+    @Test
+    void testViewList() {
+        assertThat(view(List.of(), Function.identity()), empty());
+        assertThat(view(List.of(), Object::toString), empty());
+
+        assertThat(view(List.of(1, 2, 3), Function.identity()), 
equalTo(List.of(1, 2, 3)));
+        assertThat(view(List.of(1, 2, 3), Integer::longValue), 
equalTo(List.of(1L, 2L, 3L)));
+
+        List<Integer> view = view(List.of(1), Function.identity());
+
+        assertThrows(UnsupportedOperationException.class, () -> view.add(0));
+        assertThrows(UnsupportedOperationException.class, () -> view.set(0, 
0));
+        assertThrows(UnsupportedOperationException.class, () -> 
view.remove(0));
+    }

Review Comment:
   It seems that it would be useful to also test that, after a view is created, 
modifications of the original list are visible through the view



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.verify;
+
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/** Testing RW transactions and indexes. */
+public class ItRwTransactionAndIndexesTest extends 
ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private static final String INDEX_NAME = "TEST_INDEX";
+
+    private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
+    private static final String COLUMN_NAME = "SALARY";
+
+    private static final String ENGINE = "test";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @AfterEach
+    void tearDown() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + zoneName(TABLE_NAME));
+
+        CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
+    }
+
+    @Test
+    void testCreateIndexInsideRwTransaction() {
+        TableImpl table = createTable(TABLE_NAME, 1, 1, ENGINE);
+
+        setAwaitIndexAvailability(false);
+        dropBuildAllIndex();
+
+        Transaction rwTx = beginRwTransaction();
+
+        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        IndexStorage[] indexStorages = indexStorages(table, PK_INDEX_NAME, 
INDEX_NAME);
+        clearInvocations(indexStorages);
+
+        insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(1));
+
+        verifyPutIntoIndexes(indexStorages);
+
+        rwTx.commit();
+    }
+
+    @Test
+    void testDropIndexInsideRwTransaction() {
+        TableImpl table = createTable(TABLE_NAME, 1, 1, ENGINE);
+
+        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        Transaction rwTx = beginRwTransaction();
+
+        IndexStorage[] indexStorages = indexStorages(table, PK_INDEX_NAME, 
INDEX_NAME);
+        clearInvocations(indexStorages);
+
+        dropIndex(INDEX_NAME);
+
+        insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(0));
+
+        verifyPutIntoIndexes(indexStorages);
+
+        rwTx.commit();
+    }
+
+    private static IgniteImpl node() {
+        return CLUSTER.node(0);
+    }
+
+    private static void dropBuildAllIndex() {

Review Comment:
   ```suggestion
       private static void dropBuildAllIndexMessages() {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -98,8 +99,13 @@ public void handleUpdate(
             boolean trackWriteIntent,
             @Nullable Runnable onApplication,
             @Nullable HybridTimestamp commitTs,
-            @Nullable HybridTimestamp lastCommitTs
+            @Nullable HybridTimestamp lastCommitTs,
+            // TODO: IGNITE-18595 We need to know the indexes for a full 
rebalance, i.e. null must go
+            @Nullable List<Integer> indexIds

Review Comment:
   Would `IntList` be more suitable here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexUpdateHandler.java:
##########
@@ -49,18 +51,36 @@ public IndexUpdateHandler(TableIndexStoragesSupplier 
indexes) {
     /**
      * Adds a binary row to the indexes, if the tombstone then skips such 
operation.
      *
-     * <p>Must be called inside a {@link 
MvPartitionStorage#runConsistently(WriteClosure)} closure.
+     * <p>Must be called inside a {@link 
MvPartitionStorage#runConsistently(WriteClosure)} closure.</p>
      *
      * @param binaryRow Binary row to insert.
      * @param rowId Row ID.
+     * @param indexIds IDs of indexes that will need to be updated, {@code 
null} for all indexes.
      */
-    public void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
+    public void addToIndexes(
+            @Nullable BinaryRow binaryRow,
+            RowId rowId,
+            // TODO: IGNITE-18595 We need to know the indexes for a full 
rebalance, i.e. null must go
+            @Nullable List<Integer> indexIds
+    ) {
+        assert indexIds == null || !indexIds.isEmpty() : indexIds;
+
         if (binaryRow == null) { // skip removes
             return;
         }
 
-        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
-            index.put(binaryRow, rowId);
+        Map<Integer, TableSchemaAwareIndexStorage> indexStorageByIndexId = 
indexStorageByIndexId();
+
+        if (indexIds == null) {
+            indexStorageByIndexId.values().forEach(indexStorage -> 
indexStorage.put(binaryRow, rowId));

Review Comment:
   Will not this create a lambda instance per storage per write? This is on a 
hot path.



##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItRwTransactionAndIndexesTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.verify;
+
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/** Testing RW transactions and indexes. */
+public class ItRwTransactionAndIndexesTest extends 
ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private static final String INDEX_NAME = "TEST_INDEX";
+
+    private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
+    private static final String COLUMN_NAME = "SALARY";
+
+    private static final String ENGINE = "test";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @AfterEach
+    void tearDown() {
+        sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + zoneName(TABLE_NAME));
+
+        CLUSTER.runningNodes().forEach(IgniteImpl::stopDroppingMessages);
+    }
+
+    @Test
+    void testCreateIndexInsideRwTransaction() {
+        TableImpl table = createTable(TABLE_NAME, 1, 1, ENGINE);
+
+        setAwaitIndexAvailability(false);
+        dropBuildAllIndex();
+
+        Transaction rwTx = beginRwTransaction();
+
+        createIndex(TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        IndexStorage[] indexStorages = indexStorages(table, PK_INDEX_NAME, 
INDEX_NAME);
+        clearInvocations(indexStorages);
+
+        insertPeopleInTransaction(rwTx, TABLE_NAME, newPerson(1));
+
+        verifyPutIntoIndexes(indexStorages);
+
+        rwTx.commit();

Review Comment:
   Is this called to make sure that commit passes successfully? If yes, then 
please wrap it with `assertDoesNotThrow()` to make the expectation explicit.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java:
##########
@@ -53,4 +54,18 @@ default Map<UUID, TimedBinaryRow> rowsToUpdate() {
 
         return map;
     }
+
+    /** Returns operation timestamp. */
+    @WithSetter
+    long operationTimestampLong();
+
+    /** Setter for the operationTimestamp field. */
+    default void operationTimestampLong(long operationTime) {

Review Comment:
   Will not it work if we write a normal, non-default method here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -274,6 +276,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private final Object commandProcessingLinearizationMutex = new Object();
 
+    /** Choose indexes for operations. */

Review Comment:
   ```suggestion
       /** Chooses indexes for operations. */
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -591,4 +613,12 @@ private void markFinished(UUID txId, boolean commit, 
@Nullable HybridTimestamp c
                 commit ? commitTimestamp : null
         ));
     }
+
+    private List<Integer> indexIdsForRwUpdateOperation(int tableId, 
HybridTimestamp opTs) {
+        return indexIdsForRwUpdateOperation(tableId, 
catalogService.activeCatalogVersion(opTs.longValue()));
+    }
+
+    private List<Integer> indexIdsForRwUpdateOperation(int tableId, int 
catalogVersion) {
+        return view(indexChooser.chooseForRwTxUpdateOperation(catalogVersion, 
tableId), CatalogObjectDescriptor::id);

Review Comment:
   This method can determine tableId by itself calling `storage.tableId()`, so 
the parameter can be removed (cascading to removal of this parameter in the 
previous method as well)



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java:
##########
@@ -147,7 +147,8 @@ private VacuumResult internalVacuum(HybridTimestamp 
lowWatermark, Locker locker,
             }
 
             try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
-                indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId, 
cursor);
+                // TODO: IGNITE-21005 We need to choose only those indexes 
that are not available for transactions
+                indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId, 
cursor, null);

Review Comment:
   Can a race between addition of an index (and writing to that index) and 
removal of a row from that new index happen? Like the GC has not yet seen the 
index, but someone has already written to the index, and the record the GC is 
removing has already been put to the index?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -608,62 +612,70 @@ private CompletableFuture<Void> 
validateSchemaMatch(ReplicaRequest request, @Nul
      * {@link SchemaRegistry#schema(int)}.
      *
      * @param request Replica request corresponding to the operation.
-     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
+     * @param opTs Operation timestamp.
      * @return Future completed when the validation is finished.
      */
-    private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
-        HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
-        if (tsToWaitForSchema == null) {
-            tsToWaitForSchema = opTsIfDirectRo;
-        }
+    private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest 
request, HybridTimestamp opTs) {
+        HybridTimestamp tsToWaitForSchema = getTsToWaitForSchema(request, 
opTs);
 
         return tsToWaitForSchema == null ? nullCompletedFuture() : 
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchema);
     }
 
     /**
-     * Returns timestamp of transaction start (for RW/timestamped RO requests) 
or @{code null} for other requests.
+     * Returns the timestamp for which the schema will need to be awaited.
+     *
+     * <ul>
+     *     <li>For RW - begin timestamp of transaction.</li>
+     *     <li>For RO - read timestamp.</li>
+     *     <li>For RO direct - operation timestamp.</li>
+     *     <li>For others, {@code null}.</li>
+     * </ul>
      *
      * @param request Replica request corresponding to the operation.
+     * @param opTs Operation timestamp.
      */
-    private static @Nullable HybridTimestamp 
getTxStartTimestamp(ReplicaRequest request) {
-        HybridTimestamp txStartTimestamp;
+    private static @Nullable HybridTimestamp 
getTsToWaitForSchema(ReplicaRequest request, HybridTimestamp opTs) {
+        HybridTimestamp tsToWaitForSchema;

Review Comment:
   The variable seems to be excessive, we can just `return` the value right 
away in each branch of if/elseif



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -117,21 +126,27 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
      * @param partitionDataStorage The storage.
      * @param safeTime Safe time tracker.
      * @param storageIndexTracker Storage index tracker.
+     * @param catalogService Catalog service.
+     * @param indexChooser Choose indexes for operations.

Review Comment:
   ```suggestion
        * @param indexChooser Chooses indexes for operations.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to