ignite-3478

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6b01483
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6b01483
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6b01483

Branch: refs/heads/ignite-3479
Commit: f6b014834fce7a08eb4b2345edab54f5b3654408
Parents: e449460
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Sep 22 14:00:09 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Sep 22 14:00:09 2017 +0300

----------------------------------------------------------------------
 .../dht/GridPartitionedGetFuture.java           |  2 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 35 +++++++++++++++-----
 .../query/GridCacheDistributedQueryManager.java |  2 +-
 .../cache/query/GridCacheQueryManager.java      |  2 +-
 4 files changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index a6978b5..9b7d733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -249,7 +249,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             if (mvccVer != null) {
                 assert mvccCrd != null;
 
-                cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter());
+                cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer);
             }
 
             cache().sendTtlUpdateRequest(expiryPlc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 0f7e71e..d266bb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -176,13 +176,26 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
     /**
      * @param crd Coordinator.
-     * @param cntr Counter assigned to query.
+     * @param mvccVer Query version.
      */
-    public void ackQueryDone(ClusterNode crd, long cntr) {
+    public void ackQueryDone(ClusterNode crd, MvccCoordinatorVersion mvccVer) {
         try {
+            long trackCntr = mvccVer.counter();
+
+            MvccLongList txs = mvccVer.activeTransactions();
+
+            if (txs != null) {
+                for (int i = 0; i < txs.size(); i++) {
+                    long txId = txs.get(i);
+
+                    if (txId < trackCntr)
+                        trackCntr = txId;
+                }
+            }
+
             cctx.gridIO().sendToGridTopic(crd,
                 MSG_TOPIC,
-                new CoordinatorQueryAckRequest(cntr),
+                new CoordinatorQueryAckRequest(trackCntr),
                 MSG_POLICY);
         }
         catch (ClusterTopologyCheckedException e) {
@@ -190,7 +203,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                 log.debug("Failed to send query ack, node left [crd=" + crd + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + cntr + ']', e);
+            U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + mvccVer + ']', e);
         }
     }
 
@@ -519,15 +532,21 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 
-        for (Long txVer : activeTxs.values())
+        Long trackCntr = mvccCntr;
+
+        for (Long txVer : activeTxs.values()) {
+            if (txVer < trackCntr)
+                trackCntr = txVer;
+
             res.addTx(txVer);
+        }
 
-        Integer queries = activeQueries.get(mvccCntr);
+        Integer queries = activeQueries.get(trackCntr);
 
         if (queries != null)
-            activeQueries.put(mvccCntr, queries + 1);
+            activeQueries.put(trackCntr, queries + 1);
         else
-            activeQueries.put(mvccCntr, 1);
+            activeQueries.put(trackCntr, 1);
 
         res.init(futId, crdVer, mvccCntr, COUNTER_NA);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index ffb49e0..3433b4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -585,7 +585,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                     cctx.io().removeOrderedHandler(false, topic);
 
                     if (mvccCrd != null)
-                        cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter());
+                        cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 075e492..b711a80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -3004,7 +3004,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /** {@inheritDoc} */
         @Override protected void onClose() {
             if (mvccCrd != null)
-                dht.context().shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter());
+                dht.context().shared().coordinators().ackQueryDone(mvccCrd, mvccVer);
 
             if (expiryPlc != null && dht != null) {
                 dht.sendTtlUpdateRequest(expiryPlc);

Reply via email to