Berkof commented on a change in pull request #9423:
URL: https://github.com/apache/ignite/pull/9423#discussion_r715487617
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
##########
@@ -163,61 +179,68 @@ else if (db == null)
if (log.isInfoEnabled())
log.info(String.format("Statistics usage state was changed
from %s to %s", oldVal, newVal));
+ lastUsageState = newVal;
+
if (oldVal == newVal)
return;
- switch (newVal) {
- case OFF:
- disableOperations();
-
- break;
- case ON:
- case NO_UPDATE:
- enableOperations();
-
- break;
- }
+ stateChanged();
Review comment:
Done, but named tryStart/tryStop rather than start/stop, because there are a
lot of conditions that must be met in order to start.
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsProcessor.java
##########
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.stat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import
org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
+import
org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import
org.apache.ignite.internal.processors.query.stat.task.GatherPartitionStatistics;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * Process all tasks, related to statistics repository. Mostly - statistics
collection,
+ * invalidation (due to configuration, topology or obsolescence issues) and
loads.
+ * Input tasks should be scheduled throug management pool while gathering pool
used to process heavy
+ * operations in parallel.
+ *
+ * Manage gathering pool and it's jobs. To guarantee gracefull shutdown:
+ * 1) Any job can be added into gatheringInProgress only in active state
(check after adding)
+ * 2) State can be disactivated only after cancelling all jobs and getting
busyLock block
+ * 3) Each job should do it's work in busyLock with periodically checking of
it's cancellation status.
+ */
+public class StatisticsProcessor {
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Ignite statistics repository. */
+ private final IgniteStatisticsRepository statRepo;
+
+ /** Ignite Thread pool executor to do statistics collection tasks. */
+ private final IgniteThreadPoolExecutor gatherPool;
+
+ /** (cacheGroupId -> gather context) */
+ private final ConcurrentMap<StatisticsKey,
LocalStatisticsGatheringContext> gatheringInProgress =
+ new ConcurrentHashMap<>();
+
+ /** Active flag (used to skip commands in inactive cluster.) */
+ private volatile boolean active;
+
+ /* Lock protection of started gathering during deactivation. */
+ private static final GridBusyLock busyLock = new GridBusyLock();
+
+ /**
+ * Constructor.
+ *
+ * @param repo IgniteStatisticsRepository.
+ * @param gatherPool Thread pool to gather statistics in.
+ * @param logSupplier Log supplier function.
+ */
+ public StatisticsProcessor(
+ IgniteStatisticsRepository repo,
+ IgniteThreadPoolExecutor gatherPool,
+ Function<Class<?>, IgniteLogger> logSupplier
+ ) {
+ this.statRepo = repo;
+ this.gatherPool = gatherPool;
+ this.log = logSupplier.apply(StatisticsProcessor.class);
+ }
+
+ /**
+ * Update statistics for the given key to actual state.
+ * If byObsolescence and tbl is not {@code null} - does not clear any
other partitions.
+ * Should run throw management pool only.
+ * 1) Replace previous gathering context if exist and needed (replace
byObsolescence gathering with new one or
+ * replace gathering with older configuration or topology version with new
one).
+ * 2) If byObsolescence and no table awailable - clean obsolescence and
partition statistics for the given key.
+ * 3) Submit tasks for each specified partition.
+ * 4) after last task finish gathering - it starts aggregation.
+ * 5) read all partitions & obsolescence from repo and
+ * if byObsolescence = {@code true} - remove unnecessary one and aggregate
by specified list
+ * if byObsolescence = {@code false} - aggregate all presented in store
(because it should contains only actual ones)
+ * 5) save aggregated local statistics
+ *
+ * @param byObsolescence Update only obsolescence partitions.
+ * @param tbl Table to update. If {@code null} - just clear all partitions
and obsolescence from the repo
+ * @param cfg Statistics configuration to use.
+ * @param partsToProcess Partitions to update, if !byObsolescence - all
primary partitions for the given topology.
+ * @param topVer Topology version, can be {@code null} if tbl is null.
+ */
+ public void updateKeyAsync(
+ boolean byObsolescence,
+ GridH2Table tbl,
+ StatisticsObjectConfiguration cfg,
+ Set<Integer> partsToProcess,
+ AffinityTopologyVersion topVer
+ ) {
+ if (!startJob("Updating key " + cfg.key()))
+ return;
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Start statistics processing: byObsolescence=%b, cfg=%s,
partToProcess = %s, topVer=%s",
+ byObsolescence, cfg, partsToProcess, topVer));
+ }
+
+ LocalStatisticsGatheringContext newCtx = new
LocalStatisticsGatheringContext(byObsolescence, tbl, cfg,
+ partsToProcess, topVer);
+ LocalStatisticsGatheringContext registeredCtx =
registerNewTask(newCtx);
+
+ if (registeredCtx != null) {
+
Review comment:
Between the two `if` blocks? Or what do you mean?
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsInMemoryStoreImpl.java
##########
@@ -203,11 +239,43 @@ public
IgniteStatisticsInMemoryStoreImpl(Function<Class<?>, IgniteLogger> logSup
Collection<ObjectPartitionStatisticsImpl> statistics
) {
IntMap<ObjectPartitionStatisticsImpl> statisticsMap = new
IntHashMap<ObjectPartitionStatisticsImpl>();
+
for (ObjectPartitionStatisticsImpl s : statistics) {
if (statisticsMap.put(s.partId(), s) != null)
log.warning(String.format("Trying to save more than one %s.%s
partition statistics for partition %d",
key.schema(), key.obj(), s.partId()));
}
+
return statisticsMap;
}
+
+ /** {@inheritDoc} */
+ @Override public Collection<Integer> loadObsolescenceMap(StatisticsKey
key) {
+ Collection<Integer> res[] = new Collection[1];
+ res[0] = new ArrayList<>();
Review comment:
removed.
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
##########
@@ -375,6 +292,15 @@ public ObjectStatisticsImpl
getLocalStatistics(StatisticsKey key) {
return locStats.get(key);
}
+ /**
+ * Get all local statistics. Return internal map without copying.
+ *
+ * @return Local (for current node) object statistics.
+ */
+ public Map<StatisticsKey, ObjectStatisticsImpl> getAllLocalStatisticsInt()
{
Review comment:
done
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsInMemoryStoreImpl.java
##########
@@ -203,11 +239,43 @@ public
IgniteStatisticsInMemoryStoreImpl(Function<Class<?>, IgniteLogger> logSup
Collection<ObjectPartitionStatisticsImpl> statistics
) {
IntMap<ObjectPartitionStatisticsImpl> statisticsMap = new
IntHashMap<ObjectPartitionStatisticsImpl>();
+
for (ObjectPartitionStatisticsImpl s : statistics) {
if (statisticsMap.put(s.partId(), s) != null)
log.warning(String.format("Trying to save more than one %s.%s
partition statistics for partition %d",
key.schema(), key.obj(), s.partId()));
}
+
return statisticsMap;
}
+
+ /** {@inheritDoc} */
+ @Override public Collection<Integer> loadObsolescenceMap(StatisticsKey
key) {
+ Collection<Integer> res[] = new Collection[1];
+ res[0] = new ArrayList<>();
+
+ obsStats.computeIfPresent(key, (k, v) -> {
+ for (Integer partId : v.keys())
+ res[0].add(partId);
+
+ return v;
+ });
+
+ return res[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Integer> loadLocalPartitionMap(StatisticsKey
key) {
+ Collection<Integer> res[] = new Collection[1];
+ res[0] = new ArrayList<>();
Review comment:
removed
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
##########
@@ -144,10 +137,10 @@
}
};
- /** Exchange listener. */
+ /** Exchange listener to update all local statistics. */
private final PartitionsExchangeAware exchAwareLsnr = new
PartitionsExchangeAware() {
@Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
Review comment:
To avoid holding the topology lock. Anyway, we schedule processing asynchronously.
##########
File path:
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsRepository.java
##########
@@ -353,6 +252,7 @@ public ObjectPartitionStatisticsImpl
getLocalPartitionStatistics(StatisticsKey k
*/
public void clearLocalPartitionStatistics(StatisticsKey key, int partId) {
Review comment:
All other methods use LocalPartition to emphasize that they operate on
partition-level statistics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]