ibessonov commented on code in PR #2872:
URL: https://github.com/apache/ignite-3/pull/2872#discussion_r1404279517


##########
modules/api/src/main/java/org/apache/ignite/sql/ColumnType.java:
##########
@@ -127,4 +127,45 @@ public boolean scaleAllowed() {
     public boolean lengthAllowed() {
         return lengthAllowed;
     }
+
+    /**
+     * Whether this type can be converted to STRING automatically without a 
loss of precision.
+     */
+    public boolean convertsToStringLosslessly() {

Review Comment:
   I propose removing this functionality, because it's not supported in the user API. Let's leave only the code that we really need right now, not in the future.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about forward compatibility of table schemas as defined by IEP-110.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-110%3A+Schema+synchronization%3A+basic+schema+changes">IEP-110</a>
+ */
+class ItSchemaForwardCompatibilityTest extends ClusterPerTestIntegrationTest {
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure forward-compatible schema changes happening between transaction operations and
+     * commit do not prevent a commit from happening.
+     */
+    @ParameterizedTest
+    @EnumSource(ForwardCompatibleDdl.class)
+    void forwardCompatibleSchemaChangesAllowCommitting(ForwardCompatibleDdl 
ddl) {
+        createTable();
+
+        Transaction tx = node.transactions().begin();
+
+        writeIn(tx);
+
+        ddl.executeOn(cluster);
+
+        assertDoesNotThrow(tx::commit);
+    }
+
+    @SuppressWarnings("resource")
+    private void writeIn(Transaction tx) {
+        putInTx(cluster.node(0).tables().table(TABLE_NAME), tx);
+    }
+
+    /**
+     * Makes sure forward-incompatible schema changes happening between transaction operations and
+     * commit prevent a commit from happening: instead, the transaction is aborted.
+     */
+    @ParameterizedTest
+    @EnumSource(ForwardIncompatibleDdl.class)
+    void 
forwardIncompatibleSchemaChangesDoNotAllowCommitting(ForwardIncompatibleDdl 
ddl) {
+        createTable();
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        InternalTransaction tx = (InternalTransaction) 
node.transactions().begin();
+
+        writeIn(tx);
+
+        ddl.executeOn(cluster);
+
+        int tableId = ((TableViewInternal) table).tableId();
+
+        TransactionException ex = assertThrows(TransactionException.class, 
tx::commit);
+        assertThat(
+                ex.getMessage(),
+                containsString(String.format(
+                        "Commit failed because schema 1 is not 
forward-compatible with 2 for table %d",

Review Comment:
   Oof, I hope the user will never see this message. It's completely useless; we need some TODOs to fix it, I guess?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityTest.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about forward compatibility of table schemas as defined by IEP-110.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-110%3A+Schema+synchronization%3A+basic+schema+changes">IEP-110</a>
+ */
+class ItSchemaForwardCompatibilityTest extends ClusterPerTestIntegrationTest {
+    private static final int NODES_TO_START = 1;
+
+    private static final String TABLE_NAME = "test";
+
+    private IgniteImpl node;
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @BeforeEach
+    void assignNode() {
+        node = cluster.node(0);
+    }
+
+    /**
+     * Makes sure forward-compatible schema changes happening between transaction operations and
+     * commit do not prevent a commit from happening.
+     */
+    @ParameterizedTest
+    @EnumSource(ForwardCompatibleDdl.class)
+    void forwardCompatibleSchemaChangesAllowCommitting(ForwardCompatibleDdl 
ddl) {
+        createTable();
+
+        Transaction tx = node.transactions().begin();
+
+        writeIn(tx);
+
+        ddl.executeOn(cluster);

Review Comment:
   What would happen if I reordered these two lines?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java:
##########
@@ -259,4 +286,127 @@ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp 
txTs, int requestSchemaV
             throw new InternalSchemaVersionMismatchException();
         }
     }
+
+    private enum ValidatorVerdict {
+        COMPATIBLE, INCOMPATIBLE, DONT_CARE

Review Comment:
   Why do we use `DONT_CARE` instead of returning `COMPATIBLE`? The semantics are not clear at all.
   According to the loop in `isForwardCompatible`, `DONT_CARE` from all 
validators is equivalent to `INCOMPATIBLE`. On the other hand, according to 
validators, `DONT_CARE` is equivalent to `NO_OP`, and a bunch of `DONT_CARE` 
must be the same as `COMPATIBLE` then (except for some comments in a single 
place).
   I'm confused, and there's no documentation to un-confuse me.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java:
##########
@@ -47,6 +49,17 @@ class SchemaCompatibilityValidator {
     // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
     private final ConcurrentMap<TableDefinitionDiffKey, TableDefinitionDiff> 
diffCache = new ConcurrentHashMap<>();
 
+    private final List<ForwardCompatibilityValidator> 
forwardCompatibilityValidators = List.of(

Review Comment:
   I assume that this list is immutable and should be static



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TypeConvertibility.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Utility to determine type convertibility.
+ */
+public class TypeConvertibility {
+    /**
+     * Returns whether type change from old to new column version is lossless 
(that is, no information
+     * is lost during conversion).
+     *
+     * @param oldColumn Old column descriptor.
+     * @param newColumn New column descriptor.
+     */
+    // TODO: IGNITE-20956 - Unify this with Catalog rules.
+    public static boolean typeChangeIsLossless(CatalogTableColumnDescriptor 
oldColumn, CatalogTableColumnDescriptor newColumn) {

Review Comment:
   I believe you should use the same set of rules as in 
`AlterTableAlterColumnCommand#validateColumnChange`. Ideally, both these places 
should use the same method for checks.
   I see a TODO, but it seems weird to specifically add different code and 
assume that we will delete it in the future. The only reason for this that I 
can think of is a conversion to strings or something, but even then you could 
have avoided copy-pasting the same checks



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TypeConvertibility.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Utility to determine type convertibility.
+ */
+public class TypeConvertibility {
+    /**
+     * Returns whether type change from old to new column version is lossless 
(that is, no information
+     * is lost during conversion).
+     *
+     * @param oldColumn Old column descriptor.
+     * @param newColumn New column descriptor.
+     */
+    // TODO: IGNITE-20956 - Unify this with Catalog rules.
+    public static boolean typeChangeIsLossless(CatalogTableColumnDescriptor 
oldColumn, CatalogTableColumnDescriptor newColumn) {
+        ColumnType oldType = oldColumn.type();
+        ColumnType newType = newColumn.type();
+
+        // Int to Int.
+        if (oldType.integral() && newType.integral()) {
+            return oldType.ordinal() <= newType.ordinal();
+        }
+
+        // Int to floating point.
+        if ((oldType == INT8 || oldType == INT16) && newType.floatingPoint()) {

Review Comment:
   This should not be supported either



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TypeConvertibility.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Utility to determine type convertibility.
+ */
+public class TypeConvertibility {
+    /**
+     * Returns whether type change from old to new column version is lossless 
(that is, no information
+     * is lost during conversion).
+     *
+     * @param oldColumn Old column descriptor.
+     * @param newColumn New column descriptor.
+     */
+    // TODO: IGNITE-20956 - Unify this with Catalog rules.
+    public static boolean typeChangeIsLossless(CatalogTableColumnDescriptor 
oldColumn, CatalogTableColumnDescriptor newColumn) {
+        ColumnType oldType = oldColumn.type();
+        ColumnType newType = newColumn.type();
+
+        // Int to Int.
+        if (oldType.integral() && newType.integral()) {
+            return oldType.ordinal() <= newType.ordinal();
+        }
+
+        // Int to floating point.
+        if ((oldType == INT8 || oldType == INT16) && newType.floatingPoint()) {
+            return true;
+        }
+
+        if (oldType == FLOAT && newType == DOUBLE) {
+            return true;
+        }
+
+        // Time/date to Datetime.
+        if ((oldType == TIME || oldType == DATE) && newType == DATETIME) {
+            return true;
+        }
+
+        // Int <-> NUMBER/DECIMAL.
+        if (oldType.integral() && (newType == NUMBER || newType == DECIMAL) && 
integralMaxDigits(oldType) <= maxExactIntPlaces(newColumn)) {
+            return true;
+        }
+        if ((oldType == NUMBER || oldType == DECIMAL) && newType.integral() && 
oldColumn.scale() <= 0
+                && oldColumn.precision() - oldColumn.scale() < 
integralMaxDigits(newType)) {
+            return true;
+        }
+
+        // NUMBER <-> DECIMAL.
+        if ((oldType == NUMBER && newType == DECIMAL || oldType == DECIMAL && 
newType == NUMBER)

Review Comment:
   Is this supported by altering columns? I don't think so. Please remove all options that are currently not supported by the design, i.e., all binary-incompatible conversions



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java:
##########
@@ -203,9 +229,10 @@ private CompatValidationResult 
validateBackwardSchemaCompatibility(
         return CompatValidationResult.success();
     }
 
+    @SuppressWarnings("unused")

Review Comment:
   If it's unused, why do we keep it?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidatorTest.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.DURATION;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.PERIOD;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaCompatibilityValidatorTest extends BaseIgniteAbstractTest {
+    @Mock
+    private ValidationSchemasSource schemasSource;
+
+    @Mock
+    private CatalogService catalogService;
+
+    @Mock
+    private SchemaSyncService schemaSyncService;
+
+    @InjectMocks
+    private SchemaCompatibilityValidator validator;
+
+    private final HybridTimestamp beginTimestamp = new HybridTimestamp(1, 1);
+
+    private final HybridTimestamp commitTimestamp = new HybridTimestamp(2, 2);
+
+    private final UUID txId = TransactionIds.transactionId(beginTimestamp, 0);
+
+    private static final int TABLE_ID = 1;
+    private static final String TABLE_NAME = "test";
+    private static final String ANOTHER_NAME = "another";
+
+    private final TablePartitionId tablePartitionId = new 
TablePartitionId(TABLE_ID, 0);
+
+    @BeforeEach
+    void configureMocks() {
+        
when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(completedFuture(null));
+        lenient().when(catalogService.table(TABLE_ID, 
commitTimestamp.longValue())).thenReturn(mock(CatalogTableDescriptor.class));
+    }
+
+    @ParameterizedTest
+    @EnumSource(ForwardCompatibleChange.class)
+    void forwardCompatibleChangesAllowCommitting(ForwardCompatibleChange 
change) {
+        assertForwardCompatibleChangeAllowsCommitting(change);
+    }
+
+    private void 
assertForwardCompatibleChangeAllowsCommitting(SchemaChangeSource changeSource) {
+        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, 
beginTimestamp, commitTimestamp))
+                .thenReturn(changeSource.schemaVersions());
+
+        CompletableFuture<CompatValidationResult> resultFuture = 
validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);
+
+        assertThat(resultFuture, willCompleteSuccessfully());
+
+        CompatValidationResult result = resultFuture.getNow(null);
+        assertThat(result, is(notNullValue()));
+
+        assertThat("Change is incompatible", result.isSuccessful(), is(true));
+    }
+
+    @ParameterizedTest
+    @EnumSource(ForwardIncompatibleChange.class)
+    void 
forwardIncompatibleChangesDisallowCommitting(ForwardIncompatibleChange change) {
+        assertForwardIncompatibleChangeDisallowsCommitting(change);
+    }
+
+    private void 
assertForwardIncompatibleChangeDisallowsCommitting(SchemaChangeSource 
changeSource) {
+        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, 
beginTimestamp, commitTimestamp))
+                .thenReturn(changeSource.schemaVersions());
+
+        CompletableFuture<CompatValidationResult> resultFuture = 
validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);
+
+        assertThat(resultFuture, willCompleteSuccessfully());
+
+        CompatValidationResult result = resultFuture.getNow(null);
+        assertThat(result, is(notNullValue()));
+
+        assertThat("Change is compatible", result.isSuccessful(), is(false));
+        assertThat(result.isTableDropped(), is(false));
+
+        assertThat(result.failedTableId(), is(TABLE_ID));
+        assertThat(result.fromSchemaVersion(), is(1));
+        assertThat(result.toSchemaVersion(), is(2));
+    }
+
+    @ParameterizedTest
+    @MethodSource("exactColumnTypeChanges")
+    void exactColumnTypeChangesAllowCommitting(ColumnTypeChange change) {
+        assertForwardCompatibleChangeAllowsCommitting(change);
+    }
+
+    private static Stream<Arguments> exactColumnTypeChanges() {
+        List<ColumnTypeChange> changes = new ArrayList<>();
+
+        for (IgniteBiTuple<ColumnType, ColumnType> pair : 
simpleTypeCompatibleChanges()) {
+            //noinspection ConstantConditions
+            changes.add(new ColumnTypeChange(pair.get1(), pair.get2()));
+        }
+
+        // Integral types to NUMBER with enough precision.
+        changes.add(new ColumnTypeChange(INT8, number(3)));
+        changes.add(new ColumnTypeChange(INT16, number(5)));
+        changes.add(new ColumnTypeChange(INT32, number(9)));
+        changes.add(new ColumnTypeChange(INT64, number(17)));

Review Comment:
   All these magic constants...



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java:
##########
@@ -259,4 +286,127 @@ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp 
txTs, int requestSchemaV
             throw new InternalSchemaVersionMismatchException();
         }
     }
+
+    private enum ValidatorVerdict {
+        COMPATIBLE, INCOMPATIBLE, DONT_CARE
+    }
+
+    @SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+    private interface ForwardCompatibilityValidator {
+        ValidatorVerdict compatible(TableDefinitionDiff diff);
+    }
+
+    private static class RenameTableValidator implements 
ForwardCompatibilityValidator {
+        @Override
+        public ValidatorVerdict compatible(TableDefinitionDiff diff) {
+            return diff.nameDiffers() ? ValidatorVerdict.INCOMPATIBLE : 
ValidatorVerdict.DONT_CARE;
+        }
+    }
+
+    private static class AddColumnsValidator implements 
ForwardCompatibilityValidator {
+        @Override
+        public ValidatorVerdict compatible(TableDefinitionDiff diff) {
+            if (diff.addedColumns().isEmpty()) {
+                return ValidatorVerdict.DONT_CARE;
+            }
+
+            for (CatalogTableColumnDescriptor column : diff.addedColumns()) {
+                if (!column.nullable() && column.defaultValue() == null) {
+                    return ValidatorVerdict.INCOMPATIBLE;
+                }
+            }
+
+            return ValidatorVerdict.COMPATIBLE;
+        }
+    }
+
+    private static class DropColumnsValidator implements 
ForwardCompatibilityValidator {
+        @Override
+        public ValidatorVerdict compatible(TableDefinitionDiff diff) {
+            return diff.removedColumns().isEmpty() ? 
ValidatorVerdict.DONT_CARE : ValidatorVerdict.INCOMPATIBLE;
+        }
+    }
+
+    @SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+    private interface ColumnChangeCompatibilityValidator {
+        ValidatorVerdict compatible(ColumnDefinitionDiff diff);
+    }
+
+    private static class ChangeColumnsValidator implements 
ForwardCompatibilityValidator {
+        private final List<ColumnChangeCompatibilityValidator> validators;
+
+        private 
ChangeColumnsValidator(List<ColumnChangeCompatibilityValidator> validators) {
+            this.validators = List.copyOf(validators);
+        }
+
+        @Override
+        public ValidatorVerdict compatible(TableDefinitionDiff diff) {
+            if (diff.changedColumns().isEmpty()) {
+                return ValidatorVerdict.DONT_CARE;
+            }
+
+            boolean accepted = false;
+
+            for (ColumnDefinitionDiff columnDiff : diff.changedColumns()) {
+                switch (compatible(columnDiff)) {
+                    case COMPATIBLE:
+                        accepted = true;
+                        break;
+                    case INCOMPATIBLE:
+                        return ValidatorVerdict.INCOMPATIBLE;
+                    default:
+                        break;
+                }
+            }
+
+            // If for some column no validator either accepts or refuses the change, then this
+            // is an unknown change, which we consider incompatible by default.

Review Comment:
   But why? You're claiming that `[COMPATIBLE, DONT_CARE]` is OK, `[DONT_CARE, 
COMPATIBLE]` is OK, but `[DONT_CARE, DONT_CARE]` is not OK. I don't understand 
what it's supposed to mean



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TypeConvertibility.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Utility to determine type convertibility.
+ */
+public class TypeConvertibility {
    /**
     * Returns whether type change from old to new column version is lossless (that is, no information
     * is lost during conversion).
     *
     * <p>The checks below are independent guards: each recognizes one lossless conversion family and
     * returns {@code true}; anything not recognized is conservatively reported as lossy.
     *
     * @param oldColumn Old column descriptor.
     * @param newColumn New column descriptor.
     * @return {@code true} if every value of the old column is exactly representable by the new one.
     */
    // TODO: IGNITE-20956 - Unify this with Catalog rules.
    public static boolean typeChangeIsLossless(CatalogTableColumnDescriptor oldColumn, CatalogTableColumnDescriptor newColumn) {
        ColumnType oldType = oldColumn.type();
        ColumnType newType = newColumn.type();

        // Int to Int: widening only. Relies on the ColumnType enum declaring integral types in
        // ascending width order (INT8 < INT16 < INT32 < INT64) — TODO confirm this invariant holds.
        if (oldType.integral() && newType.integral()) {
            return oldType.ordinal() <= newType.ordinal();
        }

        // Int to floating point: only INT8/INT16 are narrow enough to fit a float's mantissa exactly.
        if ((oldType == INT8 || oldType == INT16) && newType.floatingPoint()) {
            return true;
        }

        // Every float value is exactly representable as a double.
        if (oldType == FLOAT && newType == DOUBLE) {
            return true;
        }

        // Time/date to Datetime: DATETIME carries both a date and a time component.
        if ((oldType == TIME || oldType == DATE) && newType == DATETIME) {
            return true;
        }

        // Int <-> NUMBER/DECIMAL: lossless when the target has enough integer places.
        // NOTE(review): integralMaxDigits() semantics are disputed — presumably it is the number of
        // decimal digits guaranteed to round-trip (e.g. 9 for INT32, 17-ish for INT64), NOT the digit
        // count of MAX_VALUE (10 for int, 19 for long). Confirm before relying on these bounds.
        if (oldType.integral() && (newType == NUMBER || newType == DECIMAL) && integralMaxDigits(oldType) <= maxExactIntPlaces(newColumn)) {
            return true;
        }
        if ((oldType == NUMBER || oldType == DECIMAL) && newType.integral() && oldColumn.scale() <= 0
                && oldColumn.precision() - oldColumn.scale() < integralMaxDigits(newType)) {
            return true;
        }

        // NUMBER <-> DECIMAL: lossless when neither the fractional part (scale) nor the integer part
        // (precision - scale) can shrink.
        if ((oldType == NUMBER && newType == DECIMAL || oldType == DECIMAL && newType == NUMBER)
                && oldColumn.scale() <= newColumn.scale()
                && oldColumn.precision() - oldColumn.scale() <= newColumn.precision() - newColumn.scale()) {
            return true;
        }

        // Increasing length of the same variable-length type (scale/precision unchanged).
        if (oldType == newType && oldType.lengthAllowed()
                && oldColumn.length() <= newColumn.length()
                && oldColumn.scale() == newColumn.scale()
                && oldColumn.precision() == newColumn.precision()) {
            return true;
        }

        // Increasing precision of the same type (length/scale unchanged).
        if (oldType == newType && oldType.precisionAllowed()
                && oldColumn.length() == newColumn.length()
                && oldColumn.scale() == newColumn.scale()
                && oldColumn.precision() <= newColumn.precision()) {
            return true;
        }

        // To STRING, when the target is long enough to hold any rendered value of the old column.
        if (oldType.convertsToStringLosslessly() && newType == STRING && maxCharacters(oldColumn) <= newColumn.length()) {
            return true;
        }

        return false;
    }
+
+    private static int integralMaxDigits(ColumnType integralType) {

Review Comment:
   What is this supposed to mean? If it is the number of digits in the corresponding 
`MAX_VALUE`, then this method is simply wrong: the maximal `int` has 10 digits, 
while the maximal `long` has 19.
   If it means something else, then I'm totally lost.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidatorTest.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.DURATION;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.PERIOD;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaCompatibilityValidatorTest extends BaseIgniteAbstractTest {
    // Source of historical table schema versions; stubbed per test with the change under scrutiny.
    @Mock
    private ValidationSchemasSource schemasSource;

    // Catalog lookup used to check that the table still exists at commit time.
    @Mock
    private CatalogService catalogService;

    // Awaited before validation; stubbed in configureMocks() to complete immediately.
    @Mock
    private SchemaSyncService schemaSyncService;

    // Unit under test; Mockito injects the mocks above.
    @InjectMocks
    private SchemaCompatibilityValidator validator;

    // Transaction begin/commit timestamps shared by all tests (begin < commit).
    private final HybridTimestamp beginTimestamp = new HybridTimestamp(1, 1);

    private final HybridTimestamp commitTimestamp = new HybridTimestamp(2, 2);

    // Transaction ID derived from the begin timestamp.
    private final UUID txId = TransactionIds.transactionId(beginTimestamp, 0);

    private static final int TABLE_ID = 1;
    private static final String TABLE_NAME = "test";
    private static final String ANOTHER_NAME = "another";

    // Partition 0 of the table under test; passed to validateCommit().
    private final TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 0);
+
    // Common stubbing: metadata is always complete, and the table exists at commit time.
    @BeforeEach
    void configureMocks() {
        // Let validation proceed without waiting for schema sync.
        when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(completedFuture(null));
        // lenient() because not every test reaches the catalog lookup.
        lenient().when(catalogService.table(TABLE_ID, commitTimestamp.longValue())).thenReturn(mock(CatalogTableDescriptor.class));
    }
+
    /**
     * Commit must be allowed for every schema change declared forward-compatible.
     */
    @ParameterizedTest
    @EnumSource(ForwardCompatibleChange.class)
    void forwardCompatibleChangesAllowCommitting(ForwardCompatibleChange change) {
        assertForwardCompatibleChangeAllowsCommitting(change);
    }
+
    /**
     * Feeds the given schema change to the validator and asserts that commit validation succeeds.
     *
     * @param changeSource Source of the schema versions representing the change.
     */
    private void assertForwardCompatibleChangeAllowsCommitting(SchemaChangeSource changeSource) {
        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp))
                .thenReturn(changeSource.schemaVersions());

        CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);

        assertThat(resultFuture, willCompleteSuccessfully());

        CompatValidationResult result = resultFuture.getNow(null);
        assertThat(result, is(notNullValue()));

        // The message is the reason printed on failure, i.e. when the change was treated as incompatible.
        assertThat("Change is incompatible", result.isSuccessful(), is(true));
    }
+
    /**
     * Commit must be rejected for every schema change declared forward-incompatible.
     */
    @ParameterizedTest
    @EnumSource(ForwardIncompatibleChange.class)
    void forwardIncompatibleChangesDisallowCommitting(ForwardIncompatibleChange change) {
        assertForwardIncompatibleChangeDisallowsCommitting(change);
    }
+
    /**
     * Feeds the given schema change to the validator and asserts that commit validation fails,
     * reporting the offending table and the schema versions involved (the change is from v1 to v2).
     *
     * @param changeSource Source of the schema versions representing the change.
     */
    private void assertForwardIncompatibleChangeDisallowsCommitting(SchemaChangeSource changeSource) {
        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp))
                .thenReturn(changeSource.schemaVersions());

        CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);

        assertThat(resultFuture, willCompleteSuccessfully());

        CompatValidationResult result = resultFuture.getNow(null);
        assertThat(result, is(notNullValue()));

        // The message is the reason printed on failure, i.e. when the change was treated as compatible.
        assertThat("Change is compatible", result.isSuccessful(), is(false));
        // The failure must be due to incompatibility, not because the table was dropped.
        assertThat(result.isTableDropped(), is(false));

        assertThat(result.failedTableId(), is(TABLE_ID));
        assertThat(result.fromSchemaVersion(), is(1));
        assertThat(result.toSchemaVersion(), is(2));
    }
+
    /**
     * Each of the concrete (exact precision/scale/length) column type changes listed in
     * {@link #exactColumnTypeChanges()} must be treated as compatible and allow committing.
     */
    @ParameterizedTest
    @MethodSource("exactColumnTypeChanges")
    void exactColumnTypeChangesAllowCommitting(ColumnTypeChange change) {
        assertForwardCompatibleChangeAllowsCommitting(change);
    }
+
+    private static Stream<Arguments> exactColumnTypeChanges() {
+        List<ColumnTypeChange> changes = new ArrayList<>();
+
+        for (IgniteBiTuple<ColumnType, ColumnType> pair : 
simpleTypeCompatibleChanges()) {
+            //noinspection ConstantConditions
+            changes.add(new ColumnTypeChange(pair.get1(), pair.get2()));
+        }
+
+        // Integral types to NUMBER with enough precision.
+        changes.add(new ColumnTypeChange(INT8, number(3)));
+        changes.add(new ColumnTypeChange(INT16, number(5)));
+        changes.add(new ColumnTypeChange(INT32, number(9)));
+        changes.add(new ColumnTypeChange(INT64, number(17)));
+
+        // Integral types to DECIMAL with enough precision and zero scale.
+        changes.add(new ColumnTypeChange(INT8, decimal(3, 0)));
+        changes.add(new ColumnTypeChange(INT16, decimal(5, 0)));
+        changes.add(new ColumnTypeChange(INT32, decimal(9, 0)));
+        changes.add(new ColumnTypeChange(INT64, decimal(17, 0)));
+
+        // NUMBER to DECIMAL with enough precision and non-zero scale.
+        changes.add(new ColumnTypeChange(number(9), decimal(10, 1)));
+        changes.add(new ColumnTypeChange(number(9), decimal(11, 1)));
+
+        // NUMBER to DECIMAL with enough precision and zero scale.
+        changes.add(new ColumnTypeChange(number(10), decimal(10, 0)));
+        changes.add(new ColumnTypeChange(number(10), decimal(11, 0)));
+
+        // DECIMAL (with scale=0) TO NUMBER with enough precision.
+        changes.add(new ColumnTypeChange(decimal(10, 0), number(10)));
+
+        // Increasing precision.
+        changes.add(new ColumnTypeChange(decimal(10, 5), decimal(15, 5)));
+
+        // Increasing length.
+        changes.add(new ColumnTypeChange(string(10), string(20)));
+        changes.add(new ColumnTypeChange(byteArray(10), byteArray(20)));
+
+        // Conversions to STRING with enough length.
+        changes.add(new ColumnTypeChange(INT8, string(4)));
+        changes.add(new ColumnTypeChange(INT16, string(6)));
+        changes.add(new ColumnTypeChange(INT32, string(10)));
+        changes.add(new ColumnTypeChange(INT64, string(18)));
+        changes.add(new ColumnTypeChange(decimal(10, 0), string(11)));
+        changes.add(new ColumnTypeChange(decimal(10, 3), string(12)));
+        changes.add(new ColumnTypeChange(number(10), string(11)));
+        changes.add(new ColumnTypeChange(ColumnType.UUID, string(36)));
+
+        return changes.stream().map(Arguments::of);
+    }
+
+    private static List<IgniteBiTuple<ColumnType, ColumnType>> 
simpleTypeCompatibleChanges() {
+        List<IgniteBiTuple<ColumnType, ColumnType>> changes = new 
ArrayList<>();
+
+        List<ColumnType> intTypes = List.of(INT8, INT16, INT32, INT64);
+        List<ColumnType> floatingPointTypes = List.of(FLOAT, DOUBLE);
+
+        // INT8->INT16->INT32->INT64.
+        for (int i = 0; i < intTypes.size() - 1; i++) {
+            ColumnType narrowerType = intTypes.get(i);
+
+            for (int j = i + 1; j < intTypes.size(); j++) {
+                ColumnType widerType = intTypes.get(j);
+                changes.add(new IgniteBiTuple<>(narrowerType, widerType));
+            }
+        }
+
+        // Integral types to floating point types.
+        for (ColumnType intType : List.of(INT8, INT16)) {
+            for (ColumnType floatingPointType : floatingPointTypes) {
+                changes.add(new IgniteBiTuple<>(intType, floatingPointType));
+            }
+        }
+
+        changes.add(new IgniteBiTuple<>(FLOAT, DOUBLE));
+
+        changes.add(new IgniteBiTuple<>(DATE, DATETIME));
+        changes.add(new IgniteBiTuple<>(TIME, DATETIME));

Review Comment:
   I have never checked this myself — are these conversions binary-compatible?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java:
##########
@@ -29,17 +29,8 @@
 import org.junit.jupiter.api.Test;
 
 class FullTableSchemaTest {
-    @Test
-    void sameSchemasHaveEmptyDiff() {
-        CatalogTableColumnDescriptor column = someColumn("a");
-
-        var schema1 = new FullTableSchema(1, 1, List.of(column));
-        var schema2 = new FullTableSchema(2, 1, List.of(column));
-
-        TableDefinitionDiff diff = schema2.diffFrom(schema1);
-
-        assertThat(diff.isEmpty(), is(true));
-    }
+    private static final String TABLE_NAME1 = "test1";

Review Comment:
   Why have you deleted the test?



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidatorTest.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.DURATION;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.PERIOD;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaCompatibilityValidatorTest extends BaseIgniteAbstractTest {
    // Source of historical table schema versions; stubbed per test with the change under scrutiny.
    @Mock
    private ValidationSchemasSource schemasSource;

    // Catalog lookup used to check that the table still exists at commit time.
    @Mock
    private CatalogService catalogService;

    // Awaited before validation; stubbed in configureMocks() to complete immediately.
    @Mock
    private SchemaSyncService schemaSyncService;

    // Unit under test; Mockito injects the mocks above.
    @InjectMocks
    private SchemaCompatibilityValidator validator;

    // Transaction begin/commit timestamps shared by all tests (begin < commit).
    private final HybridTimestamp beginTimestamp = new HybridTimestamp(1, 1);

    private final HybridTimestamp commitTimestamp = new HybridTimestamp(2, 2);

    // Transaction ID derived from the begin timestamp.
    private final UUID txId = TransactionIds.transactionId(beginTimestamp, 0);

    private static final int TABLE_ID = 1;
    private static final String TABLE_NAME = "test";
    private static final String ANOTHER_NAME = "another";

    // Partition 0 of the table under test; passed to validateCommit().
    private final TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 0);
+
+    @BeforeEach
+    void configureMocks() {
+        
when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(completedFuture(null));

Review Comment:
   By the way, isn't this field unused in the class? If it is unused, please remove 
it completely.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidatorTest.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.DURATION;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.PERIOD;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaCompatibilityValidatorTest extends BaseIgniteAbstractTest {
    // Source of historical table schema versions; stubbed per test with the change under scrutiny.
    @Mock
    private ValidationSchemasSource schemasSource;

    // Catalog lookup used to check that the table still exists at commit time.
    @Mock
    private CatalogService catalogService;

    // Awaited before validation; stubbed in configureMocks() to complete immediately.
    @Mock
    private SchemaSyncService schemaSyncService;

    // Unit under test; Mockito injects the mocks above.
    @InjectMocks
    private SchemaCompatibilityValidator validator;

    // Transaction begin/commit timestamps shared by all tests (begin < commit).
    private final HybridTimestamp beginTimestamp = new HybridTimestamp(1, 1);

    private final HybridTimestamp commitTimestamp = new HybridTimestamp(2, 2);

    // Transaction ID derived from the begin timestamp.
    private final UUID txId = TransactionIds.transactionId(beginTimestamp, 0);

    private static final int TABLE_ID = 1;
    private static final String TABLE_NAME = "test";
    private static final String ANOTHER_NAME = "another";

    // Partition 0 of the table under test; passed to validateCommit().
    private final TablePartitionId tablePartitionId = new TablePartitionId(TABLE_ID, 0);
+
    // Common stubbing: metadata is always complete, and the table exists at commit time.
    @BeforeEach
    void configureMocks() {
        // Let validation proceed without waiting for schema sync.
        when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(completedFuture(null));
        // lenient() because not every test reaches the catalog lookup.
        lenient().when(catalogService.table(TABLE_ID, commitTimestamp.longValue())).thenReturn(mock(CatalogTableDescriptor.class));
    }
+
    /**
     * Commit must be allowed for every schema change declared forward-compatible.
     */
    @ParameterizedTest
    @EnumSource(ForwardCompatibleChange.class)
    void forwardCompatibleChangesAllowCommitting(ForwardCompatibleChange change) {
        assertForwardCompatibleChangeAllowsCommitting(change);
    }
+
    /**
     * Feeds the given schema change to the validator and asserts that commit validation succeeds.
     *
     * @param changeSource Source of the schema versions representing the change.
     */
    private void assertForwardCompatibleChangeAllowsCommitting(SchemaChangeSource changeSource) {
        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp))
                .thenReturn(changeSource.schemaVersions());

        CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);

        assertThat(resultFuture, willCompleteSuccessfully());

        CompatValidationResult result = resultFuture.getNow(null);
        assertThat(result, is(notNullValue()));

        // The message is the reason printed on failure, i.e. when the change was treated as incompatible.
        assertThat("Change is incompatible", result.isSuccessful(), is(true));
    }
+
    /**
     * Commit must be rejected for every schema change declared forward-incompatible.
     */
    @ParameterizedTest
    @EnumSource(ForwardIncompatibleChange.class)
    void forwardIncompatibleChangesDisallowCommitting(ForwardIncompatibleChange change) {
        assertForwardIncompatibleChangeDisallowsCommitting(change);
    }
+
    /**
     * Feeds the given schema change to the validator and asserts that commit validation fails,
     * reporting the offending table and the schema versions involved (the change is from v1 to v2).
     *
     * @param changeSource Source of the schema versions representing the change.
     */
    private void assertForwardIncompatibleChangeDisallowsCommitting(SchemaChangeSource changeSource) {
        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp))
                .thenReturn(changeSource.schemaVersions());

        CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);

        assertThat(resultFuture, willCompleteSuccessfully());

        CompatValidationResult result = resultFuture.getNow(null);
        assertThat(result, is(notNullValue()));

        // The message is the reason printed on failure, i.e. when the change was treated as compatible.
        assertThat("Change is compatible", result.isSuccessful(), is(false));
        // The failure must be due to incompatibility, not because the table was dropped.
        assertThat(result.isTableDropped(), is(false));

        assertThat(result.failedTableId(), is(TABLE_ID));
        assertThat(result.fromSchemaVersion(), is(1));
        assertThat(result.toSchemaVersion(), is(2));
    }
+
    /**
     * Each of the concrete (exact precision/scale/length) column type changes listed in
     * {@link #exactColumnTypeChanges()} must be treated as compatible and allow committing.
     */
    @ParameterizedTest
    @MethodSource("exactColumnTypeChanges")
    void exactColumnTypeChangesAllowCommitting(ColumnTypeChange change) {
        assertForwardCompatibleChangeAllowsCommitting(change);
    }
+
+    private static Stream<Arguments> exactColumnTypeChanges() {
+        List<ColumnTypeChange> changes = new ArrayList<>();
+
+        for (IgniteBiTuple<ColumnType, ColumnType> pair : 
simpleTypeCompatibleChanges()) {
+            //noinspection ConstantConditions
+            changes.add(new ColumnTypeChange(pair.get1(), pair.get2()));
+        }
+
+        // Integral types to NUMBER with enough precision.
+        changes.add(new ColumnTypeChange(INT8, number(3)));
+        changes.add(new ColumnTypeChange(INT16, number(5)));
+        changes.add(new ColumnTypeChange(INT32, number(9)));
+        changes.add(new ColumnTypeChange(INT64, number(17)));
+
+        // Integral types to DECIMAL with enough precision and zero scale.
+        changes.add(new ColumnTypeChange(INT8, decimal(3, 0)));
+        changes.add(new ColumnTypeChange(INT16, decimal(5, 0)));
+        changes.add(new ColumnTypeChange(INT32, decimal(9, 0)));
+        changes.add(new ColumnTypeChange(INT64, decimal(17, 0)));
+
+        // NUMBER to DECIMAL with enough precision and non-zero scale.
+        changes.add(new ColumnTypeChange(number(9), decimal(10, 1)));
+        changes.add(new ColumnTypeChange(number(9), decimal(11, 1)));
+
+        // NUMBER to DECIMAL with enough precision and zero scale.
+        changes.add(new ColumnTypeChange(number(10), decimal(10, 0)));
+        changes.add(new ColumnTypeChange(number(10), decimal(11, 0)));
+
+        // DECIMAL (with scale=0) to NUMBER with enough precision.
+        changes.add(new ColumnTypeChange(decimal(10, 0), number(10)));
+
+        // Increasing precision.
+        changes.add(new ColumnTypeChange(decimal(10, 5), decimal(15, 5)));
+
+        // Increasing length.
+        changes.add(new ColumnTypeChange(string(10), string(20)));
+        changes.add(new ColumnTypeChange(byteArray(10), byteArray(20)));
+
+        // Conversions to STRING with enough length.
+        changes.add(new ColumnTypeChange(INT8, string(4)));
+        changes.add(new ColumnTypeChange(INT16, string(6)));
+        changes.add(new ColumnTypeChange(INT32, string(10)));
+        changes.add(new ColumnTypeChange(INT64, string(18)));
+        changes.add(new ColumnTypeChange(decimal(10, 0), string(11)));
+        changes.add(new ColumnTypeChange(decimal(10, 3), string(12)));
+        changes.add(new ColumnTypeChange(number(10), string(11)));
+        changes.add(new ColumnTypeChange(ColumnType.UUID, string(36)));
+
+        return changes.stream().map(Arguments::of);
+    }
+
+    private static List<IgniteBiTuple<ColumnType, ColumnType>> 
simpleTypeCompatibleChanges() {
+        List<IgniteBiTuple<ColumnType, ColumnType>> changes = new 
ArrayList<>();
+
+        List<ColumnType> intTypes = List.of(INT8, INT16, INT32, INT64);
+        List<ColumnType> floatingPointTypes = List.of(FLOAT, DOUBLE);
+
+        // INT8->INT16->INT32->INT64.
+        for (int i = 0; i < intTypes.size() - 1; i++) {
+            ColumnType narrowerType = intTypes.get(i);
+
+            for (int j = i + 1; j < intTypes.size(); j++) {
+                ColumnType widerType = intTypes.get(j);
+                changes.add(new IgniteBiTuple<>(narrowerType, widerType));
+            }
+        }
+
+        // Integral types to floating point types.
+        for (ColumnType intType : List.of(INT8, INT16)) {
+            for (ColumnType floatingPointType : floatingPointTypes) {
+                changes.add(new IgniteBiTuple<>(intType, floatingPointType));
+            }
+        }
+
+        changes.add(new IgniteBiTuple<>(FLOAT, DOUBLE));
+
+        changes.add(new IgniteBiTuple<>(DATE, DATETIME));
+        changes.add(new IgniteBiTuple<>(TIME, DATETIME));
+
+        return List.copyOf(changes);
+    }
+
+    private static Type decimal(int precision, int scale) {
+        return new Type(DECIMAL, precision, scale, DEFAULT_LENGTH);
+    }
+
+    private static Type number(int precision) {
+        return new Type(NUMBER, precision, DEFAULT_SCALE, DEFAULT_LENGTH);
+    }
+
+    private static Type string(int length) {
+        return new Type(STRING, DEFAULT_PRECISION, DEFAULT_SCALE, length);
+    }
+
+    private static Type byteArray(int length) {
+        return new Type(BYTE_ARRAY, DEFAULT_PRECISION, DEFAULT_SCALE, length);
+    }
+
+    @ParameterizedTest
+    @MethodSource("nonExactColumnTypeChanges")
+    void nonExactColumnTypeChangesDisallowCommitting(ColumnTypeChange change) {
+        assertForwardIncompatibleChangeDisallowsCommitting(change);
+    }
+
+    private static Stream<Arguments> nonExactColumnTypeChanges() {
+        List<ColumnTypeChange> changes = new ArrayList<>();
+
+        List<ColumnType> simpleTypes = Arrays.stream(ColumnType.values())
+                .filter(type -> !type.precisionAllowed() && 
!type.scaleAllowed() && !type.lengthAllowed())
+                .collect(toList());
+
+        List<IgniteBiTuple<ColumnType, ColumnType>> 
simpleTypeCompatibleChanges = simpleTypeCompatibleChanges();
+        Map<ColumnType, Set<ColumnType>> typeToTypes = 
simpleTypeCompatibleChanges.stream()
+                .collect(groupingBy(IgniteBiTuple::get1, 
Collectors.mapping(IgniteBiTuple::get2, toSet())));
+
+        for (ColumnType type1 : simpleTypes) {
+            for (ColumnType type2 : simpleTypes) {
+                if (type1 == type2) {
+                    continue;
+                }
+                Set<ColumnType> types = typeToTypes.get(type1);
+                if (types == null || !types.contains(type2)) {
+                    changes.add(new ColumnTypeChange(type1, type2));
+                }
+            }
+        }
+
+        // Integral types to NUMBER with NOT enough precision.
+        changes.add(new ColumnTypeChange(INT8, number(2)));
+        changes.add(new ColumnTypeChange(INT16, number(4)));
+        changes.add(new ColumnTypeChange(INT32, number(8)));
+        changes.add(new ColumnTypeChange(INT64, number(16)));
+        // Integral types to DECIMAL with NOT enough precision and zero scale.
+        changes.add(new ColumnTypeChange(INT8, decimal(2, 0)));
+        changes.add(new ColumnTypeChange(INT16, decimal(4, 0)));
+        changes.add(new ColumnTypeChange(INT32, decimal(8, 0)));
+        changes.add(new ColumnTypeChange(INT64, decimal(16, 0)));
+
+        // NUMBER to DECIMAL with NOT enough precision and zero scale.
+        changes.add(new ColumnTypeChange(number(10), decimal(9, 0)));
+
+        // Decreasing precision.
+        changes.add(new ColumnTypeChange(decimal(10, 5), decimal(9, 5)));
+
+        changes.add(new ColumnTypeChange(string(10), string(9)));
+        changes.add(new ColumnTypeChange(byteArray(10), byteArray(9)));
+
+        // Conversions to STRING with NOT enough length.
+        changes.add(new ColumnTypeChange(INT8, string(3)));
+        changes.add(new ColumnTypeChange(INT16, string(5)));
+        changes.add(new ColumnTypeChange(INT32, string(9)));
+        changes.add(new ColumnTypeChange(INT64, string(17)));
+        changes.add(new ColumnTypeChange(decimal(10, 0), string(10)));
+        changes.add(new ColumnTypeChange(decimal(10, 3), string(11)));
+        changes.add(new ColumnTypeChange(number(10), string(10)));
+        changes.add(new ColumnTypeChange(ColumnType.UUID, string(35)));
+        changes.add(new ColumnTypeChange(string(10), string(9)));
+
+        // Conversions from STRING.
+        changes.add(new ColumnTypeChange(string(3), INT8));
+        changes.add(new ColumnTypeChange(string(5), INT16));
+        changes.add(new ColumnTypeChange(string(9), INT32));
+        changes.add(new ColumnTypeChange(string(17), INT64));
+        changes.add(new ColumnTypeChange(string(10), decimal(10, 0)));
+        changes.add(new ColumnTypeChange(string(11), decimal(10, 3)));
+        changes.add(new ColumnTypeChange(string(10), number(10)));
+        changes.add(new ColumnTypeChange(string(36), ColumnType.UUID));
+
+        for (ColumnType columnType : ColumnType.values()) {
+            addInconvertible(columnType, BOOLEAN, changes);
+            addInconvertible(columnType, ColumnType.UUID, changes);
+            addInconvertible(columnType, BITMASK, changes);
+            addInconvertible(columnType, BYTE_ARRAY, changes);
+            addInconvertible(columnType, PERIOD, changes);
+            addInconvertible(columnType, DURATION, changes);
+        }
+
+        return changes.stream().map(Arguments::of);
+    }
+
+    private static void addInconvertible(ColumnType type1, ColumnType type2, 
List<ColumnTypeChange> changes) {
+        if (type1 != type2) {
+            if (type1 != STRING || type2.convertsToStringLosslessly()) {
+                changes.add(new ColumnTypeChange(type2, type1));
+            }
+            if (type2 != STRING || type1.convertsToStringLosslessly()) {
+                changes.add(new ColumnTypeChange(type1, type2));
+            }
+        }
+    }
+
+    @Test
+    void combinationOfForwardCompatibleChangesIsCompatible() {
+        assertForwardCompatibleChangeAllowsCommitting(() -> List.of(
+                tableSchema(1, List.of(column(INT32, false))),
+                // Type is widened, NOT NULL dropped.
+                tableSchema(2, List.of(column(INT64, true)))
+        ));
+    }
+
+    private static CatalogTableColumnDescriptor column(ColumnType type, 
boolean nullable) {
+        return new CatalogTableColumnDescriptor(
+                "col",
+                type,
+                nullable,
+                DEFAULT_PRECISION,
+                DEFAULT_SCALE,
+                DEFAULT_LENGTH,
+                null
+        );
+    }
+
+    @Test
+    void oneForwardIncompatibleChangeMakesCombinationIncompatible() {
+        assertForwardIncompatibleChangeDisallowsCommitting(() -> List.of(
+                tableSchema(1, List.of(column(INT32, true))),
+                // Type is widened (compatible), but NOT NULL added 
(incompatible).
+                tableSchema(2, List.of(column(INT64, false)))
+        ));
+    }
+
+    @Test
+    void forwardCompatibleChangeIsNotBackwardCompatible() {
+        
assertChangeIsNotBackwardCompatible(ForwardCompatibleChange.ADD_NULLABLE_COLUMN);
+    }
+
+    @Test
+    void forwardIncompatibleChangeIsNotBackwardCompatible() {
+        
assertChangeIsNotBackwardCompatible(ForwardIncompatibleChange.DROP_COLUMN);
+    }
+
+    @Test
+    void changeOppositeToForwardCompatibleChangeIsNotBackwardCompatible() {
+        assertChangeIsNotBackwardCompatible(() -> List.of(
+                tableSchema(1, List.of(
+                        intColumn("col1"),
+                        intColumnWithDefault("col2", 42)
+                )),
+                tableSchema(2, List.of(
+                        intColumn("col1")
+                ))
+        ));
+    }
+
+    private void assertChangeIsNotBackwardCompatible(SchemaChangeSource 
changeSource) {
+        when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, 
beginTimestamp, 2))
+                .thenReturn(changeSource.schemaVersions());
+        when(schemasSource.waitForSchemaAvailability(anyInt(), 
anyInt())).thenReturn(completedFuture(null));
+
+        CompletableFuture<CompatValidationResult> resultFuture = 
validator.validateBackwards(2, TABLE_ID, txId);
+
+        assertThat(resultFuture, willCompleteSuccessfully());
+
+        CompatValidationResult result = resultFuture.getNow(null);
+        assertThat(result, is(notNullValue()));
+
+        assertThat("Change is compatible", result.isSuccessful(), is(false));
+        assertThat(result.isTableDropped(), is(false));
+
+        assertThat(result.failedTableId(), is(TABLE_ID));
+        assertThat(result.fromSchemaVersion(), is(1));
+        assertThat(result.toSchemaVersion(), is(2));
+    }
+
+    private static CatalogTableColumnDescriptor intColumn(String columnName) {
+        return new CatalogTableColumnDescriptor(
+                columnName,
+                INT32,
+                false,
+                DEFAULT_PRECISION,
+                DEFAULT_SCALE,
+                DEFAULT_LENGTH,
+                null
+        );
+    }
+
+    private static CatalogTableColumnDescriptor nullableIntColumn(String 
columnName) {
+        return new CatalogTableColumnDescriptor(columnName, INT32, true, 
DEFAULT_PRECISION, DEFAULT_SCALE, DEFAULT_LENGTH, null);
+    }
+
+    private static CatalogTableColumnDescriptor intColumnWithDefault(String 
columnName, int defaultValue) {
+        return new CatalogTableColumnDescriptor(
+                columnName,
+                INT32,
+                false,
+                DEFAULT_PRECISION,
+                DEFAULT_SCALE,
+                DEFAULT_LENGTH,
+                DefaultValue.constant(defaultValue)
+        );
+    }
+
+    private static FullTableSchema tableSchema(int schemaVersion, 
List<CatalogTableColumnDescriptor> columns) {
+        return tableSchema(schemaVersion, TABLE_NAME, columns);
+    }
+
+    private static FullTableSchema tableSchema(int schemaVersion, String name, 
List<CatalogTableColumnDescriptor> columns) {
+        return new FullTableSchema(schemaVersion, TABLE_ID, name, columns);
+    }
+
+    @FunctionalInterface
+    private interface SchemaChangeSource {
+        List<FullTableSchema> schemaVersions();
+    }
+
+    private enum ForwardCompatibleChange implements SchemaChangeSource {
+        ADD_NULLABLE_COLUMN(() -> List.of(
+                tableSchema(1, List.of(
+                        intColumn("col1")
+                )),
+                tableSchema(2, List.of(
+                        intColumn("col1"),
+                        nullableIntColumn("col2")
+                ))
+        )),
+        ADD_COLUMN_WITH_DEFAULT(() -> List.of(
+                tableSchema(1, List.of(
+                        intColumn("col1")
+                )),
+                tableSchema(2, List.of(
+                        intColumn("col1"),
+                        intColumnWithDefault("col2", 42)
+                ))
+        )),
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-20948 - 
uncomment this.
+        //RENAME_COLUMN(() -> List.of(
+        //        tableSchema(1, List.of(
+        //                intColumn("col1")
+        //        )),
+        //        tableSchema(2, List.of(
+        //                intColumn("col2")
+        //        ))
+        //)),
+        DROP_NOT_NULL(() -> List.of(
+                tableSchema(1, List.of(
+                        intColumn("col1")
+                )),
+                tableSchema(2, List.of(
+                        nullableIntColumn("col1")
+                ))
+        ));
+
+        private final Supplier<List<FullTableSchema>> schemaVersionsSupplier;
+
+        ForwardCompatibleChange(Supplier<List<FullTableSchema>> 
schemaVersionsSupplier) {
+            this.schemaVersionsSupplier = schemaVersionsSupplier;
+        }
+
+        @Override
+        public List<FullTableSchema> schemaVersions() {
+            return schemaVersionsSupplier.get();
+        }
+    }
+
+    private enum ForwardIncompatibleChange implements SchemaChangeSource {
+        RENAME_TABLE(() -> List.of(

Review Comment:
   What is the reason for passing a supplier instead of passing the list 
itself? A memory constraint, or something different?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TypeConvertibility.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Utility to determine type convertibility.
+ */
+public class TypeConvertibility {
+    /**
+     * Returns whether type change from old to new column version is lossless 
(that is, no information
+     * is lost during conversion).
+     *
+     * @param oldColumn Old column descriptor.
+     * @param newColumn New column descriptor.
+     */
+    // TODO: IGNITE-20956 - Unify this with Catalog rules.
+    public static boolean typeChangeIsLossless(CatalogTableColumnDescriptor 
oldColumn, CatalogTableColumnDescriptor newColumn) {
+        ColumnType oldType = oldColumn.type();
+        ColumnType newType = newColumn.type();
+
+        // Int to Int.
+        if (oldType.integral() && newType.integral()) {
+            return oldType.ordinal() <= newType.ordinal();
+        }
+
+        // Int to floating point.
+        if ((oldType == INT8 || oldType == INT16) && newType.floatingPoint()) {
+            return true;
+        }
+
+        if (oldType == FLOAT && newType == DOUBLE) {
+            return true;
+        }
+
+        // Time/date to Datetime.
+        if ((oldType == TIME || oldType == DATE) && newType == DATETIME) {
+            return true;
+        }
+
+        // Int <-> NUMBER/DECIMAL.
+        if (oldType.integral() && (newType == NUMBER || newType == DECIMAL) && 
integralMaxDigits(oldType) <= maxExactIntPlaces(newColumn)) {

Review Comment:
   As well as this



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidatorTest.java:
##########
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.DURATION;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.PERIOD;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SchemaCompatibilityValidatorTest extends BaseIgniteAbstractTest {
+    @Mock
+    private ValidationSchemasSource schemasSource;
+
+    @Mock
+    private CatalogService catalogService;
+
+    @Mock
+    private SchemaSyncService schemaSyncService;
+
+    @InjectMocks
+    private SchemaCompatibilityValidator validator;
+
+    private final HybridTimestamp beginTimestamp = new HybridTimestamp(1, 1);
+
+    private final HybridTimestamp commitTimestamp = new HybridTimestamp(2, 2);
+
+    private final UUID txId = TransactionIds.transactionId(beginTimestamp, 0);
+
+    private static final int TABLE_ID = 1;
+    private static final String TABLE_NAME = "test";
+    private static final String ANOTHER_NAME = "another";
+
+    private final TablePartitionId tablePartitionId = new 
TablePartitionId(TABLE_ID, 0);
+
+    @BeforeEach
+    void configureMocks() {
+        
when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(completedFuture(null));

Review Comment:
   I think we already have `AlwaysSyncedSchemaSyncService`. What is the reason 
for using a mock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to