ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6b01483 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6b01483 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6b01483 Branch: refs/heads/ignite-3479 Commit: f6b014834fce7a08eb4b2345edab54f5b3654408 Parents: e449460 Author: sboikov <sboi...@gridgain.com> Authored: Fri Sep 22 14:00:09 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Sep 22 14:00:09 2017 +0300 ---------------------------------------------------------------------- .../dht/GridPartitionedGetFuture.java | 2 +- .../mvcc/CacheCoordinatorsSharedManager.java | 35 +++++++++++++++----- .../query/GridCacheDistributedQueryManager.java | 2 +- .../cache/query/GridCacheQueryManager.java | 2 +- 4 files changed, 30 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index a6978b5..9b7d733 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -249,7 +249,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (mvccVer != null) { assert mvccCrd != null; - cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter()); + cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer); } cache().sendTtlUpdateRequest(expiryPlc); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 0f7e71e..d266bb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -176,13 +176,26 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * @param crd Coordinator. - * @param cntr Counter assigned to query. + * @param mvccVer Query version. */ - public void ackQueryDone(ClusterNode crd, long cntr) { + public void ackQueryDone(ClusterNode crd, MvccCoordinatorVersion mvccVer) { try { + long trackCntr = mvccVer.counter(); + + MvccLongList txs = mvccVer.activeTransactions(); + + if (txs != null) { + for (int i = 0; i < txs.size(); i++) { + long txId = txs.get(i); + + if (txId < trackCntr) + trackCntr = txId; + } + } + cctx.gridIO().sendToGridTopic(crd, MSG_TOPIC, - new CoordinatorQueryAckRequest(cntr), + new CoordinatorQueryAckRequest(trackCntr), MSG_POLICY); } catch (ClusterTopologyCheckedException e) { @@ -190,7 +203,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager log.debug("Failed to send query ack, node left [crd=" + crd + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + cntr + ']', e); + U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + mvccVer + ']', e); } } @@ -519,15 +532,21 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - for (Long txVer : activeTxs.values()) + Long trackCntr = mvccCntr; + + for (Long txVer : activeTxs.values()) { + if (txVer < trackCntr) + trackCntr = txVer; + res.addTx(txVer); + } - Integer queries = activeQueries.get(mvccCntr); + Integer queries = activeQueries.get(trackCntr); if (queries != null) - activeQueries.put(mvccCntr, queries + 1); + activeQueries.put(trackCntr, queries + 1); else - activeQueries.put(mvccCntr, 1); + activeQueries.put(trackCntr, 1); res.init(futId, crdVer, mvccCntr, COUNTER_NA); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index ffb49e0..3433b4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -585,7 +585,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage cctx.io().removeOrderedHandler(false, topic); if (mvccCrd != null) - cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter()); + cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6b01483/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 075e492..b711a80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -3004,7 +3004,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override protected void onClose() { if (mvccCrd != null) - dht.context().shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter()); + dht.context().shared().coordinators().ackQueryDone(mvccCrd, mvccVer); if (expiryPlc != null && dht != null) { dht.sendTtlUpdateRequest(expiryPlc);