sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1190873028


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   This class is redundant, you should use `CompletableFutureExceptionMatcher` 
instead



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/AbortDueToIncompatibleSchemaException.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.tx.TransactionException;
+
+/**
+ * Thrown when, during an attempt to commit a transaction, it turns out that 
the transaction cannot be committed
+ * because an incompatible schema change has happened.
+ */
+public class AbortDueToIncompatibleSchemaException extends 
TransactionException {

Review Comment:
   I would propose to rename this exception to 
`IncompatibleSchemaAbortException`, it's a mouthful otherwise



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = 
this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), 
prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), 
thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = 
thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = 
prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = 
intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = 
prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = 
thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, 
thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = 
prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), 
prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), 
thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = 
thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))
+                .collect(toList());
+        List<IndexDescriptor> removedIndexes = 
prevIndexesByName.values().stream()
+                .filter(col -> removedIndexNames.contains(col.name()))
+                .collect(toList());
+
+        return new TableDefinitionDiff(addedColumns, removedColumns, 
changedColumns, addedIndexes, removedIndexes);
+    }
+
+    private static Set<String> subtract(Set<String> minuend, Set<String> 
subtrahend) {

Review Comment:
   There's an exactly the same method in `RebalanceUtil`, I would suggest to 
move it to a more common place, like `CollectionUtils`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new 
ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an 
incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an 
incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int 
fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, 
fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, 
int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {
+        return ok;
+    }
+
+    /**
+     * Returns ID of the table for which the validation has failed. Should 
only be called for a failed validation result,
+     * otherwise an exception is thrown.
+     *
+     * @return Table ID.
+     */
+    public UUID failedTableId() {
+        return Objects.requireNonNull(failedTableId);
+    }
+
+    /**
+     * Returns version number of the schema from which an incompatible 
transition tried to be made.
+     *
+     * @return Version number of the schema from which an incompatible 
transition tried to be made.
+     */
+    public int fromSchemaVersion() {
+        return fromSchemaVersion;

Review Comment:
   Should we also throw an exception here if this is a successful result?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new 
ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an 
incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an 
incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int 
fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, 
fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, 
int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {

Review Comment:
   I would propose to rename this method to `isSuccessful` or `isSuccess`, just 
to be consistent



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = 
request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, 
aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && 
validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = 
finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, 
aggregatedGroupIds, txId)
+                            .thenCompose(result -> {
+                                if (validationResult.isOk()) {
+                                    return completedFuture(result);
+                                } else {
+                                    return failedFuture(new 
AbortDueToIncompatibleSchemaException("Commit failed because schema "

Review Comment:
   I don't understand this logic. Should throwing this exception affect the 
`finishTransaction` method so that the state of the transaction changes to 
`ABORTED`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for 
each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin 
timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with 
the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an 
abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   I think this intermediate variable is redundant, you can use the stream 
directly inside `thenApply` below



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {

Review Comment:
   There exists an exact copy of this method in `ClientTableCommon`, I would 
suggest to extract it into a common place



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {

Review Comment:
   `colName` parameter is redundant, `column` already contains a name



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ColumnDefinitionDiff.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Captures a difference between 'old' and 'new' versions of the same column 
definition.

Review Comment:
   Should the fields be called `old` and `new` instead of `prev` and `next`?



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Dummy {@link Schemas} implementation that is not historic and always uses 
same {@link SchemaRegistry}.
+ */
+public class DummySchemas implements Schemas {
+    private final SchemaRegistry schemaRegistry;
+
+    public DummySchemas(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()

Review Comment:
   Please extract this into a variable



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -290,6 +314,8 @@ public void beforeTest(@InjectConfiguration 
DataStorageConfiguration dsCfg) {
 
         
lenient().when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
+        
lenient().when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));

Review Comment:
   I know that it's not related to this PR, but all these `lenient()` calls can 
be removed using the `MockitoSettings` annotation



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = 
this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), 
prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), 
thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = 
thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = 
prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = 
intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = 
prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = 
thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, 
thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = 
prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), 
prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), 
thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = 
thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))
+                .collect(toList());
+        List<IndexDescriptor> removedIndexes = 
prevIndexesByName.values().stream()
+                .filter(col -> removedIndexNames.contains(col.name()))
+                .collect(toList());
+
+        return new TableDefinitionDiff(addedColumns, removedColumns, 
changedColumns, addedIndexes, removedIndexes);
+    }
+
+    private static Set<String> subtract(Set<String> minuend, Set<String> 
subtrahend) {
+        Set<String> result = new HashSet<>(minuend);
+        result.removeAll(subtrahend);
+        return result;
+    }
+
+    private static Set<String> intersect(Set<String> set1, Set<String> set2) {

Review Comment:
   Same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.

Review Comment:
   ```suggestion
        * Computes a diff between this and a previous schema.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = 
this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), 
prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), 
thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = 
thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = 
prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = 
intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = 
prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = 
thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, 
thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = 
prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), 
prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), 
thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = 
thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))

Review Comment:
   Same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = 
request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, 
aggregatedGroupIds, request.commit(), request.commitTimestamp())

Review Comment:
   API of the `schemaCompatValidator` is a little bit counter-intuitive to me: 
we should only validate a request if it's a commit request. I understand that 
we do this check *inside* `validateForwards` but I still think it would be more 
clear to do that outside this mehod



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));

Review Comment:
   Maybe it would be a little bit cleaner to extract this code into a method 
like `groupByName`, because it's used 4 times in this method.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = 
this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), 
prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), 
thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = 
thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))

Review Comment:
   This is strange, `thisColumnsByName` is already a map from column name to 
the column you are looking for. Can we do 
`addedColumnNames.stream().map(thisColumnsByName::get)` instead?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = 
this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), 
prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), 
thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = 
thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = 
prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))

Review Comment:
   same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = 
request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, 
aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && 
validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = 
finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, 
aggregatedGroupIds, txId)
+                            .thenCompose(result -> {

Review Comment:
   You should use `thenAccept` here instead of `thenCompose`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new 
ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an 
incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an 
incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int 
fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, 
fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, 
int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {
+        return ok;
+    }
+
+    /**
+     * Returns ID of the table for which the validation has failed. Should 
only be called for a failed validation result,
+     * otherwise an exception is thrown.
+     *
+     * @return Table ID.
+     */
+    public UUID failedTableId() {
+        return Objects.requireNonNull(failedTableId);
+    }
+
+    /**
+     * Returns version number of the schema from which an incompatible 
transition tried to be made.
+     *
+     * @return Version number of the schema from which an incompatible 
transition tried to be made.
+     */
+    public int fromSchemaVersion() {
+        return fromSchemaVersion;
+    }
+
+    /**
+     * Returns version number of the schema to which an incompatible 
transition tried to be made.
+     *
+     * @return Version number of the schema to which an incompatible 
transition tried to be made.
+     */
+    public int toSchemaVersion() {
+        return toSchemaVersion;

Review Comment:
   And here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()

Review Comment:
   Please extract this into a variable



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {
+        switch (spec) {
+            case INT8:
+                return ColumnType.INT8;
+
+            case INT16:
+                return ColumnType.INT16;
+
+            case INT32:
+                return ColumnType.INT32;
+
+            case INT64:
+                return ColumnType.INT64;
+
+            case FLOAT:
+                return ColumnType.FLOAT;
+
+            case DOUBLE:
+                return ColumnType.DOUBLE;
+
+            case DECIMAL:
+                return ColumnType.DECIMAL;
+
+            case NUMBER:
+                return ColumnType.NUMBER;
+
+            case UUID:
+                return ColumnType.UUID;
+
+            case STRING:
+                return ColumnType.STRING;
+
+            case BYTES:
+                return ColumnType.BYTE_ARRAY;
+
+            case BITMASK:
+                return ColumnType.BITMASK;
+
+            case DATE:
+                return ColumnType.DATE;
+
+            case TIME:
+                return ColumnType.TIME;
+
+            case DATETIME:
+                return ColumnType.DATETIME;
+
+            case TIMESTAMP:
+                return ColumnType.TIMESTAMP;
+
+            default:
+                throw new IgniteException(PROTOCOL_ERR, "Unsupported native 
type: " + spec);

Review Comment:
   Wait a second, you indeed copy-pasted it from `ClientTableCommon`, shame on 
you!



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {

Review Comment:
   Should be called `columnDescriptor`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {

Review Comment:
   This method seems redundant to me



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known 
schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   Are sure this is correct? There are multiple types of default values



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Dummy {@link Schemas} implementation that is not historic and always uses 
same {@link SchemaRegistry}.
+ */
+public class DummySchemas implements Schemas {
+    private final SchemaRegistry schemaRegistry;
+
+    public DummySchemas(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column 
column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return NonHistoricSchemas.getColumnType(type.spec());

Review Comment:
   Why do you reuse this method from `NonHistoricSchemas` and not the method 
that creates a `TableColumnDescriptor` from a `Column`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = 
prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = 
this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), 
prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), 
thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = 
thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))

Review Comment:
   Also, this way you don't have to collect `addedColumnNames` into a separate 
set and avoid some copying, you can filter the `thisColumnsByName.keySet()` 
in-place



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to