korlov42 commented on code in PR #6593:
URL: https://github.com/apache/ignite-3/pull/6593#discussion_r2387740409


##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/VersionedSchemaManager.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.framework;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
+
+/**
+ * Dummy wrapper for predefined collection of schemas with possibility to 
change catalog version.
+ *
+ * @see PredefinedSchemaManager
+ */
+public class VersionedSchemaManager extends PredefinedSchemaManager {
+    private final AtomicInteger ver;
+
+    /** Constructor. */
+    public VersionedSchemaManager(IgniteSchema schema, AtomicInteger ver) {
+        super(schema);
+
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int catalogVersion(long timestamp) {
+        return ver == null ? super.catalogVersion(timestamp) : ver.get();

Review Comment:
   If `ver` is allowed to be null, then let's mark both this field and the 
constructor parameter as `@Nullable`.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/cache/Cache.java:
##########
@@ -73,6 +75,22 @@ public interface Cache<K, V> {
      */
     V compute(K key, BiFunction<? super K, ? super V, ? extends V> 
remappingFunction);
 
+    /**
+     * If the value for the specified key is present and non-null, attempts to 
compute a new mapping
+     * given the key and its current mapped value.
+     * If the remapping function returns null, the mapping is removed.
+     * If the remapping function itself throws an (unchecked) exception, the 
exception is rethrown,
+     * and the current mapping is left unchanged.
+     *
+     * @param key Key with which the specified value is to be associated.
+     * @param remappingFunction The remapping function to compute a value.
+     * @return The new value associated with the specified key, or null if 
none.
+     */
+    @Nullable V computeIfPresent(K key, BiFunction<? super K, ? super V, ? 
extends V> remappingFunction);

Review Comment:
   Could you elaborate, please? If the key is evicted, you should get a `null` value 
instead of the actual value. And if you don't need to create a new mapping -- just 
return null.
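
   A minimal sketch of that point, assuming the engine's `Cache.compute` follows the same contract as `java.util.Map.compute`. `ConcurrentHashMap` is only a stand-in here, and the class and method names are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustration only: ConcurrentHashMap stands in for the engine's Cache interface. */
class ComputeOnMissingKeyDemo {
    /** Updates the value only when a mapping exists; never creates a new one. */
    static Integer incrementIfCached(ConcurrentMap<String, Integer> cache, String key) {
        return cache.compute(key, (k, current) -> {
            if (current == null) {
                // Absent (for example, evicted) key: returning null creates no mapping.
                return null;
            }

            return current + 1;
        });
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<>();
        cache.put("q1", 1);

        // Existing mapping: the remapping function sees the current value.
        System.out.println(incrementIfCached(cache, "q1"));

        // Missing mapping: the function sees null and leaves the cache untouched.
        System.out.println(incrementIfCached(cache, "evicted"));
        System.out.println(cache.containsKey("evicted"));
    }
}
```

   Under that assumption, plain `compute` already covers the "only if present" case.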



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -127,9 +135,16 @@ private void updateTableSizeStatistics(int tableId, 
boolean force) {
 
                             return new ActualSize(Math.max(size, 1), 
currTimestamp);
                         });
-                    }).exceptionally(e -> {
-                        LOG.info("Can't calculate size for table [id={}].", e, 
tableId);
-                        return null;
+                    }).handle((res, err) -> {
+                        if (err != null) {
+                            LOG.warn(format("Can't calculate size for table 
[id={}].", tableId), err);
+
+                            return null;
+                        } else {
+                            changesUpdater.accept(tableId);

Review Comment:
   null-check is missing



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -130,6 +137,12 @@ private void updateTableSizeStatistics(int tableId, 
boolean force) {
                     }).exceptionally(e -> {
                         LOG.info("Can't calculate size for table [id={}].", e, 
tableId);
                         return null;
+                    }).whenComplete((ignored, ex) -> {
+                        if (ex == null) {

Review Comment:
   I checked 
`org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImplTest#testEstimationFailure`,
 but it still passes with the old implementation.
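
   For reference when comparing the two variants in this thread, here is a standalone sketch of how the two chain shapes behave on a failed future. It relies only on general `CompletableFuture` semantics and does not reproduce the test. The class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: counts how often the "success" branch runs for each chain shape. */
class ChainedCallbackDemo {
    /** Shape: exceptionally(...).whenComplete(...). */
    static int notificationsAfterExceptionally(CompletableFuture<Long> estimate) {
        AtomicInteger notified = new AtomicInteger();

        estimate
                .exceptionally(e -> null) // the failure is converted into a normal null result
                .whenComplete((ignored, ex) -> {
                    // The upstream stage already recovered, so ex is null here even on failure.
                    if (ex == null) {
                        notified.incrementAndGet();
                    }
                })
                .join();

        return notified.get();
    }

    /** Shape: a single handle(...) that sees the original error. */
    static int notificationsWithHandle(CompletableFuture<Long> estimate) {
        AtomicInteger notified = new AtomicInteger();

        estimate
                .handle((res, err) -> {
                    if (err == null) {
                        notified.incrementAndGet();
                    }

                    return res;
                })
                .join();

        return notified.get();
    }
}
```

   A test that wants to tell the two shapes apart would need to assert that the callback is not invoked when the estimation fails.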



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java:
##########
@@ -113,6 +115,36 @@ public void checkTableSize() {
         verify(internalTable, times(1)).estimatedSize();
     }
 
+    @Test
+    public void testEstimationFailure() throws Exception {

Review Comment:
   ```suggestion
       public void testEstimationFailure() {
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManager.java:
##########
@@ -28,6 +29,9 @@ public interface SqlStatisticManager extends LifecycleAware {
      */
     long tableSize(int tableId);
 
+    /** Plan updater callback. */
+    void setListener(IntConsumer updater);

Review Comment:
   You are still using `IntConsumer`... What is this `int` supposed to 
represent? Table size? Number of partitions?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -858,6 +1046,149 @@ private static ParameterMetadata 
createParameterMetadata(RelDataType parameterRo
         return new ParameterMetadata(parameterTypes);
     }
 
+    public void statisticsChanged(int tableId) {
+        planUpdater.statisticsChanged(tableId);
+    }
+
+    private static class PlanUpdater {
+        private final ClockService clockService;
+
+        private final ScheduledExecutorService planUpdater;
+
+        private final AtomicReference<CompletableFuture<PlanInfo>> 
rePlanningFut = new AtomicReference<>(nullCompletedFuture());
+
+        private volatile boolean recalculatePlans;
+
+        private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
+
+        private final SqlSchemaManager schemaManager;
+
+        private final long plannerTimeout;
+
+        private final PlanPrepare prepare;
+
+        PlanUpdater(
+                ClockService clockService,
+                ScheduledExecutorService planUpdater,
+                Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
+                SqlSchemaManager schemaManager,
+                long plannerTimeout,
+                PlanPrepare prepare
+        ) {
+            this.clockService = clockService;
+            this.planUpdater = planUpdater;
+            this.cache = cache;
+            this.schemaManager = schemaManager;
+            this.plannerTimeout = plannerTimeout;
+            this.prepare = prepare;
+        }
+
+        /**
+         * Reacts to the changed statistic.
+         *
+         * @param tableId Table Id statistic changed for.
+         */
+        void statisticsChanged(int tableId) {
+            Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries = 
cache.entrySet();
+
+            int currentCatalogVersion = 
schemaManager.catalogVersion(clockService.now().longValue());
+
+            boolean statChanged = false;
+
+            for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cachedEntries) {

Review Comment:
   I see you added an infinite loop to the test:
   ```
               // infinitely change statistic
               IgniteTestUtils.runAsync(() -> {
                   while (true) {
                       service.statisticsChanged(table.id());
                       Thread.sleep(100);
                   }
               });
   ```
   
   So you constantly renew the plan for the same query, and the next 
assertion `assertNotSame(p0, p2);` passes as if the plan had been evicted and the 
next call to `prepare` had triggered the planning. In fact, you just constantly 
replan the query in the infinite loop. This not only fails to verify that 
expiration is unaffected, but also breaks the original check.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java:
##########
@@ -142,6 +157,10 @@ public class PrepareServiceImpl implements PrepareService {
 
     private volatile ThreadPoolExecutor planningPool;
 
+    private final PlanUpdater planUpdater;
+
+    private final ClockService clockService;

Review Comment:
   this one is not answered



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -130,6 +137,12 @@ private void updateTableSizeStatistics(int tableId, 
boolean force) {
                     }).exceptionally(e -> {
                         LOG.info("Can't calculate size for table [id={}].", e, 
tableId);
                         return null;
+                    }).whenComplete((ignored, ex) -> {
+                        if (ex == null) {
+                            if (planUpdater != null) {
+                                planUpdater.accept(tableId);

Review Comment:
   So you've just removed the `volatile` keyword and the null check, and this makes 
the patch even worse.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java:
##########
@@ -65,6 +67,8 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticManager {
     private final CatalogService catalogService;
     private final LowWatermark lowWatermark;
 
+    private IntConsumer changesUpdater;

Review Comment:
   `changesUpdater` is supposed to be set lazily, but it is marked neither as 
`@Nullable` nor as `volatile`.
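
   A minimal sketch of the shape being asked for, assuming the listener may be installed from another thread after construction. The class name and the `notifyChanged` method are hypothetical, and the `@Nullable` annotation is shown only as a comment to keep the example self-contained:

```java
import java.util.function.IntConsumer;

/** Illustration only: a lazily installed listener that is safe to read from other threads. */
class LazyListenerHolder {
    // May stay null until setListener is called; volatile makes the write visible to readers.
    // In the real code this field would also carry the project's @Nullable annotation.
    private volatile IntConsumer changesUpdater;

    void setListener(IntConsumer updater) {
        this.changesUpdater = updater;
    }

    /** Returns true when a listener was present and has been notified. */
    boolean notifyChanged(int tableId) {
        // Read the volatile field once, so the null check and the call use the same reference.
        IntConsumer updater = changesUpdater;

        if (updater == null) {
            return false;
        }

        updater.accept(tableId);

        return true;
    }
}
```

   Copying the field into a local variable avoids checking one reference and calling another if the listener is replaced concurrently.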



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
