ibessonov commented on code in PR #2566:
URL: https://github.com/apache/ignite-3/pull/2566#discussion_r1321148177


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java:
##########
@@ -342,6 +422,108 @@ private static void createTables(Ignite node) {
         createTwoColumnTable(node, ColumnType.number());
         createTwoColumnTable(node, ColumnType.blob());
         createTwoColumnTable(node, ColumnType.bitmaskOf(32));
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT8).build(),
+                
ColumnParams.builder().name("VAL").type(INT8).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(BOOLEAN).build(),
+                
ColumnParams.builder().name("VAL").type(BOOLEAN).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT16).build(),
+                
ColumnParams.builder().name("VAL").type(INT16).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT32).build(),
+                
ColumnParams.builder().name("VAL").type(INT32).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(INT64).build(),
+                
ColumnParams.builder().name("VAL").type(INT64).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(FLOAT).build(),
+                
ColumnParams.builder().name("VAL").type(FLOAT).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(DOUBLE).build(),
+                
ColumnParams.builder().name("VAL").type(DOUBLE).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                
ColumnParams.builder().name("KEY").type(org.apache.ignite.sql.ColumnType.UUID).build(),
+                
ColumnParams.builder().name("VAL").type(org.apache.ignite.sql.ColumnType.UUID).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                
ColumnParams.builder().name("KEY").type(DECIMAL).precision(19).scale(3).build(),
+                
ColumnParams.builder().name("VAL").type(DECIMAL).precision(19).scale(3).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(STRING).build(),
+                
ColumnParams.builder().name("VAL").type(STRING).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(DATE).build(),
+                
ColumnParams.builder().name("VAL").type(DATE).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                
ColumnParams.builder().name("KEY").type(DATETIME).precision(6).build(),
+                
ColumnParams.builder().name("VAL").type(DATETIME).precision(6).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                
ColumnParams.builder().name("KEY").type(TIME).precision(6).build(),
+                
ColumnParams.builder().name("VAL").type(TIME).precision(6).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                
ColumnParams.builder().name("KEY").type(TIMESTAMP).precision(6).build(),
+                
ColumnParams.builder().name("VAL").type(TIMESTAMP).precision(6).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                
ColumnParams.builder().name("KEY").type(NUMBER).precision(Integer.MAX_VALUE).build(),
+                
ColumnParams.builder().name("VAL").type(NUMBER).precision(Integer.MAX_VALUE).nullable(true).build()
+        );
+
+        createTwoColumnTableInCatalog(
+                ignite,
+                ColumnParams.builder().name("KEY").type(BYTE_ARRAY).build(),

Review Comment:
   Do we really allow blobs to be a primary key?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested 
using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int UNKNOWN_RECORD_ID = 999;
+
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+    private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure that the following sequence results in an operation failure 
and transaction rollback.
+     *
+     * <ol>
+     *     <li>A table is created</li>
+     *     <li>A transaction is started</li>
+     *     <li>The table is enlisted in the transaction</li>
+     *     <li>The table is ALTERed</li>
+     *     <li>An attempt to read or write to the table in the transaction is 
made</li>
+     * </ol>
+     */
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    void 
readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation 
operation) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("create table " + TABLE_NAME + " (id int primary 
key, val varchar)", session);
+        });
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        InternalTransaction tx = (InternalTransaction) 
node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("alter table " + TABLE_NAME + " add column added 
int", session);
+        });
+
+        IgniteException ex;
+
+        if (operation.sql()) {
+            ex = assertThrows(IgniteException.class, () -> 
operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    containsString("Table schema was updated since the 
transaction was started [table=1, startSchema=1, operationSchema=2")
+            );
+        } else {
+            ex = assertThrows(TransactionException.class, () -> 
operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    is("Table schema was updated since the transaction was 
started [table=1, startSchema=1, operationSchema=2")
+            );
+        }
+
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+
+        // TODO: IGNITE-20342 - Assert for SQL too.
+        if (!operation.sql()) {
+            assertThat(tx.state(), is(TxState.ABORTED));
+        }
+    }
+
+    private void enlistTableInTransaction(Table table, Transaction tx) {
+        executeReadOn(table, tx, cluster);
+    }
+
+    private static void executeReadOn(Table table, Transaction tx, Cluster 
cluster) {

Review Comment:
   Do we really need this method? Some "doInSession" calls are made directly in 
the test code, and they look fine. What are you trying to achieve?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested 
using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int UNKNOWN_RECORD_ID = 999;
+
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+    private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure that the following sequence results in an operation failure 
and transaction rollback.
+     *
+     * <ol>
+     *     <li>A table is created</li>
+     *     <li>A transaction is started</li>
+     *     <li>The table is enlisted in the transaction</li>
+     *     <li>The table is ALTERed</li>
+     *     <li>An attempt to read or write to the table in the transaction is 
made</li>
+     * </ol>
+     */
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    void 
readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation 
operation) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("create table " + TABLE_NAME + " (id int primary 
key, val varchar)", session);
+        });
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        InternalTransaction tx = (InternalTransaction) 
node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("alter table " + TABLE_NAME + " add column added 
int", session);
+        });
+
+        IgniteException ex;
+
+        if (operation.sql()) {
+            ex = assertThrows(IgniteException.class, () -> 
operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    containsString("Table schema was updated since the 
transaction was started [table=1, startSchema=1, operationSchema=2")

Review Comment:
   Maybe it should be "after" instead of "since". My English is not 
particularly good, but I am suspicious: something seems off in this 
sentence.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.action;
+
+
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_GET;
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_GET_ALL;
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_SCAN;
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
+import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_SCAN;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+
+class RequestTypeTest {
+    @Test
+    void isRwReadWorksAsExpected() {
+        Set<RequestType> expectedRwReads = Set.of(RW_GET, RW_GET_ALL, RW_SCAN);

Review Comment:
   A good practice is to always use `EnumSet` for sets of enum values.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1665,12 +1695,12 @@ private CompletableFuture<Object> 
processMultiEntryAction(ReadWriteMultiRowRepli
                     }
 
                     if (rowIdsToDelete.isEmpty()) {
-                        return completedFuture(result);
+                        return completedFuture(didNoWrites(result));
                     }
 
-                    return updateAllCommand(request, rowIdsToDelete, 
txCoordinatorId)
+                    return chooseOpTsValidateAndBuildUpdateAllCommand(request, 
rowIdsToDelete, txCoordinatorId)

Review Comment:
   All of this gets me thinking - what if we "choose" the operation timestamp 
outside of this method and pass it as a parameter? Would that make sense?
   The pattern with "failIfSchemaChangedSinceTxStart" is repeated multiple times.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1469,22 +1478,41 @@ private <T> CompletableFuture<T> appendTxCommand(UUID 
txId, RequestType cmdType,
         }
 
         if (!fut.isDone()) {
-            op.get().whenComplete((v, th) -> {
-                if (full) { // Fast unlock.
-                    releaseTxLocks(txId);
-                }
+            validateAtTsIfRwRead(txId, cmdType)
+                    .thenCompose(unused -> op.get())
+                    // Write actions do ts-dependent validations after taking 
locks. If it turned out that nothing has to be written,
+                    // then no lock is taken and, hence, no ts-dependent 
validations are invoked. But, as such validations
+                    // must be invoked for any write, we do it here.
+                    .thenCompose(actionResult -> 
validateAtTsIfWriteDidNotDoTsValidation(actionResult, txId, cmdType))

Review Comment:
   The comment is a little bit confusing as well. I have a hard time 
understanding why the validation is split into different parts in the code, 
depending on the operation. Is this some kind of optimization?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested 
using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int UNKNOWN_RECORD_ID = 999;
+
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+    private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure that the following sequence results in an operation failure 
and transaction rollback.
+     *
+     * <ol>
+     *     <li>A table is created</li>
+     *     <li>A transaction is started</li>
+     *     <li>The table is enlisted in the transaction</li>
+     *     <li>The table is ALTERed</li>
+     *     <li>An attempt to read or write to the table in the transaction is 
made</li>
+     * </ol>
+     */
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    void 
readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation 
operation) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("create table " + TABLE_NAME + " (id int primary 
key, val varchar)", session);

Review Comment:
   Most of our tests use upper case for SQL statements. Why did you decide to 
use lower case here?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java:
##########
@@ -111,37 +103,27 @@ public void testSessionExpiration() throws Exception {
     @Test
     public void checkTimestampOperations() {
         String kvTblName = "tbl_all_columns_sql";
-        String schemaName = "PUBLIC";
-        String keyCol = "key";
-        int maxTimePrecision = TemporalColumnType.MAX_TIME_PRECISION;
+        String keyCol = "KEY";
 
         Ignite node = CLUSTER_NODES.get(0);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19162 Trim all 
less than millisecond information from timestamp
         //String tsStr = "2023-03-29T08:22:33.005007Z";
-        String tsStr = "2023-03-29T08:22:33.005Z";
+        // TODO: IGNITE-20105 it should be "2023-03-29T08:22:33.005Z";

Review Comment:
   This issue is already resolved. Can you please explain what's going on?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -1693,8 +1743,107 @@ private static Stream<Arguments> 
multiRowsWriteRequestTypes() {
                 .map(Arguments::of);
     }
 
-    private static UUID beginTx() {
-        return TestTransactionIds.newTransactionId();
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowReadOrWriteRequestTypesFactory")
+    void 
singleRowRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+            RequestType requestType, boolean onExistingRow, boolean full
+    ) {
+        
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
 (targetTxId, key) -> {

Review Comment:
   So, the actual test is `singleRow....`, but the helper method for it is 
`testRwReads...`. I think that the naming is inconsistent.
   It also has too many words. Why would you say "rw reads and writes" 
instead of "rw operations"? Why such a mouthful?
   "TableSchemaVersionIncreased" - in other words, the table is altered. This 
part can also be shorter.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1469,22 +1478,41 @@ private <T> CompletableFuture<T> appendTxCommand(UUID 
txId, RequestType cmdType,
         }
 
         if (!fut.isDone()) {
-            op.get().whenComplete((v, th) -> {
-                if (full) { // Fast unlock.
-                    releaseTxLocks(txId);
-                }
+            validateAtTsIfRwRead(txId, cmdType)

Review Comment:
   Honestly, I find these names difficult to read. Is there anything simpler? 
Like what exactly are we validating, and why does it matter that it happens 
only for RW reads? Maybe this information is unnecessary in the name.
   A simple `validateSchemaCompatibility` would be fine, I think.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested 
using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int UNKNOWN_RECORD_ID = 999;
+
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+    private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure that the following sequence results in an operation failure 
and transaction rollback.
+     *
+     * <ol>
+     *     <li>A table is created</li>
+     *     <li>A transaction is started</li>
+     *     <li>The table is enlisted in the transaction</li>
+     *     <li>The table is ALTERed</li>
+     *     <li>An attempt to read or write to the table in the transaction is 
made</li>
+     * </ol>
+     */
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    void 
readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation 
operation) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("create table " + TABLE_NAME + " (id int primary 
key, val varchar)", session);
+        });
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        InternalTransaction tx = (InternalTransaction) 
node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("alter table " + TABLE_NAME + " add column added 
int", session);
+        });
+
+        IgniteException ex;
+
+        if (operation.sql()) {
+            ex = assertThrows(IgniteException.class, () -> 
operation.execute(table, tx, cluster));

Review Comment:
   I think we should assert a specific type here as well. If it's not ready, 
please provide a JIRA ticket under which we'll fix it.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1928,10 +1960,12 @@ private CompletableFuture<Object> 
applyUpdateAllCommand(UpdateAllCommand cmd) {
      * Precesses single request.
      *
      * @param request Single request operation.
-     * @param txCoordinatorId Transaction coordinator id.
+     * @param txCoordId Transaction coordinator id.
      * @return Listener response.
      */
-    private CompletableFuture<Object> 
processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, String 
txCoordinatorId) {
+    private CompletableFuture<ActionResult<Object>> processSingleEntryAction(
+            ReadWriteSingleRowReplicaRequest request, String txCoordId

Review Comment:
   Please put each parameter on its own line.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested 
using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int UNKNOWN_RECORD_ID = 999;
+
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+    private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure that the following sequence results in an operation failure 
and transaction rollback.
+     *
+     * <ol>
+     *     <li>A table is created</li>
+     *     <li>A transaction is started</li>
+     *     <li>The table is enlisted in the transaction</li>
+     *     <li>The table is ALTERed</li>
+     *     <li>An attempt to read or write to the table in the transaction is 
made</li>
+     * </ol>
+     */
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    void 
readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation 
operation) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("create table " + TABLE_NAME + " (id int primary 
key, val varchar)", session);
+        });
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        InternalTransaction tx = (InternalTransaction) 
node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        cluster.doInSession(0, session -> {
+            executeUpdate("alter table " + TABLE_NAME + " add column added 
int", session);
+        });
+
+        IgniteException ex;
+
+        if (operation.sql()) {
+            ex = assertThrows(IgniteException.class, () -> 
operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    containsString("Table schema was updated since the 
transaction was started [table=1, startSchema=1, operationSchema=2")
+            );
+        } else {
+            ex = assertThrows(TransactionException.class, () -> 
operation.execute(table, tx, cluster));

Review Comment:
   Is there an even more specific exception type for this case? We should be as 
specific as possible, according to our error handling design.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1469,22 +1478,41 @@ private <T> CompletableFuture<T> appendTxCommand(UUID 
txId, RequestType cmdType,
         }
 
         if (!fut.isDone()) {
-            op.get().whenComplete((v, th) -> {
-                if (full) { // Fast unlock.
-                    releaseTxLocks(txId);
-                }
+            validateAtTsIfRwRead(txId, cmdType)
+                    .thenCompose(unused -> op.get())
+                    // Write actions do ts-dependent validations after taking 
locks. If it turned out that nothing has to be written,
+                    // then no lock is taken and, hence, no ts-dependent 
validations are invoked. But, as such validations
+                    // must be invoked for any write, we do it here.
+                    .thenCompose(actionResult -> 
validateAtTsIfWriteDidNotDoTsValidation(actionResult, txId, cmdType))
+                    .whenComplete((v, th) -> {
+                        if (full) { // Fast unlock.
+                            releaseTxLocks(txId);
+                        }
 
-                if (th != null) {
-                    fut.completeExceptionally(th);
-                } else {
-                    fut.complete(v);
-                }
-            });
+                        if (th != null) {
+                            fut.completeExceptionally(th);
+                        } else {
+                            fut.complete(v);
+                        }
+                    });
         }
 
         return fut;
     }
 
+    private CompletableFuture<Void> validateAtTsIfRwRead(UUID txId, 
RequestType cmdType) {
+        return cmdType.isRwRead() ? chooseOpTsAndValidateAtIt(txId) : 
completedFuture(null);

Review Comment:
   Again, this feels more like `validateAtCurrentTimestamp` or something shorter. 
We don't have to declare that the method chooses a timestamp; that's overkill in 
my opinion.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1469,22 +1478,41 @@ private <T> CompletableFuture<T> appendTxCommand(UUID 
txId, RequestType cmdType,
         }
 
         if (!fut.isDone()) {
-            op.get().whenComplete((v, th) -> {
-                if (full) { // Fast unlock.
-                    releaseTxLocks(txId);
-                }
+            validateAtTsIfRwRead(txId, cmdType)
+                    .thenCompose(unused -> op.get())
+                    // Write actions do ts-dependent validations after taking 
locks. If it turned out that nothing has to be written,
+                    // then no lock is taken and, hence, no ts-dependent 
validations are invoked. But, as such validations
+                    // must be invoked for any write, we do it here.
+                    .thenCompose(actionResult -> 
validateAtTsIfWriteDidNotDoTsValidation(actionResult, txId, cmdType))

Review Comment:
   The same goes for this name. It's very difficult to read, especially 
considering the number of short/shortened words.
   1. Can the comment be moved to the javadoc of the method itself?
   2. Can we rename it to something simpler?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -1693,8 +1743,107 @@ private static Stream<Arguments> 
multiRowsWriteRequestTypes() {
                 .map(Arguments::of);
     }
 
-    private static UUID beginTx() {
-        return TestTransactionIds.newTransactionId();
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowReadOrWriteRequestTypesFactory")
+    void 
singleRowRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+            RequestType requestType, boolean onExistingRow, boolean full
+    ) {
+        
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
 (targetTxId, key) -> {
+            return doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType);
+        });
+    }
+
+    @SuppressWarnings("unused")
+    private static ArgumentSets singleRowReadOrWriteRequestTypesFactory() {
+        return 
ArgumentSets.argumentsForFirstParameter(singleRowReadOrWriteRequestTypes())
+                .argumentsForNextParameter(false, true)
+                .argumentsForNextParameter(false, true);
+    }
+
+    private static Stream<RequestType> singleRowReadOrWriteRequestTypes() {
+        return Arrays.stream(RequestType.values())
+                .filter(type -> RequestTypes.isSingleRowRwRead(type) || 
RequestTypes.isSingleRowWrite(type));
+    }
+
+    private void 
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+            boolean onExistingRow, ListenerInvocation listenerInvocation
+    ) {
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key);
+        }
+
+        UUID txId = beginTx();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        CatalogTableDescriptor tableVersion1 = 
mock(CatalogTableDescriptor.class);
+        CatalogTableDescriptor tableVersion2 = 
mock(CatalogTableDescriptor.class);
+        when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+        when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+        when(catalogTables.table(tblId, 
txBeginTs.longValue())).thenReturn(tableVersion1);
+        when(catalogTables.table(eq(tblId), 
gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+        TransactionException ex = assertWillThrowFast(future, 
TransactionException.class);
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+        assertThat(
+                ex.getMessage(),
+                is("Table schema was updated since the transaction was started 
[table=1, startSchema=1, operationSchema=2")

Review Comment:
   Why did you skip the closing bracket? Is it there in the actual message?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -1693,8 +1743,107 @@ private static Stream<Arguments> 
multiRowsWriteRequestTypes() {
                 .map(Arguments::of);
     }
 
-    private static UUID beginTx() {
-        return TestTransactionIds.newTransactionId();
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowReadOrWriteRequestTypesFactory")
+    void 
singleRowRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+            RequestType requestType, boolean onExistingRow, boolean full
+    ) {
+        
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
 (targetTxId, key) -> {
+            return doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType);
+        });
+    }
+
+    @SuppressWarnings("unused")
+    private static ArgumentSets singleRowReadOrWriteRequestTypesFactory() {
+        return 
ArgumentSets.argumentsForFirstParameter(singleRowReadOrWriteRequestTypes())
+                .argumentsForNextParameter(false, true)
+                .argumentsForNextParameter(false, true);
+    }
+
+    private static Stream<RequestType> singleRowReadOrWriteRequestTypes() {
+        return Arrays.stream(RequestType.values())
+                .filter(type -> RequestTypes.isSingleRowRwRead(type) || 
RequestTypes.isSingleRowWrite(type));
+    }
+
+    private void 
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+            boolean onExistingRow, ListenerInvocation listenerInvocation
+    ) {
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key);
+        }
+
+        UUID txId = beginTx();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        CatalogTableDescriptor tableVersion1 = 
mock(CatalogTableDescriptor.class);
+        CatalogTableDescriptor tableVersion2 = 
mock(CatalogTableDescriptor.class);
+        when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+        when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+        when(catalogTables.table(tblId, 
txBeginTs.longValue())).thenReturn(tableVersion1);
+        when(catalogTables.table(eq(tblId), 
gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+        TransactionException ex = assertWillThrowFast(future, 
TransactionException.class);
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+        assertThat(
+                ex.getMessage(),
+                is("Table schema was updated since the transaction was started 
[table=1, startSchema=1, operationSchema=2")
+        );
+    }
+
+    @CartesianTest
+    @CartesianTest.MethodFactory("multiRowsReadOrWriteRequestTypesFactory")
+    void multiRowRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+            RequestType requestType, boolean onExistingRow, boolean full
+    ) {
+        
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
 (targetTxId, key) -> {
+            return doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+        });
+    }
+
+    @SuppressWarnings("unused")
+    private static ArgumentSets multiRowsReadOrWriteRequestTypesFactory() {
+        return 
ArgumentSets.argumentsForFirstParameter(multiRowsReadOrWriteRequestTypes())
+                .argumentsForNextParameter(false, true)
+                .argumentsForNextParameter(false, true);
+    }
+
+    private static Stream<RequestType> multiRowsReadOrWriteRequestTypes() {
+        return Arrays.stream(RequestType.values())
+                .filter(type -> RequestTypes.isMultipleRowsRwRead(type) || 
RequestTypes.isMultipleRowsWrite(type));
+    }
+
+    @CartesianTest
+    void replaceRequestFailsIfTableSchemaVersionIncreasedSinceTxStart(
+            @Values(booleans = {false, true}) boolean onExistingRow,
+            @Values(booleans = {false, true}) boolean full
+    ) {
+        
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
 (targetTxId, key) -> {
+            return doReplaceRequest(
+                    targetTxId,
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    full
+            );
+        });
+    }
+
+    @CartesianTest
+    void rwScanRequestFailsIfTableSchemaVersionIncreasedSinceTxStart(
+            @Values(booleans = {false, true}) boolean onExistingRow,
+            @Values(booleans = {false, true}) boolean full
+    ) {
+        
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
 (targetTxId, key) -> {
+            return doRwFullScanRetrieveBatchRequest(targetTxId, full);
+        });
+    }
+
+    private UUID beginTx() {
+        return transactionIdFor(clock.now());

Review Comment:
   So it's not a "begin"; it just generates a new TX ID. It confused me for a 
little bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to