korlov42 commented on code in PR #6593: URL: https://github.com/apache/ignite-3/pull/6593#discussion_r2387740409
########## modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.framework; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas; + +/** + * Dummy wrapper for predefined collection of schemas with possibility to change catalog version. + * + * @see PredefinedSchemaManager + */ +public class VersionedSchemaManager extends PredefinedSchemaManager { + private final AtomicInteger ver; + + /** Constructor. */ + public VersionedSchemaManager(IgniteSchema schema, AtomicInteger ver) { + super(schema); + + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override + public int catalogVersion(long timestamp) { + return ver == null ? 
super.catalogVersion(timestamp) : ver.get();

Review Comment:
If `ver` may be null, let's mark both this field and the constructor parameter as `@Nullable`.

##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java:
##########
@@ -73,6 +75,22 @@ public interface Cache<K, V> { */ V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction); + /** + * If the value for the specified key is present and non-null, attempts to compute a new mapping + * given the key and its current mapped value. + * If the remapping function returns null, the mapping is removed. + * If the remapping function itself throws an (unchecked) exception, the exception is rethrown, + * and the current mapping is left unchanged. + * + * @param key Key with which the specified value is to be associated. + * @param remappingFunction The remapping function to compute a value. + * @return The new value associated with the specified key, or null if none. + */ + @Nullable V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);

Review Comment:
Could you elaborate, please? If the key has been evicted, you get a `null` value instead of the actual value. And if you don't need to create a new mapping, just return null.
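To illustrate the point about `compute`, here is a minimal sketch. It assumes that `Cache.compute` follows the same contract as `java.util.Map.compute` (the remapping function sees `null` for an absent key, and returning `null` creates no mapping), which is not confirmed by the diff. `ConcurrentHashMap` stands in for the cache, and `ComputeSketch` / `computeOnlyIfPresent` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical helper: gets computeIfPresent-like behaviour out of plain compute. */
final class ComputeSketch {
    static <K, V> V computeOnlyIfPresent(
            ConcurrentHashMap<K, V> cache,
            K key,
            BiFunction<? super K, ? super V, ? extends V> remappingFunction
    ) {
        return cache.compute(key, (k, v) -> {
            if (v == null) {
                // Key is absent (e.g. evicted): returning null creates no mapping.
                return null;
            }

            // Key is present: delegate to the caller's remapping function.
            return remappingFunction.apply(k, v);
        });
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
        cache.put("a", 1);

        System.out.println(computeOnlyIfPresent(cache, "a", (k, v) -> v + 1));       // prints 2
        System.out.println(computeOnlyIfPresent(cache, "missing", (k, v) -> v + 1)); // prints null
        System.out.println(cache.containsKey("missing"));                            // prints false
    }
}
```

If the real `Cache.compute` behaves this way, a separate `computeIfPresent` method may be unnecessary; if it does not, that difference is probably the elaboration being asked for.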
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -127,9 +135,16 @@ private void updateTableSizeStatistics(int tableId, boolean force) { return new ActualSize(Math.max(size, 1), currTimestamp); }); - }).exceptionally(e -> { - LOG.info("Can't calculate size for table [id={}].", e, tableId); - return null; + }).handle((res, err) -> { + if (err != null) { + LOG.warn(format("Can't calculate size for table [id={}].", tableId), err); + + return null; + } else { + changesUpdater.accept(tableId);

Review Comment:
A null-check is missing.

##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -130,6 +137,12 @@ private void updateTableSizeStatistics(int tableId, boolean force) { }).exceptionally(e -> { LOG.info("Can't calculate size for table [id={}].", e, tableId); return null; + }).whenComplete((ignored, ex) -> { + if (ex == null) {

Review Comment:
I checked `org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImplTest#testEstimationFailure`, but it still passes with the old implementation.

##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java:
##########
@@ -113,6 +115,36 @@ public void checkTableSize() { verify(internalTable, times(1)).estimatedSize(); } + @Test + public void testEstimationFailure() throws Exception {

Review Comment:
```suggestion
public void testEstimationFailure() {
```

##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java:
##########
@@ -28,6 +29,9 @@ public interface SqlStatisticManager extends LifecycleAware { */ long tableSize(int tableId); + /** Plan updater callback. */ + void setListener(IntConsumer updater);

Review Comment:
You are still using `IntConsumer`. What is this `int` supposed to represent? Table size? Number of partitions?
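One possible way to address the `IntConsumer` concern is a dedicated functional interface whose method signature names the parameter. The sketch below is only an illustration: `StatisticChangeListener`, `onStatisticChanged`, and `ListenerSketch` are hypothetical names that do not appear in the PR, and the sketch assumes the `int` is a table ID, as the call sites in the diff (`accept(tableId)`) suggest.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replacement for a bare IntConsumer: the meaning of the int is part of the contract. */
@FunctionalInterface
interface StatisticChangeListener {
    /**
     * Called after statistics for a table have been refreshed.
     *
     * @param tableId ID of the table whose statistics changed.
     */
    void onStatisticChanged(int tableId);
}

final class ListenerSketch {
    public static void main(String[] args) {
        List<Integer> changedTables = new ArrayList<>();

        // A lambda works exactly as it would with IntConsumer.
        StatisticChangeListener listener = tableId -> changedTables.add(tableId);

        listener.onStatisticChanged(42);

        System.out.println(changedTables); // prints [42]
    }
}
```

Call sites stay as short as with `IntConsumer`, while the Javadoc on the interface answers the question of what the `int` represents.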
########## modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java: ########## @@ -858,6 +1046,149 @@ private static ParameterMetadata createParameterMetadata(RelDataType parameterRo return new ParameterMetadata(parameterTypes); } + public void statisticsChanged(int tableId) { + planUpdater.statisticsChanged(tableId); + } + + private static class PlanUpdater { + private final ClockService clockService; + + private final ScheduledExecutorService planUpdater; + + private final AtomicReference<CompletableFuture<PlanInfo>> rePlanningFut = new AtomicReference<>(nullCompletedFuture()); + + private volatile boolean recalculatePlans; + + private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache; + + private final SqlSchemaManager schemaManager; + + private final long plannerTimeout; + + private final PlanPrepare prepare; + + PlanUpdater( + ClockService clockService, + ScheduledExecutorService planUpdater, + Cache<CacheKey, CompletableFuture<PlanInfo>> cache, + SqlSchemaManager schemaManager, + long plannerTimeout, + PlanPrepare prepare + ) { + this.clockService = clockService; + this.planUpdater = planUpdater; + this.cache = cache; + this.schemaManager = schemaManager; + this.plannerTimeout = plannerTimeout; + this.prepare = prepare; + } + + /** + * Reacts to the changed statistic. + * + * @param tableId Table Id statistic changed for. 
+ */ + void statisticsChanged(int tableId) { + Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries = cache.entrySet(); + + int currentCatalogVersion = schemaManager.catalogVersion(clockService.now().longValue()); + + boolean statChanged = false; + + for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : cachedEntries) {

Review Comment:
I see you added an infinite loop to the test:
```
// infinitely change statistic
IgniteTestUtils.runAsync(() -> {
    while (true) {
        service.statisticsChanged(table.id());
        Thread.sleep(100);
    }
});
```
So you constantly renew the plan for the same query, and the next assertion, `assertNotSame(p0, p2);`, passes as if the plan had been evicted and the next call to `prepare` had triggered planning. In fact, you are just constantly replanning the query in the infinite loop. This not only fails to verify that expiration is unaffected, but also breaks the original check.

##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -142,6 +157,10 @@ public class PrepareServiceImpl implements PrepareService { private volatile ThreadPoolExecutor planningPool; + private final PlanUpdater planUpdater; + + private final ClockService clockService;

Review Comment:
This one is not answered.

##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -130,6 +137,12 @@ private void updateTableSizeStatistics(int tableId, boolean force) { }).exceptionally(e -> { LOG.info("Can't calculate size for table [id={}].", e, tableId); return null; + }).whenComplete((ignored, ex) -> { + if (ex == null) { + if (planUpdater != null) { + planUpdater.accept(tableId);

Review Comment:
So you've just removed the `volatile` keyword and the null-check, and this makes the patch even worse.

##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -65,6 +67,8 @@ public
class SqlStatisticManagerImpl implements SqlStatisticManager { private final CatalogService catalogService; private final LowWatermark lowWatermark; + private IntConsumer changesUpdater;

Review Comment:
`changesUpdater` is supposed to be set lazily, but it is marked neither as `@Nullable` nor as `volatile`.
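A minimal sketch of what a lazily set listener field could look like, under the assumption that the setter and the notifying code may run on different threads. `LazyListenerSketch` and `notifyChanged` are hypothetical names; in the project the field would presumably also carry a `@Nullable` annotation, which is omitted here to keep the example free of external dependencies.

```java
import java.util.function.IntConsumer;

/** Hypothetical holder for a listener that is installed some time after construction. */
final class LazyListenerSketch {
    /** May be null until setListener is called; volatile makes the write visible to other threads. */
    private volatile IntConsumer changesUpdater;

    void setListener(IntConsumer updater) {
        this.changesUpdater = updater;
    }

    /** Notifies the listener if one is installed; returns whether a notification was sent. */
    boolean notifyChanged(int tableId) {
        // Read the volatile field once, so the null-check and the call see the same reference.
        IntConsumer updater = changesUpdater;

        if (updater == null) {
            return false;
        }

        updater.accept(tableId);

        return true;
    }

    public static void main(String[] args) {
        LazyListenerSketch sketch = new LazyListenerSketch();

        System.out.println(sketch.notifyChanged(1)); // prints false: no listener yet

        sketch.setListener(id -> System.out.println("statistics changed for table " + id));

        System.out.println(sketch.notifyChanged(1)); // prints the message, then true
    }
}
```

Copying the field into a local variable before the check avoids a race in which the field changes between the null-check and the call.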
