This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch ignite-11767v7 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 64c5791d3dd7a05b87a59ae372a78128f9e6ec7d Author: Ilya Kasnacheev <ilya.kasnach...@gmail.com> AuthorDate: Thu Aug 15 16:57:15 2019 +0300 IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not hold decompressed partsSizes in field. Since uncompression of partsSizes is de-parallelized, PME might be slightly slower while consuming way less Heap. Cherry-picked from 478277e5e3fe1a535ea905f8beab42926453825a --- .../cache/GridCachePartitionExchangeManager.java | 21 ++++- .../GridDhtPartitionsAbstractMessage.java | 4 +- .../preloader/GridDhtPartitionsExchangeFuture.java | 14 +++- .../preloader/GridDhtPartitionsFullMessage.java | 97 ++++++++++------------ .../preloader/GridDhtPartitionsSingleMessage.java | 15 +--- .../cache/CacheGroupsMetricsRebalanceTest.java | 4 +- 6 files changed, 81 insertions(+), 74 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f18b404..20e3c02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1158,10 +1158,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana partsToReload ); - m.compress(compress); + m.compressed(compress); final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); + Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { if (exchId != null) { @@ -1184,7 +1186,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana affCache.similarAffinityKey()); } - m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes()); + Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes(); + + if (!partSizesMap.isEmpty()) + partsSizes.put(grp.groupId(), partSizesMap); if (exchId != null) { CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters(); @@ -1220,10 +1225,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap)); - m.addPartitionSizes(top.groupId(), top.globalPartSizes()); + Map<Integer, Long> partSizesMap = top.globalPartSizes(); + + if (!partSizesMap.isEmpty()) + partsSizes.put(top.groupId(), partSizesMap); } } + if (!partsSizes.isEmpty()) + m.partitionSizes(cctx, partsSizes); + return m; } @@ -1535,6 +1546,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; + Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx); + for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); @@ -1552,7 +1565,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId), - msg.partitionSizes(grpId), + partsSizes.getOrDefault(grpId, Collections.emptyMap()), msg.topologyVersion()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 84cc792..a5eb237 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -118,14 +118,14 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage /** * @return {@code True} if message data is compressed. */ - protected final boolean compressed() { + public final boolean compressed() { return (flags & COMPRESSED_FLAG_MASK) != 0; } /** * @param compressed {@code True} if message data is compressed. */ - protected final void compressed(boolean compressed) { + public final void compressed(boolean compressed) { flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e84b7a0..d9e780a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2331,6 +2331,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte newCrdFut = null; exchangeLocE = null; exchangeGlobalExceptions.clear(); + if (finishState != null) + finishState.cleanUp(); } /** @@ -3981,6 +3983,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte long time = System.currentTimeMillis(); + Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx); + for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); @@ -3994,7 +3998,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte entry.getValue(), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId), - msg.partitionSizes(grpId), + partsSizes.getOrDefault(grpId, Collections.emptyMap()), null); } else { @@ -4776,6 +4780,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte this.resTopVer = resTopVer; this.msg = msg; } + + /** + * Cleans up resources to avoid excessive memory usage. + */ + public void cleanUp() { + if (msg != null) + msg.cleanUp(); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index a63ab70..b9ffd9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; @@ -95,11 +96,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Serialized partitions that must be cleared and re-loaded. */ private byte[] partsToReloadBytes; - /** Partitions sizes. */ - @GridToStringInclude - @GridDirectTransient - private Map<Integer, Map<Integer, Long>> partsSizes; - /** Serialized partitions sizes. */ private byte[] partsSizesBytes; @@ -115,10 +111,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private byte[] errsBytes; /** */ - @GridDirectTransient - private transient boolean compress; - - /** */ private AffinityTopologyVersion resTopVer; /** */ @@ -174,12 +166,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa cp.partHistSuppliersBytes = partHistSuppliersBytes; cp.partsToReload = partsToReload; cp.partsToReloadBytes = partsToReloadBytes; - cp.partsSizes = partsSizes; cp.partsSizesBytes = partsSizesBytes; cp.topVer = topVer; cp.errs = errs; cp.errsBytes = errsBytes; - cp.compress = compress; cp.resTopVer = resTopVer; cp.joinedNodeAff = joinedNodeAff; cp.idealAffDiff = idealAffDiff; @@ -246,13 +236,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * @param compress {@code True} if it is possible to use compression for message. - */ - public void compress(boolean compress) { - this.compress = compress; - } - - /** * @return Local partitions. */ public Map<Integer, GridDhtPartitionFullMap> partitions() { @@ -285,7 +268,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa parts.put(grpId, fullMap); if (dupDataCache != null) { - assert compress; + assert compressed(); assert parts.containsKey(dupDataCache); if (dupPartsData == null) @@ -356,32 +339,43 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * Adds partition sizes map for specified {@code grpId} to the current message. + * Supplies partition sizes map for all cache groups. * - * @param grpId Group id. - * @param partSizesMap Partition sizes map. + * @param ctx Cache context. + * @param partsSizes Partitions sizes map. */ - public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) { - if (partSizesMap.isEmpty()) - return; + public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, Map<Integer, Long>> partsSizes) { + try { + byte[] marshalled = U.marshal(ctx, partsSizes); - if (partsSizes == null) - partsSizes = new HashMap<>(); + if (compressed()) + marshalled = U.zip(marshalled); - partsSizes.put(grpId, partSizesMap); + partsSizesBytes = marshalled; + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } } /** - * Returns partition sizes map for specified {@code grpId}. + * Returns partition sizes map for all cache groups. * - * @param grpId Group id. - * @return Partition sizes map (partId, partSize). + * @param ctx Cache context. + * @return Partition sizes map (grpId, (partId, partSize)). */ - public Map<Integer, Long> partitionSizes(int grpId) { - if (partsSizes == null) + public Map<Integer, Map<Integer, Long>> partitionSizes(GridCacheSharedContext ctx) { + if (partsSizesBytes == null) return Collections.emptyMap(); - return partsSizes.getOrDefault(grpId, Collections.emptyMap()); + try { + return compressed() + ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, ctx.deploy().globalLoader()) + : U.unmarshal(ctx, partsSizesBytes, ctx.deploy().globalLoader()); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } } /** @@ -415,7 +409,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa byte[] partCntrsBytes20 = null; byte[] partHistSuppliersBytes0 = null; byte[] partsToReloadBytes0 = null; - byte[] partsSizesBytes0 = null; byte[] errsBytes0 = null; if (!F.isEmpty(parts) && partsBytes == null) @@ -433,32 +426,23 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partsToReload != null && partsToReloadBytes == null) partsToReloadBytes0 = U.marshal(ctx, partsToReload); - if (partsSizes != null && partsSizesBytes == null) - partsSizesBytes0 = U.marshal(ctx, partsSizes); - if (!F.isEmpty(errs) && errsBytes == null) errsBytes0 = U.marshal(ctx, errs); - if (compress) { - assert !compressed(); - + if (compressed()) { try { byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20); byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); - byte[] partsSizesBytesZip = U.zip(partsSizesBytes0); byte[] exsBytesZip = U.zip(errsBytes0); - partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; partCntrsBytes20 = partCntrsBytes2Zip; partHistSuppliersBytes0 = partHistSuppliersBytesZip; partsToReloadBytes0 = partsToReloadBytesZip; - partsSizesBytes0 = partsSizesBytesZip; errsBytes0 = exsBytesZip; - compressed(true); } catch (IgniteCheckedException e) { @@ -471,7 +455,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partCntrsBytes2 = partCntrsBytes20; partHistSuppliersBytes = partHistSuppliersBytes0; partsToReloadBytes = partsToReloadBytes0; - partsSizesBytes = partsSizesBytes0; errsBytes = errsBytes0; } } @@ -559,13 +542,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (partsSizesBytes != null && partsSizes == null) { - if (compressed()) - partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } - if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); @@ -575,7 +551,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa else errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (errs == null) errs = new HashMap<>(); } @@ -830,4 +805,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } } } + + /** + * Cleans up resources to avoid excessive memory usage. + */ + public void cleanUp() { + partsBytes = null; + partCntrs2 = null; + partCntrsBytes = null; + partCntrsBytes2 = null; + partHistSuppliersBytes = null; + partsToReloadBytes = null; + partsSizesBytes = null; + errsBytes = null; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 088fb31..db5d800 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -96,10 +96,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes private boolean client; /** */ - @GridDirectTransient - private transient boolean compress; - - /** */ @GridDirectCollection(Integer.class) private Collection<Integer> grpsAffRequest; @@ -131,8 +127,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes boolean compress) { super(exchId, lastVer); + compressed(compress); + this.client = client; - this.compress = compress; } /** @@ -201,7 +198,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes parts.put(cacheId, locMap); if (dupDataCache != null) { - assert compress; + assert compressed(); assert F.isEmpty(locMap.map()); assert parts.containsKey(dupDataCache); @@ -367,9 +364,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (err != null && errBytes == null) errBytes0 = U.marshal(ctx, err); - if (compress) { - assert !compressed(); - + if (compressed()) { try { byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); @@ -382,8 +377,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partHistCntrsBytes0 = partHistCntrsBytesZip; partsSizesBytes0 = partsSizesBytesZip; errBytes0 = exBytesZip; - - compressed(true); } catch (IgniteCheckedException e) { U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java index af2dc63..3aa07a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java @@ -277,7 +277,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { @Override public boolean apply() { return ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0; } - }, timeLeft + 10_000L); + }, timeLeft + 12_000L); log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft + ", Time to rebalance=" + (finishTime - startTime) + @@ -292,7 +292,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { long diff = finishTime - currTime; - assertTrue("Expected less than 10000, but actual: " + diff, Math.abs(diff) < 10_000L); + assertTrue("Expected less than 12000, but actual: " + diff, Math.abs(diff) < 12_000L); } /**