IGNITE-3443 Cleanup and reworked to non-blocking update metrics algorithm.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66d88909 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66d88909 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66d88909 Branch: refs/heads/ignite-3443 Commit: 66d88909f57b79d7d91d6f4a94787496d3569259 Parents: b8b4e6a Author: AKuznetsov <akuznet...@gridgain.com> Authored: Wed Oct 12 19:11:15 2016 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Wed Oct 12 19:11:15 2016 +0700 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 8 +-- .../GridCacheQueryDetailsMetricsAdapter.java | 7 ++- .../query/GridCacheQueryFutureAdapter.java | 4 +- .../cache/query/GridCacheQueryManager.java | 55 +++++++++----------- .../processors/query/GridQueryProcessor.java | 1 + .../internal/visor/cache/VisorCacheMetrics.java | 2 +- .../VisorCacheQueryMetricsCollectorTask.java | 2 +- 7 files changed, 39 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index af3a819..a84fb00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -494,8 +494,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, ctx.name(), ctx, - new IgniteOutClosureX<GridCloseableIterator<R>>() { + final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, + ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() { @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException { final GridCloseableIterator iter0 = qry.executeScanQuery(); @@ -563,8 +563,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, "", ctx, - new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { + fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, filter.getClass().getSimpleName(), + ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { return qry.execute(((SpiQuery)filter).getArgs()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java index eaa9546..90115b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java @@ -127,7 +127,12 @@ public class GridCacheQueryDetailsMetricsAdapter implements QueryDetailsMetrics, } } - public void update(QueryDetailsMetrics m) { + /** + * Aggregate metrics. + * + * @param m Other metrics to take into account. + */ + public void aggregate(QueryDetailsMetrics m) { if (lastStartTime < m.lastStartTime()) lastStartTime = m.lastStartTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index 143a2ae..db519f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -155,9 +155,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda @Override public boolean onDone(Collection<R> res, Throwable err) { cctx.time().removeTimeoutObject(this); - GridCacheQueryAdapter<?> qryAdapter = qry.query(); - - qryAdapter.onCompleted(res, err, startTime(), duration()); + qry.query().onCompleted(res, err, startTime(), duration()); return super.onDone(res, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 16cd2d6..36f3ede 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -39,7 +39,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; @@ -48,6 +47,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.QueryDetailsMetrics; import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.events.DiscoveryEvent; @@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexType; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridEmptyIterator; @@ -149,11 +150,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter(); /** */ - private final ConcurrentSkipListMap<Integer, QueryDetailsMetrics> qryHist = - new ConcurrentSkipListMap<>(); + private int qryHistSz; /** */ - private int qryHistSz; + private GridBoundedConcurrentLinkedHashMap<Integer, QueryDetailsMetrics> qryHist; /** */ private final ConcurrentMap<UUID, RequestFutureMap> qryIters = @@ -180,10 +180,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { + CacheConfiguration ccfg = cctx.config(); + qryProc = cctx.kernalContext().query(); space = cctx.name(); - maxIterCnt = cctx.config().getMaxQueryIteratorsCount(); - qryHistSz = 100; // TODO IGNITE-3443 cctx.config().getQueryMetricsHistorySize(); + + maxIterCnt = ccfg.getMaxQueryIteratorsCount(); + qryHistSz = ccfg.getQueryMetricsHistorySize(); + qryHist = new GridBoundedConcurrentLinkedHashMap<>(qryHistSz); lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -223,7 +227,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - enabled = GridQueryProcessor.isEnabled(cctx.config()); + enabled = GridQueryProcessor.isEnabled(ccfg); qryTopVer = cctx.startTopologyVersion(); @@ -2083,6 +2087,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return metrics.copy(); } + /** + * Gets cache queries detailed metrics. + * + * @return Cache queries metrics aggregated by query type and query text. + */ public Collection<QueryDetailsMetrics> detailsMetrics() { return qryHist.values(); } @@ -2110,32 +2119,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (qryHistSz > 0) { Integer qryHash = GridCacheQueryDetailsMetricsAdapter.queryHashCode(qryType, qry); - QueryDetailsMetrics qryMetrics = qryHist.get(qryHash); + // Non-blocking algorithm to update metrics. + while(true) { + QueryDetailsMetrics qryMetrics = qryHist.remove(qryHash); - if (qryMetrics == null) { - qryMetrics = new GridCacheQueryDetailsMetricsAdapter(qryType, qry); + if (qryMetrics == null) + qryMetrics = new GridCacheQueryDetailsMetricsAdapter(qryType, qry); - QueryDetailsMetrics oldMetrics = qryHist.putIfAbsent(qryHash, qryMetrics); + ((GridCacheQueryDetailsMetricsAdapter)qryMetrics).update(startTime, duration, failed, completed); - if (oldMetrics != null) - qryMetrics = oldMetrics; - } - - ((GridCacheQueryDetailsMetricsAdapter)qryMetrics).update(startTime, duration, failed, completed); - - while (qryHist.size() > qryHistSz) { - Map.Entry<Integer, QueryDetailsMetrics> firstEntry = qryHist.firstEntry(); - qryHash = firstEntry.getKey(); - qryMetrics = firstEntry.getValue(); - - for (Map.Entry<Integer, QueryDetailsMetrics> entry : qryHist.entrySet()) { - if (qryMetrics.lastStartTime() > entry.getValue().lastStartTime()) { - qryHash = entry.getKey(); - qryMetrics = entry.getValue(); - } - } - - qryHist.remove(qryHash); + // Leave if updated. + if (qryHist.putIfAbsent(qryHash, qryMetrics) == null) + break; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 889cf3d..d54d25f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import javax.cache.Cache; import javax.cache.CacheException; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java index 58d36ac..1204cbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java @@ -210,7 +210,7 @@ public class VisorCacheMetrics implements Serializable, LessNamingBean { commitsPerSec = perSecond(m.getAverageTxCommitTime()); rollbacksPerSec = perSecond(m.getAverageTxRollbackTime()); - qryMetrics = new VisorCacheQueryMetrics().from(c.queryMetrics()); + qryMetrics = VisorCacheQueryMetrics.from(c.queryMetrics()); dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize(); txThreadMapSize = m.getTxThreadMapSize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java index 60affe5..d008f33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java @@ -103,7 +103,7 @@ public class VisorCacheQueryMetricsCollectorTask extends VisorMultiNodeTask<Void res.put(qryHashCode, aggMetrics); } - aggMetrics.update(m); + aggMetrics.aggregate(m); } }