Berkof commented on a change in pull request #9423:
URL: https://github.com/apache/ignite/pull/9423#discussion_r716296022



##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
##########
@@ -260,12 +289,14 @@ public IgniteStatisticsRepository statisticsRepository() {
 
     /** {@inheritDoc} */
     @Override public void dropAll() throws IgniteCheckedException {
+        checkStatisticsState("drop all statistics");
+
         statCfgMgr.dropAll();
     }
 
     /** {@inheritDoc} */
     @Override public void stop() {
-        disableOperations();
+        stateChanged();

Review comment:
       Done.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
##########
@@ -365,6 +265,23 @@ public void saveLocalStatistics(StatisticsKey key, ObjectStatisticsImpl statisti
         locStats.put(key, statistics);
     }
 
+    /**
+     * Clear statistics for the specified partition ids.
+     *
+     * @param key Key to remove statistics by.
+     * @param partsToRemove Set of partition ids to remove.
+     */
+    public void clearLocalPartitionIdsStatistics(StatisticsKey key, Set<Integer> partsToRemove) {

Review comment:
       Remove "Ids", keep "Partition". Local statistics are also stored by the StatisticsRepository, but this method clears partition statistics.
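
       For illustration, the rename could look like this (a sketch only; the final name is the author's call, and it
       might instead merge with the existing clearLocalPartitionsStatistics(key, null) overload):

           /**
            * Clear statistics for the specified partitions.
            *
            * @param key Key to remove statistics by.
            * @param partsToRemove Set of partition ids to remove.
            */
           public void clearLocalPartitionStatistics(StatisticsKey key, Set<Integer> partsToRemove) {
               // Same body as in the patch; only the name changes.
           }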

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
##########
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import org.apache.ignite.internal.processors.query.stat.task.GatherPartitionStatistics;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * Process all tasks related to the statistics repository: mostly statistics collection,
+ * invalidation (due to configuration, topology or obsolescence issues) and loads.
+ * Input tasks should be scheduled through the management pool, while the gathering pool is used
+ * to process heavy operations in parallel.
+ *
+ * Manages the gathering pool and its jobs. To guarantee graceful shutdown:
+ * 1) Any job can be added into gatheringInProgress only in active state (check after adding).
+ * 2) State can be deactivated only after cancelling all jobs and getting the busyLock block.
+ * 3) Each job should do its work under busyLock, periodically checking its cancellation status.
+ */
+public class StatisticsProcessor {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Ignite statistics repository. */
+    private final IgniteStatisticsRepository statRepo;
+
+    /** Ignite Thread pool executor to do statistics collection tasks. */
+    private final IgniteThreadPoolExecutor gatherPool;
+
+    /** (statistics key -> gathering context) */
+    private final ConcurrentMap<StatisticsKey, LocalStatisticsGatheringContext> gatheringInProgress =
+        new ConcurrentHashMap<>();
+
+    /** Active flag (used to skip commands in inactive cluster). */
+    private volatile boolean active;
+
+    /** Lock to protect started gatherings during deactivation. */
+    private static final GridBusyLock busyLock = new GridBusyLock();
+
+    /**
+     * Constructor.
+     *
+     * @param repo IgniteStatisticsRepository.
+     * @param gatherPool Thread pool to gather statistics in.
+     * @param logSupplier Log supplier function.
+     */
+    public StatisticsProcessor(
+        IgniteStatisticsRepository repo,
+        IgniteThreadPoolExecutor gatherPool,
+        Function<Class<?>, IgniteLogger> logSupplier
+    ) {
+        this.statRepo = repo;
+        this.gatherPool = gatherPool;
+        this.log = logSupplier.apply(StatisticsProcessor.class);
+    }
+
+    /**
+     * Update statistics for the given key to the actual state.
+     * If byObsolescence and tbl is not {@code null} - does not clear any other partitions.
+     * Should be run through the management pool only.
+     * 1) Replace the previous gathering context if one exists and replacement is needed (replace a byObsolescence
+     * gathering with a new one, or replace a gathering with an older configuration or topology version with a new one).
+     * 2) If byObsolescence and no table is available - clean obsolescence and partition statistics for the given key.
+     * 3) Submit tasks for each specified partition.
+     * 4) After the last task finishes gathering, aggregation starts.
+     * 5) Read all partitions & obsolescence from the repo and:
+     * if byObsolescence = {@code true} - remove unnecessary ones and aggregate by the specified list;
+     * if byObsolescence = {@code false} - aggregate all present in the store (it should contain only actual ones).
+     * 6) Save aggregated local statistics.
+     *
+     * @param byObsolescence Update only obsolescence partitions.
+     * @param tbl Table to update. If {@code null} - just clear all partitions and obsolescence from the repo.
+     * @param cfg Statistics configuration to use.
+     * @param partsToProcess Partitions to update; if !byObsolescence - all primary partitions for the given topology.
+     * @param topVer Topology version, can be {@code null} if tbl is {@code null}.
+     */
+    public void updateKeyAsync(
+        boolean byObsolescence,
+        GridH2Table tbl,
+        StatisticsObjectConfiguration cfg,
+        Set<Integer> partsToProcess,
+        AffinityTopologyVersion topVer
+    ) {
+        if (!startJob("Updating key " + cfg.key()))
+            return;
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format(
+                    "Start statistics processing: byObsolescence=%b, cfg=%s, partsToProcess=%s, topVer=%s",
+                    byObsolescence, cfg, partsToProcess, topVer));
+            }
+
+            LocalStatisticsGatheringContext newCtx = new LocalStatisticsGatheringContext(byObsolescence, tbl, cfg,
+                partsToProcess, topVer);
+            LocalStatisticsGatheringContext registeredCtx = registerNewTask(newCtx);
+
+            if (registeredCtx != null) {
+                prepareTask(registeredCtx);
+
+                if (registeredCtx.table() != null && !registeredCtx.remainingParts().isEmpty() &&
+                    !registeredCtx.configuration().columns().isEmpty())
+                    submitTasks(registeredCtx);
+                else {
+                    gatheringInProgress.remove(registeredCtx.configuration().key(), registeredCtx);
+
+                    assert registeredCtx.remainingParts().isEmpty();
+                    assert registeredCtx.finished().getCount() == 0;
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Gathering by key " + cfg.key() + " was skipped due to a previous one.");
+            }
+        }
+        finally {
+            endJob();
+        }
+    }
+
+    /**
+     * Do the preparation step before scheduling any gathering.
+     *
+     * @param ctx Context to do preparations.
+     */
+    private void prepareTask(LocalStatisticsGatheringContext ctx) {
+        try {
+            if (ctx.byObsolescence())
+                statRepo.saveObsolescenceInfo(ctx.configuration().key());
+
+            if (ctx.table() == null || ctx.configuration().columns().isEmpty())
+                statRepo.clearLocalPartitionsStatistics(ctx.configuration().key(), null);
+        }
+        catch (Throwable t) {
+            ctx.future().cancel(true);
+        }
+        finally {
+            // Prepare phase done
+            ctx.finished().countDown();
+        }
+    }
+
+    /**
+     * Try to register a new gathering task.
+     *
+     * @param ctx Context of the task to register.
+     * @return Registered context, or {@code null} if the new task was skipped in favor of an existing one.
+     */
+    private LocalStatisticsGatheringContext registerNewTask(LocalStatisticsGatheringContext ctx) {
+        LocalStatisticsGatheringContext[] ctxToSubmit = new LocalStatisticsGatheringContext[1];
+        LocalStatisticsGatheringContext[] ctxToAwait = new LocalStatisticsGatheringContext[1];
+
+        gatheringInProgress.compute(ctx.configuration().key(), (k, v) -> {
+            if (v == null) {
+                // No old context - start new
+                ctxToSubmit[0] = ctx;
+
+                return ctx;
+            }
+
+            if ((v.byObsolescence() && ctx.byObsolescence()) ||
+                (v.topologyVersion().before(ctx.topologyVersion()) ||
+                    !checkStatisticsCfg(v.configuration(), ctx.configuration()))) {
+                // Old context is for an older topology or config - cancel it and start the new one.
+                v.future().cancel(true);
+                ctxToAwait[0] = v;
+
+                ctxToSubmit[0] = ctx;
+
+                return ctx;
+            }
+
+            ctxToSubmit[0] = null;
+
+            return v;
+        });
+
+        // Can't wait in the map critical section (to allow the task to try to remove itself), but
+        // have to wait here under busyLock to do graceful shutdown.
+        if (ctxToAwait[0] != null) {
+            try {
+                ctxToAwait[0].finished().await();
+            }
+            catch (InterruptedException e) {
+                log.warning("Unable to wait for statistics gathering task to finish by key " +
+                    ctx.configuration().key(), e);
+            }
+        }
+
+        return ctxToSubmit[0];
+    }
+
+    /**
+     * Test if the statistics configuration fits all required versions and doesn't
+     * contain any extra column configurations.
+     *
+     * @param existingCfg Statistics configuration to check.
+     * @param targetCfg Target configuration.
+     * @return {@code true} if it is, {@code false} otherwise.
+     */
+    private boolean checkStatisticsCfg(
+        StatisticsObjectConfiguration existingCfg,
+        StatisticsObjectConfiguration targetCfg
+    ) {
+        if (existingCfg == targetCfg)
+            return true;
+
+        if (existingCfg.columns().size() != targetCfg.columns().size())
+            return false;
+
+        for (Map.Entry<String, StatisticsColumnConfiguration> targetColCfgEntry : targetCfg.columns().entrySet()) {
+            StatisticsColumnConfiguration existingColCfg = existingCfg.columns().get(targetColCfgEntry.getKey());
+
+            if (existingColCfg == null || existingColCfg.version() < targetColCfgEntry.getValue().version())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Generate and submit tasks into the gathering pool for an existing gathering context.
+     *
+     * @param ctx Context to generate tasks by.
+     */
+    private void submitTasks(LocalStatisticsGatheringContext ctx) {
+        for (int part : ctx.remainingParts()) {
+            final GatherPartitionStatistics task = new GatherPartitionStatistics(
+                statRepo,
+                ctx,
+                part,
+                log
+            );
+
+            submitTask(ctx, task);
+        }
+    }
+
+    /**
+     * Aggregate local partition statistics into local object statistics, removing irrelevant partitions.
+     *
+     * @param ctx Gathering context to aggregate statistics by.
+     */
+    private void aggregateStatistics(LocalStatisticsGatheringContext ctx) {
+        if (checkCancelled(ctx))
+            return;
+
+        StatisticsKey key = ctx.configuration().key();
+        Collection<ObjectPartitionStatisticsImpl> allParts = statRepo.getLocalPartitionsStatistics(key);
+
+        if (ctx.byObsolescence()) {
+            if (ctx.configuration() == null)
+                return;
+
+            statRepo.aggregatedLocalStatistics(allParts, ctx.configuration());
+        }
+        else {
+            Set<Integer> partsToRemove = new HashSet<>();
+            Collection<ObjectPartitionStatisticsImpl> partsToAggregate = new ArrayList<>();
+
+            for (ObjectPartitionStatisticsImpl partStat : allParts) {
+                if (ctx.allParts() == null || !ctx.allParts().contains(partStat.partId()))
+                    partsToRemove.add(partStat.partId());
+                else
+                    partsToAggregate.add(partStat);
+            }
+
+            if (!partsToRemove.isEmpty())
+                statRepo.clearLocalPartitionIdsStatistics(ctx.configuration().key(), partsToRemove);
+
+            if (!partsToAggregate.isEmpty())
+                statRepo.aggregatedLocalStatistics(partsToAggregate, ctx.configuration());
+        }
+    }
+
+    /**
+     * Check if the specified context is cancelled. If so - log a debug message and return {@code true}.
+     *
+     * @param ctx Gathering context to check.
+     * @return {@code true} if the context was cancelled, {@code false} otherwise.
+     */
+    private boolean checkCancelled(LocalStatisticsGatheringContext ctx) {
+        if (ctx.future().isCancelled()) {
+            if (log.isDebugEnabled())
+                log.debug("Gathering context by key " + ctx.configuration().key() + " cancelled.");
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Try to acquire busyLock and check the active state. Return a success flag.
+     * If unsuccessful - release busyLock.
+     *
+     * @param operation Text description of the operation to log.
+     * @return {@code true} if the job can be started, {@code false} otherwise.
+     */
+    private boolean startJob(String operation) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Unable to start " + operation + " due to inactive state.");
+
+            return false;
+        }
+
+        if (!active) {
+            if (log.isDebugEnabled())
+                log.debug("Unable to start " + operation + " due to inactive state.");
+
+            busyLock.leaveBusy();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Just unlock busyLock.
+     */
+    private void endJob() {
+        busyLock.leaveBusy();
+    }
+
+    /**
+     * Mark partition task failed. If that was the last partition -
+     * finalize the context and remove it from gathering in progress.
+     *
+     * @param ctx Context to finish the partition in.
+     * @param partId Partition id.
+     */
+    private void failPartTask(LocalStatisticsGatheringContext ctx, int partId) {
+        // Partition skipped
+        ctx.finished().countDown();
+
+        // No need to gather rest partitions.
+        ctx.future().cancel(true);
+
+        if (log.isDebugEnabled())
+            log.debug(String.format("Gathering failed for key %s.%d ", ctx.configuration().key(), partId));
+
+        if (ctx.partitionNotAvailable(partId)) {
+            gatheringInProgress.remove(ctx.configuration().key(), ctx);
+
+            assert ctx.finished().getCount() == 0;
+
+            if (log.isDebugEnabled())
+                log.debug(String.format("Gathering removed for key %s", ctx.configuration().key()));
+        }
+    }
+
+    /**
+     * Submit partition gathering task.
+     *
+     * @param ctx Gathering context to track state.
+     * @param task Gathering task to proceed.
+     */
+    private void submitTask(
+        final LocalStatisticsGatheringContext ctx,
+        final GatherPartitionStatistics task
+    ) {
+        gatherPool.submit(() -> {
+            if (!startJob("Gathering partition statistics by key " + ctx.configuration().key())) {
+                failPartTask(ctx, task.partition());
+
+                return;
+            }
+            try {
+                GatheredPartitionResult gatherRes = new GatheredPartitionResult();
+
+                try {
+                    gatherCtxPartition(ctx, task, gatherRes);
+
+                    completePartitionStatistic(gatherRes.newPart, ctx, task.partition(), gatherRes.partStats);
+                }
+                catch (Throwable t) {
+                    //failPartTask(ctx, task.partition());
+                    gatherRes.partStats = null;
+
+                    if (t instanceof GatherStatisticCancelException) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Collect statistics task was cancelled " +
+                                "[key=" + ctx.configuration().key() + ", part=" + task.partition() + ']');
+                        }
+                    }
+                    else if (t.getCause() instanceof NodeStoppingException) {
+                        if (log.isDebugEnabled())
+                            log.debug("Node stopping during statistics collection on " +
+                                "[key=" + ctx.configuration().key() + ", part=" + task.partition() + ']');
+                    }
+                    else
+                        log.warning("Unexpected error on statistics gathering", t);
+                }
+                finally {
+                    if (gatherRes.partStats == null)
+                        failPartTask(ctx, task.partition());
+                    else {
+                        // Finish partition task
+                        ctx.finished().countDown();
+
+                        if (ctx.partitionDone(task.partition()))
+                            gatheringInProgress.remove(ctx.configuration().key(), ctx);
+                    }
+                }
+            }
+            finally {
+                endJob();
+            }
+        });
+    }
+
+    /**
+     * Get statistics for the task's partition: reuse existing partition statistics if they fit,
+     * otherwise gather new ones.
+     *
+     * @param ctx Gathering context.
+     * @param task Partition gathering task.
+     * @param result Holder for the gathered partition result.
+     */
+    private void gatherCtxPartition(
+        final LocalStatisticsGatheringContext ctx,
+        final GatherPartitionStatistics task,
+        GatheredPartitionResult result
+    ) {
+        if (!ctx.byObsolescence()) {
+            // Try to use existing partition statistics instead of gathering new ones.
+            ObjectPartitionStatisticsImpl existingPartStat = statRepo.getLocalPartitionStatistics(
+                ctx.configuration().key(), task.partition());
+
+            if (existingPartStat != null && partStatFit(existingPartStat, ctx.configuration()))
+                result.partStats = existingPartStat;
+        }
+
+        result.newPart = result.partStats == null;
+
+        if (result.partStats == null)
+            result.partStats = task.call();
+    }
+
+    /**
+     * Check if the specified object statistics fully meet the specified statistics object configuration.
+     *
+     * @param stat Object statistics to test.
+     * @param cfg Statistics object configuration to compare with.
+     * @return {@code true} if the specified statistics fully meet the specified configuration requirements,
+     *         {@code false} otherwise.
+     */
+    private boolean partStatFit(ObjectStatisticsImpl stat, StatisticsObjectConfiguration cfg) {
+        if (stat == null)
+            return false;
+
+        if (stat.columnsStatistics().size() != cfg.columns().size())
+            return false;
+
+        for (StatisticsColumnConfiguration colCfg : cfg.columns().values()) {
+            ColumnStatistics colStat = stat.columnStatistics(colCfg.name());
+
+            if (colStat == null || colCfg.version() > colStat.version())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Run task on busy lock.
+     *
+     * @param r Task to run.
+     */
+    public void busyRun(Runnable r) {
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            if (!active)
+                return;
+
+            r.run();
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Complete gathering of partition statistics: save to the repository and try to complete the whole task.
+     *
+     * @param newStat If {@code true} - partition statistics were just gathered and need to be saved to the repo.
+     * @param ctx Gathering context.
+     * @param partId Partition id.
+     * @param partStat Collected statistics or {@code null} if it was impossible to gather the current partition.
+     */
+    private void completePartitionStatistic(
+        boolean newStat,
+        LocalStatisticsGatheringContext ctx,
+        int partId,
+        ObjectPartitionStatisticsImpl partStat
+    ) {
+        if (ctx.future().isCancelled() || partStat == null)
+            return;
+
+        StatisticsKey key = ctx.configuration().key();
+
+        if (newStat) {
+            if (ctx.configuration().columns().size() == partStat.columnsStatistics().size())
+                statRepo.refreshObsolescence(key, partId);
+
+            statRepo.replaceLocalPartitionStatistics(key, partStat);
+            //statRepo.saveLocalPartitionStatistics(key, partStat);

Review comment:
       Removed
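
       As a side note for readers, the shutdown contract from the class javadoc condenses into the following job
       skeleton (illustrative only, reusing names from the quoted diff; not part of the patch):

           private void runGuardedJob(LocalStatisticsGatheringContext ctx, List<Runnable> steps) {
               if (!startJob("job by key " + ctx.configuration().key()))
                   return; // Rules 1-2: run only in active state, under busyLock.

               try {
                   for (Runnable step : steps) {
                       if (ctx.future().isCancelled())
                           return; // Rule 3: check cancellation between steps.

                       step.run();
                   }
               }
               finally {
                   endJob(); // Always release busyLock so deactivation can complete.
               }
           }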

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
##########
@@ -400,10 +456,21 @@ public synchronized void processObsolescence() {
                     taskParts.add(k);
             });
 
-            if (!taskParts.isEmpty())
-                res.put(key, taskParts);
+            // Add even an empty list of partitions to recollect, just to force the obsolescence info to be stored.
+            res.put(finalCfg, taskParts);
         }
 
         return res;
     }
+
+    /**
+     * Check that the cluster statistics usage state is not OFF and the cluster is active.
+     *
+     * @param op Operation name.
+     */
+    public void checkStatisticsState(String op) {

Review comment:
       Done
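
       A minimal sketch of such a guard (hypothetical; the actual checks and exception wording may differ):

           public void checkStatisticsState(String op) {
               if (usageState() == StatisticsUsageState.OFF)
                   throw new IgniteException("Unable to perform " + op + " due to statistics usage state OFF.");

               if (!ctx.state().publicApiActiveState(true))
                   throw new IgniteException("Unable to perform " + op + " on inactive cluster.");
           }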

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
##########
@@ -0,0 +1,615 @@
[...]
+    public void updateKeyAsync(
+        boolean byObsolescence,
+        GridH2Table tbl,
+        StatisticsObjectConfiguration cfg,
+        Set<Integer> partsToProcess,
+        AffinityTopologyVersion topVer
+    ) {
+        if (!startJob("Updating key " + cfg.key()))

Review comment:
       Removed.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
##########
@@ -297,78 +332,104 @@ public IgniteStatisticsConfigurationManager statisticConfiguration() {
 
     /** {@inheritDoc} */
     @Override public StatisticsUsageState usageState() {
-        return usageState.getOrDefault(DEFAULT_STATISTICS_USAGE_STATE);
+        return (lastUsageState == null) ? DEFAULT_STATISTICS_USAGE_STATE : lastUsageState;
     }
 
     /** {@inheritDoc} */
     @Override public void onRowUpdated(String schemaName, String objName, int partId, byte[] keyBytes) {
-        try {
-            if (statCfgMgr.config(new StatisticsKey(schemaName, objName)) != null)
-                statsRepos.addRowsModified(new StatisticsKey(schemaName, objName), partId, keyBytes);
-        }
-        catch (IgniteCheckedException e) {
-            if (log.isInfoEnabled())
-                log.info(String.format("Error while obsolescence key in %s.%s due to %s", schemaName, objName,
-                    e.getMessage()));
-        }
+        statsRepos.addRowsModified(new StatisticsKey(schemaName, objName), partId, keyBytes);
     }
 
     /**
     * Save dirty obsolescence info to the local metastore. Check if statistics need to be refreshed and schedule it.
+     *
+     * 1) Get all dirty partition statistics.
+     * 2) Make separate tasks for each key to avoid saving obsolescence info for a removed partition (race).
+     * 3) Check if a partition should be recollected and add it to the list in its table's task.
+     * 4) Submit tasks. The obsolescence info itself will be stored during task processing.
      */
     public synchronized void processObsolescence() {
-        Map<StatisticsKey, IntMap<ObjectPartitionStatisticsObsolescence>> dirty = statsRepos.saveObsolescenceInfo();
+        StatisticsUsageState state = usageState();
 
-        Map<StatisticsKey, List<Integer>> tasks = calculateObsolescenceRefreshTasks(dirty);
-
-        if (!F.isEmpty(tasks))
-            if (log.isTraceEnabled())
-                log.trace(String.format("Refreshing statistics for %d targets", tasks.size()));
+        if (state != ON || ctx.isStopping()) {
+            if (log.isDebugEnabled())
+                log.debug("Skipping obsolescence processing.");
 
-        for (Map.Entry<StatisticsKey, List<Integer>> objTask : tasks.entrySet()) {
-            GridH2Table tbl = schemaMgr.dataTable(objTask.getKey().schema(), objTask.getKey().obj());
+            return;
+        }
 
-            if (tbl == null) {
-                if (log.isDebugEnabled())
-                    log.debug(String.format("Got obsolescence statistics for unknown table %s", objTask.getKey()));
+        if (log.isTraceEnabled())
+            log.trace("Process statistics obsolescence started.");
 
-                continue;
-            }
+        Map<StatisticsKey, IntMap<ObjectPartitionStatisticsObsolescence>> dirty = statsRepos.getDirtyObsolescenceInfo();
 
-            StatisticsObjectConfiguration objCfg;
-            try {
-                objCfg = statCfgMgr.config(objTask.getKey());
-            } catch (IgniteCheckedException e) {
-                log.warning("Unable to load statistics object configuration from global metastore", e);
-                continue;
-            }
+        if (F.isEmpty(dirty)) {
+            if (log.isTraceEnabled())
+                log.trace("No dirty obsolescence info found. Finish obsolescence processing.");
 
-            if (objCfg == null) {
-                if (log.isDebugEnabled())
-                    log.debug(String.format("Got obsolescence statistics for unknown configuration %s", objTask.getKey()));
+            return;
+        }
+        else {
+            if (log.isTraceEnabled())
+                log.trace(String.format("Scheduling obsolescence savings for %d targets", dirty.size()));
+        }
 
-                continue;
-            }
+        Map<StatisticsObjectConfiguration, List<Integer>> tasks = calculateObsolescenceRefreshTasks(dirty);
 
-            GridCacheContext cctx = tbl.cacheContext();
+        for (Map.Entry<StatisticsObjectConfiguration, List<Integer>> objTask : tasks.entrySet()) {
+            StatisticsKey key = objTask.getKey().key();
+            GridH2Table tbl = schemaMgr.dataTable(key.schema(), key.obj());
 
-            Set<Integer> parts = cctx.affinity().primaryPartitions(
-                cctx.localNodeId(), cctx.affinity().affinityTopologyVersion());
+            if (tbl == null) {
+                // Table could have been removed earlier but not processed yet, or something went wrong. Try to reschedule.
+                if (log.isDebugEnabled())
+                    log.debug(String.format("Got obsolescence statistics for unknown table %s", objTask.getKey()));
+            }
 
-            statCfgMgr.gatherLocalStatistics(objCfg, tbl, parts, new HashSet<>(objTask.getValue()), null);
+            statProc.updateKeyAsync(true, tbl, objTask.getKey(), new HashSet<>(objTask.getValue()), null);
+//            if (objTask.getValue().isEmpty()) {
+//                // Just save or totally remove obsolescence info, no additional operations needed.
+//                statProc.updateKeyAsync(true, tbl, objTask.getKey(), Collections.emptySet(), null);
+//            }
+//            else {
+//                // Schedule full gathering.
+//                GridCacheContext<?, ?> cctx = (tbl == null) ? null : tbl.cacheContext();
+//
+//                AffinityTopologyVersion topVer = null;
+//
+//                if (!cctx.gate().enterIfNotStopped())
+//                    continue;
+//
+//                try {
+//                    topVer = cctx.affinity().affinityTopologyVersion();
+//                    cctx.affinity().affinityReadyFuture(topVer).get();
+//                }
+//                catch (IgniteCheckedException e) {
+//                    log.warning("Unable to get topology ready.", e);
+//                }
+//                finally {
+//                    cctx.gate().leave();
+//                }
+//
+//                statProc.updateKeyAsync(true, tbl, objTask.getKey(), new HashSet<>(objTask.getValue()), topVer);
+//
+//            }

Review comment:
       Removed

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
##########
@@ -0,0 +1,615 @@
[...]
+    private LocalStatisticsGatheringContext registerNewTask(LocalStatisticsGatheringContext ctx) {
+        LocalStatisticsGatheringContext[] ctxToSubmit = new LocalStatisticsGatheringContext[1];
+        LocalStatisticsGatheringContext[] ctxToAwait = new LocalStatisticsGatheringContext[1];

Review comment:
       Rewritten to a boolean result.
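
       Presumably along these lines (a sketch of the described change, derived from the quoted version; not the
       actual committed code):

           private boolean registerNewTask(LocalStatisticsGatheringContext ctx) {
               boolean[] registered = new boolean[1];
               LocalStatisticsGatheringContext[] ctxToAwait = new LocalStatisticsGatheringContext[1];

               gatheringInProgress.compute(ctx.configuration().key(), (k, v) -> {
                   if (v == null) {
                       // No old context - register the new one.
                       registered[0] = true;

                       return ctx;
                   }

                   if ((v.byObsolescence() && ctx.byObsolescence()) ||
                       (v.topologyVersion().before(ctx.topologyVersion()) ||
                           !checkStatisticsCfg(v.configuration(), ctx.configuration()))) {
                       // Older topology or config - cancel the old context and register the new one.
                       v.future().cancel(true);
                       ctxToAwait[0] = v;
                       registered[0] = true;

                       return ctx;
                   }

                   registered[0] = false;

                   return v;
               });

               // Await the cancelled context outside the map's critical section, as in the quoted version.
               if (ctxToAwait[0] != null) {
                   try {
                       ctxToAwait[0].finished().await();
                   }
                   catch (InterruptedException e) {
                       log.warning("Unable to wait for statistics gathering task to finish by key " +
                           ctx.configuration().key(), e);
                   }
               }

               return registered[0];
           }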

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
##########
@@ -297,78 +332,104 @@ public IgniteStatisticsConfigurationManager statisticConfiguration() {
[...]
+        if (state != ON || ctx.isStopping()) {
+            if (log.isDebugEnabled())
+                log.debug("Skipping obsolescence processing.");
 
+            return;
+        }

Review comment:
       The context switches its state to stopping without notification, so it has to be checked here.

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
##########
@@ -0,0 +1,615 @@
[...]
+                catch (Throwable t) {
+                    //failPartTask(ctx, task.partition());

Review comment:
       Removed

##########
File path: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
##########
@@ -528,47 +444,69 @@ public void updateObsolescenceInfo(Map<StatisticsObjectConfiguration, Set<Intege
 
         for (Map.Entry<StatisticsKey, Set<Integer>> objDeleted : deleted.entrySet())
             store.clearObsolescenceInfo(objDeleted.getKey(), objDeleted.getValue());
+
+        fitObsolescenceInfo(cfg);
     }
 
     /**
-     * Load or update obsolescence info cache to fit specified cfg.
+     * Check store to clean unnecessary records.
      *
      * @param cfg Map of object statistics configurations to primary partition sets.
      */
-    public synchronized void checkObsolescenceInfo(Map<StatisticsObjectConfiguration, Set<Integer>> cfg) {
-        if (!started.compareAndSet(false, true))
-            loadObsolescenceInfo(cfg);
-        else
-            updateObsolescenceInfo(cfg);
+    private void fitObsolescenceInfo(Map<StatisticsObjectConfiguration, Set<Integer>> cfg) {

Review comment:
       Yes, it's better to choose an optimal data structure. Removed here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

