IGNITE-9682 Update partition full map in parallel - Fixes #4824.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/862c9264 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/862c9264 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/862c9264 Branch: refs/heads/ignite-627 Commit: 862c9264fc05816f96ea594807854502ff3dd00a Parents: 2906a16 Author: Oleg Ostanin <oosta...@gridgain.com> Authored: Mon Oct 29 13:28:07 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Oct 29 13:28:07 2018 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 66 ++++++++++++-------- 1 file changed, 39 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/862c9264/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0fc9c24..9314096 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -130,6 +130,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent; import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent; import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE; +import static org.apache.ignite.internal.util.IgniteUtils.doInParallel; /** * Future for exchanging partition maps. @@ -3948,39 +3949,50 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte long time = System.currentTimeMillis(); - for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { - Integer grpId = entry.getKey(); + int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize(); - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + // Reserve at least 2 threads for system operations. + parallelismLvl = Math.max(1, parallelismLvl - 2); - if (grp != null) { - CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, - grp.topology().partitions()); + try { + doInParallel( + parallelismLvl, + cctx.kernalContext().getSystemExecutorService(), + msg.partitions().keySet(), grpId -> { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - grp.topology().update(resTopVer, - entry.getValue(), - cntrMap, - msg.partsToReload(cctx.localNodeId(), grpId), - msg.partitionSizes(grpId), - null); - } - else { - ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); + if (grp != null) { + CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, + grp.topology().partitions()); - if (oldest != null && oldest.isLocal()) { - GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache()); + grp.topology().update(resTopVer, + msg.partitions().get(grpId), + cntrMap, + msg.partsToReload(cctx.localNodeId(), grpId), + msg.partitionSizes(grpId), + null); + } + else { + ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); - CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, - top.partitions()); + if (oldest != 
null && oldest.isLocal()) { + GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache()); - top.update(resTopVer, - entry.getValue(), - cntrMap, - Collections.emptySet(), - null, - null); - } - } + CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, + top.partitions()); + + top.update(resTopVer, + msg.partitions().get(grpId), + cntrMap, + Collections.emptySet(), + null, + null); + } + } + }); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } partitionsReceived = true;