IGNITE-3443 Cleanup and rework of the metrics update into a non-blocking algorithm.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66d88909
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66d88909
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66d88909

Branch: refs/heads/ignite-3443
Commit: 66d88909f57b79d7d91d6f4a94787496d3569259
Parents: b8b4e6a
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Wed Oct 12 19:11:15 2016 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Wed Oct 12 19:11:15 2016 +0700

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  8 +--
 .../GridCacheQueryDetailsMetricsAdapter.java    |  7 ++-
 .../query/GridCacheQueryFutureAdapter.java      |  4 +-
 .../cache/query/GridCacheQueryManager.java      | 55 +++++++++-----------
 .../processors/query/GridQueryProcessor.java    |  1 +
 .../internal/visor/cache/VisorCacheMetrics.java |  2 +-
 .../VisorCacheQueryMetricsCollectorTask.java    |  2 +-
 7 files changed, 39 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index af3a819..a84fb00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -494,8 +494,8 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         if (grp != null)
             qry.projection(grp);
 
-        final GridCloseableIterator<R> iter = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, ctx.name(), 
ctx,
-            new IgniteOutClosureX<GridCloseableIterator<R>>() {
+        final GridCloseableIterator<R> iter = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
+            ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() 
{
                 @Override public GridCloseableIterator<R> applyx() throws 
IgniteCheckedException {
                     final GridCloseableIterator iter0 = qry.executeScanQuery();
 
@@ -563,8 +563,8 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
             if (grp != null)
                 qry.projection(grp);
 
-            fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, "", ctx,
-                new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+            fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, 
filter.getClass().getSimpleName(),
+                ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, 
V>>>() {
                     @Override public CacheQueryFuture<Map.Entry<K, V>> 
applyx() throws IgniteCheckedException {
                         return qry.execute(((SpiQuery)filter).getArgs());
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java
index eaa9546..90115b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryDetailsMetricsAdapter.java
@@ -127,7 +127,12 @@ public class GridCacheQueryDetailsMetricsAdapter 
implements QueryDetailsMetrics,
         }
     }
 
-    public void update(QueryDetailsMetrics m) {
+    /**
+     * Aggregate metrics.
+     *
+     * @param m Other metrics to take into account.
+     */
+    public void aggregate(QueryDetailsMetrics m) {
         if (lastStartTime < m.lastStartTime())
             lastStartTime = m.lastStartTime();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 143a2ae..db519f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -155,9 +155,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
     @Override public boolean onDone(Collection<R> res, Throwable err) {
         cctx.time().removeTimeoutObject(this);
 
-        GridCacheQueryAdapter<?> qryAdapter = qry.query();
-
-        qryAdapter.onCompleted(res, err, startTime(), duration());
+        qry.query().onCompleted(res, err, startTime(), duration());
 
         return super.onDone(res, err);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16cd2d6..36f3ede 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -39,7 +39,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
@@ -48,6 +47,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.QueryDetailsMetrics;
 import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -82,6 +82,7 @@ import 
org.apache.ignite.internal.processors.query.GridQueryIndexType;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -149,11 +150,10 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
     private volatile GridCacheQueryMetricsAdapter metrics = new 
GridCacheQueryMetricsAdapter();
 
     /** */
-    private final ConcurrentSkipListMap<Integer, QueryDetailsMetrics> qryHist =
-        new ConcurrentSkipListMap<>();
+    private int qryHistSz;
 
     /** */
-    private int qryHistSz;
+    private GridBoundedConcurrentLinkedHashMap<Integer, QueryDetailsMetrics> 
qryHist;
 
     /** */
     private final ConcurrentMap<UUID, RequestFutureMap> qryIters =
@@ -180,10 +180,14 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
+        CacheConfiguration ccfg = cctx.config();
+
         qryProc = cctx.kernalContext().query();
         space = cctx.name();
-        maxIterCnt = cctx.config().getMaxQueryIteratorsCount();
-        qryHistSz = 100; // TODO IGNITE-3443 
cctx.config().getQueryMetricsHistorySize();
+
+        maxIterCnt = ccfg.getMaxQueryIteratorsCount();
+        qryHistSz = ccfg.getQueryMetricsHistorySize();
+        qryHist = new GridBoundedConcurrentLinkedHashMap<>(qryHistSz);
 
         lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
@@ -223,7 +227,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
         cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        enabled = GridQueryProcessor.isEnabled(cctx.config());
+        enabled = GridQueryProcessor.isEnabled(ccfg);
 
         qryTopVer = cctx.startTopologyVersion();
 
@@ -2083,6 +2087,11 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
         return metrics.copy();
     }
 
+    /**
+     * Gets cache queries detailed metrics.
+     *
+     * @return Cache queries metrics aggregated by query type and query text.
+     */
     public Collection<QueryDetailsMetrics> detailsMetrics() {
         return qryHist.values();
     }
@@ -2110,32 +2119,18 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
             if (qryHistSz > 0) {
                 Integer qryHash = 
GridCacheQueryDetailsMetricsAdapter.queryHashCode(qryType, qry);
 
-                QueryDetailsMetrics qryMetrics = qryHist.get(qryHash);
+                // Non-blocking algorithm to update metrics.
+                while(true) {
+                    QueryDetailsMetrics qryMetrics = qryHist.remove(qryHash);
 
-                if (qryMetrics == null) {
-                    qryMetrics = new 
GridCacheQueryDetailsMetricsAdapter(qryType, qry);
+                    if (qryMetrics == null)
+                        qryMetrics = new 
GridCacheQueryDetailsMetricsAdapter(qryType, qry);
 
-                    QueryDetailsMetrics oldMetrics = 
qryHist.putIfAbsent(qryHash, qryMetrics);
+                    
((GridCacheQueryDetailsMetricsAdapter)qryMetrics).update(startTime, duration, 
failed, completed);
 
-                    if (oldMetrics != null)
-                        qryMetrics = oldMetrics;
-                }
-
-                
((GridCacheQueryDetailsMetricsAdapter)qryMetrics).update(startTime, duration, 
failed, completed);
-
-                while (qryHist.size() > qryHistSz) {
-                    Map.Entry<Integer, QueryDetailsMetrics> firstEntry = 
qryHist.firstEntry();
-                    qryHash = firstEntry.getKey();
-                    qryMetrics = firstEntry.getValue();
-
-                    for (Map.Entry<Integer, QueryDetailsMetrics> entry : 
qryHist.entrySet()) {
-                        if (qryMetrics.lastStartTime() > 
entry.getValue().lastStartTime()) {
-                            qryHash = entry.getKey();
-                            qryMetrics = entry.getValue();
-                        }
-                    }
-
-                    qryHist.remove(qryHash);
+                    // Leave if updated.
+                    if (qryHist.putIfAbsent(qryHash, qryMetrics) == null)
+                        break;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 889cf3d..d54d25f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import javax.cache.Cache;
 import javax.cache.CacheException;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;

http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
index 58d36ac..1204cbc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
@@ -210,7 +210,7 @@ public class VisorCacheMetrics implements Serializable, 
LessNamingBean {
         commitsPerSec = perSecond(m.getAverageTxCommitTime());
         rollbacksPerSec = perSecond(m.getAverageTxRollbackTime());
 
-        qryMetrics = new VisorCacheQueryMetrics().from(c.queryMetrics());
+        qryMetrics = VisorCacheQueryMetrics.from(c.queryMetrics());
 
         dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize();
         txThreadMapSize = m.getTxThreadMapSize();

http://git-wip-us.apache.org/repos/asf/ignite/blob/66d88909/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java
index 60affe5..d008f33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheQueryMetricsCollectorTask.java
@@ -103,7 +103,7 @@ public class VisorCacheQueryMetricsCollectorTask extends 
VisorMultiNodeTask<Void
                     res.put(qryHashCode, aggMetrics);
                 }
 
-                aggMetrics.update(m);
+                aggMetrics.aggregate(m);
             }
         }
 

Reply via email to