tkalkirill commented on code in PR #2787:
URL: https://github.com/apache/ignite-3/pull/2787#discussion_r1383052518


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -328,7 +328,7 @@ public PartitionReplicaListener(
 
         cursors = new 
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
 
-        schemaCompatValidator = new SchemaCompatValidator(schemas, 
catalogService);
+        schemaCompatValidator = new 
SchemaCompatValidator(validationSchemasSource, catalogService, 
schemaSyncService);

Review Comment:
   Maybe rename it to `SchemaCompatibilityValidator`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -240,4 +260,34 @@ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp 
txTs, int requestSchemaV
             throw new InternalSchemaVersionMismatchException();
         }
     }
+
+    private static class DiffKey {

Review Comment:
   I suggest moving it to a separate package-private class.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -240,4 +260,34 @@ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp 
txTs, int requestSchemaV
             throw new InternalSchemaVersionMismatchException();
         }
     }
+
+    private static class DiffKey {
+        private final int tableId;
+        private final int fromSchemaVersion;
+        private final int toSchemaVersion;
+
+        private DiffKey(int tableId, int fromSchemaVersion, int 
toSchemaVersion) {
+            this.tableId = tableId;
+            this.fromSchemaVersion = fromSchemaVersion;
+            this.toSchemaVersion = toSchemaVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DiffKey diffKey = (DiffKey) o;
+            return tableId == diffKey.tableId && fromSchemaVersion == 
diffKey.fromSchemaVersion
+                    && toSchemaVersion == diffKey.toSchemaVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, fromSchemaVersion, toSchemaVersion);

Review Comment:
   Maybe we shouldn’t use varargs here, but just calculate the hash code the 
   old-fashioned way?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */

Review Comment:
   ```suggestion
   /** An implementation over {@link CatalogService}. */
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+        return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+                new CatalogVersionToTableVersionSpan(tableId, 
fromCatalogVersion, toIncluding),
+                key -> 
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion, 
toIncluding)
+        );
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, 
int toCatalogVersion) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
toCatalogVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    // It's ok to use Stream as the results of the methods that call this are 
cached.
+    private Stream<CatalogTableDescriptor> tableVersionsBetween(
+            int tableId,
+            int fromCatalogVersionIncluding,
+            int toCatalogVersionIncluding
+    ) {
+        return IntStream.rangeClosed(fromCatalogVersionIncluding, 
toCatalogVersionIncluding)
+                .mapToObj(catalogVersion -> catalogService.table(tableId, 
catalogVersion))
+                .filter(new Predicate<>() {
+                    int prevVersion = Integer.MIN_VALUE;
+
+                    @Override
+                    public boolean test(CatalogTableDescriptor 
tableDescriptor) {
+                        if (tableDescriptor.tableVersion() == prevVersion) {
+                            return false;
+                        }
+
+                        assert prevVersion == Integer.MIN_VALUE || 
tableDescriptor.tableVersion() == prevVersion + 1;
+
+                        prevVersion = tableDescriptor.tableVersion();
+
+                        return true;
+                    }
+                });
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogAndTableVersions(
+            int tableId,
+            int fromCatalogVersion,
+            int toTableVersion
+    ) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
catalogService.latestCatalogVersion())
+                .takeWhile(tableDescriptor -> tableDescriptor.tableVersion() 
<= toTableVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    private static FullTableSchema 
fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
+        return new FullTableSchema(
+                tableDescriptor.tableVersion(),
+                tableDescriptor.id(),
+                tableDescriptor.columns()
+        );
+    }
+
+    private static class CatalogVersionsSpan {
+        private final int tableId;
+        private final int fromCatalogVersion;
+        private final int toCatalogVersion;
+
+        private CatalogVersionsSpan(int tableId, int fromCatalogVersion, int 
toCatalogVersion) {
+            this.tableId = tableId;
+            this.fromCatalogVersion = fromCatalogVersion;
+            this.toCatalogVersion = toCatalogVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CatalogVersionsSpan that = (CatalogVersionsSpan) o;
+            return tableId == that.tableId && fromCatalogVersion == 
that.fromCatalogVersion && toCatalogVersion == that.toCatalogVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, fromCatalogVersion, toCatalogVersion);

Review Comment:
   Maybe we can use the good old hash code calculation?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+        return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+                new CatalogVersionToTableVersionSpan(tableId, 
fromCatalogVersion, toIncluding),
+                key -> 
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion, 
toIncluding)
+        );
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, 
int toCatalogVersion) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
toCatalogVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    // It's ok to use Stream as the results of the methods that call this are 
cached.
+    private Stream<CatalogTableDescriptor> tableVersionsBetween(
+            int tableId,
+            int fromCatalogVersionIncluding,
+            int toCatalogVersionIncluding
+    ) {
+        return IntStream.rangeClosed(fromCatalogVersionIncluding, 
toCatalogVersionIncluding)
+                .mapToObj(catalogVersion -> catalogService.table(tableId, 
catalogVersion))
+                .filter(new Predicate<>() {
+                    int prevVersion = Integer.MIN_VALUE;
+
+                    @Override
+                    public boolean test(CatalogTableDescriptor 
tableDescriptor) {
+                        if (tableDescriptor.tableVersion() == prevVersion) {
+                            return false;
+                        }
+
+                        assert prevVersion == Integer.MIN_VALUE || 
tableDescriptor.tableVersion() == prevVersion + 1;

Review Comment:
   Please add a message to the assert that includes prevVersion and tableVersion.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());

Review Comment:
   Could you add a comment explaining that it is safe for us to take catalog 
   versions at these timestamps, since the external component should have already 
   waited for schema synchronization?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+        return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+                new CatalogVersionToTableVersionSpan(tableId, 
fromCatalogVersion, toIncluding),
+                key -> 
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion, 
toIncluding)
+        );
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, 
int toCatalogVersion) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
toCatalogVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    // It's ok to use Stream as the results of the methods that call this are 
cached.
+    private Stream<CatalogTableDescriptor> tableVersionsBetween(
+            int tableId,
+            int fromCatalogVersionIncluding,
+            int toCatalogVersionIncluding
+    ) {
+        return IntStream.rangeClosed(fromCatalogVersionIncluding, 
toCatalogVersionIncluding)
+                .mapToObj(catalogVersion -> catalogService.table(tableId, 
catalogVersion))
+                .filter(new Predicate<>() {
+                    int prevVersion = Integer.MIN_VALUE;
+
+                    @Override
+                    public boolean test(CatalogTableDescriptor 
tableDescriptor) {
+                        if (tableDescriptor.tableVersion() == prevVersion) {
+                            return false;
+                        }
+
+                        assert prevVersion == Integer.MIN_VALUE || 
tableDescriptor.tableVersion() == prevVersion + 1;
+
+                        prevVersion = tableDescriptor.tableVersion();
+
+                        return true;
+                    }
+                });
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogAndTableVersions(
+            int tableId,
+            int fromCatalogVersion,
+            int toTableVersion
+    ) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
catalogService.latestCatalogVersion())
+                .takeWhile(tableDescriptor -> tableDescriptor.tableVersion() 
<= toTableVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    private static FullTableSchema 
fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
+        return new FullTableSchema(
+                tableDescriptor.tableVersion(),
+                tableDescriptor.id(),
+                tableDescriptor.columns()
+        );
+    }
+
+    private static class CatalogVersionsSpan {
+        private final int tableId;
+        private final int fromCatalogVersion;
+        private final int toCatalogVersion;
+
+        private CatalogVersionsSpan(int tableId, int fromCatalogVersion, int 
toCatalogVersion) {
+            this.tableId = tableId;
+            this.fromCatalogVersion = fromCatalogVersion;
+            this.toCatalogVersion = toCatalogVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CatalogVersionsSpan that = (CatalogVersionsSpan) o;
+            return tableId == that.tableId && fromCatalogVersion == 
that.fromCatalogVersion && toCatalogVersion == that.toCatalogVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, fromCatalogVersion, toCatalogVersion);
+        }
+    }
+
+    private static class CatalogVersionToTableVersionSpan {
+        private final int tableId;
+        private final int fromCatalogVersion;
+        private final int toTableVersion;
+
+        private CatalogVersionToTableVersionSpan(int tableId, int 
fromCatalogVersion, int toTableVersion) {
+            this.tableId = tableId;
+            this.fromCatalogVersion = fromCatalogVersion;
+            this.toTableVersion = toTableVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CatalogVersionToTableVersionSpan that = 
(CatalogVersionToTableVersionSpan) o;
+            return tableId == that.tableId && fromCatalogVersion == 
that.fromCatalogVersion && toTableVersion == that.toTableVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, fromCatalogVersion, toTableVersion);

Review Comment:
   Same here: maybe use the good old hash code calculation instead of 
   `Objects.hash`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+        return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+                new CatalogVersionToTableVersionSpan(tableId, 
fromCatalogVersion, toIncluding),
+                key -> 
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion, 
toIncluding)
+        );
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, 
int toCatalogVersion) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
toCatalogVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    // It's ok to use Stream as the results of the methods that call this are 
cached.
+    private Stream<CatalogTableDescriptor> tableVersionsBetween(
+            int tableId,
+            int fromCatalogVersionIncluding,
+            int toCatalogVersionIncluding
+    ) {
+        return IntStream.rangeClosed(fromCatalogVersionIncluding, 
toCatalogVersionIncluding)
+                .mapToObj(catalogVersion -> catalogService.table(tableId, 
catalogVersion))
+                .filter(new Predicate<>() {
+                    int prevVersion = Integer.MIN_VALUE;
+
+                    @Override
+                    public boolean test(CatalogTableDescriptor 
tableDescriptor) {
+                        if (tableDescriptor.tableVersion() == prevVersion) {
+                            return false;
+                        }
+
+                        assert prevVersion == Integer.MIN_VALUE || 
tableDescriptor.tableVersion() == prevVersion + 1;
+
+                        prevVersion = tableDescriptor.tableVersion();
+
+                        return true;
+                    }
+                });
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogAndTableVersions(
+            int tableId,
+            int fromCatalogVersion,
+            int toTableVersion
+    ) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
catalogService.latestCatalogVersion())
+                .takeWhile(tableDescriptor -> tableDescriptor.tableVersion() 
<= toTableVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    private static FullTableSchema 
fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
+        return new FullTableSchema(
+                tableDescriptor.tableVersion(),
+                tableDescriptor.id(),
+                tableDescriptor.columns()
+        );
+    }
+
+    private static class CatalogVersionsSpan {

Review Comment:
   Please move this to a separate package-private class.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -240,4 +260,34 @@ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp 
txTs, int requestSchemaV
             throw new InternalSchemaVersionMismatchException();
         }
     }
+
+    private static class DiffKey {
+        private final int tableId;
+        private final int fromSchemaVersion;
+        private final int toSchemaVersion;
+
+        private DiffKey(int tableId, int fromSchemaVersion, int 
toSchemaVersion) {
+            this.tableId = tableId;
+            this.fromSchemaVersion = fromSchemaVersion;
+            this.toSchemaVersion = toSchemaVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DiffKey diffKey = (DiffKey) o;
+            return tableId == diffKey.tableId && fromSchemaVersion == 
diffKey.fromSchemaVersion
+                    && toSchemaVersion == diffKey.toSchemaVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, fromSchemaVersion, toSchemaVersion);

Review Comment:
   Maybe we shouldn't use varargs here, and instead just calculate the hash 
code the old-fashioned way?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {

Review Comment:
   `int toIncluding` confused me; let's rename it. If I understood correctly, 
this is `toTableVersionIncluding`?
   



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+        return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+                new CatalogVersionToTableVersionSpan(tableId, 
fromCatalogVersion, toIncluding),
+                key -> 
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion, 
toIncluding)
+        );
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, 
int toCatalogVersion) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
toCatalogVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    // It's ok to use Stream as the results of the methods that call this are 
cached.
+    private Stream<CatalogTableDescriptor> tableVersionsBetween(
+            int tableId,
+            int fromCatalogVersionIncluding,
+            int toCatalogVersionIncluding
+    ) {
+        return IntStream.rangeClosed(fromCatalogVersionIncluding, 
toCatalogVersionIncluding)
+                .mapToObj(catalogVersion -> catalogService.table(tableId, 
catalogVersion))
+                .filter(new Predicate<>() {
+                    int prevVersion = Integer.MIN_VALUE;
+
+                    @Override
+                    public boolean test(CatalogTableDescriptor 
tableDescriptor) {
+                        if (tableDescriptor.tableVersion() == prevVersion) {
+                            return false;
+                        }
+
+                        assert prevVersion == Integer.MIN_VALUE || 
tableDescriptor.tableVersion() == prevVersion + 1;
+
+                        prevVersion = tableDescriptor.tableVersion();
+
+                        return true;
+                    }
+                });
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogAndTableVersions(
+            int tableId,
+            int fromCatalogVersion,
+            int toTableVersion
+    ) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
catalogService.latestCatalogVersion())
+                .takeWhile(tableDescriptor -> tableDescriptor.tableVersion() 
<= toTableVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    private static FullTableSchema 
fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
+        return new FullTableSchema(
+                tableDescriptor.tableVersion(),
+                tableDescriptor.id(),
+                tableDescriptor.columns()
+        );
+    }
+
+    private static class CatalogVersionsSpan {
+        private final int tableId;
+        private final int fromCatalogVersion;
+        private final int toCatalogVersion;
+
+        private CatalogVersionsSpan(int tableId, int fromCatalogVersion, int 
toCatalogVersion) {
+            this.tableId = tableId;
+            this.fromCatalogVersion = fromCatalogVersion;
+            this.toCatalogVersion = toCatalogVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CatalogVersionsSpan that = (CatalogVersionsSpan) o;
+            return tableId == that.tableId && fromCatalogVersion == 
that.fromCatalogVersion && toCatalogVersion == that.toCatalogVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, fromCatalogVersion, toCatalogVersion);
+        }
+    }
+
+    private static class CatalogVersionToTableVersionSpan {

Review Comment:
   Same here, please: move this to a separate package-private class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to