This is an automated email from the ASF dual-hosted git repository.

ilyak pushed a commit to branch ignite-11767v7
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 64c5791d3dd7a05b87a59ae372a78128f9e6ec7d
Author: Ilya Kasnacheev <ilya.kasnach...@gmail.com>
AuthorDate: Thu Aug 15 16:57:15 2019 +0300

    IGNITE-11767 Clean up GridDhtPartitionsFullMessage when not needed, do not
    hold decompressed partsSizes in field.
    
    Since decompression of partsSizes is no longer parallelized, PME might be
    slightly slower while consuming far less heap.
    
    Cherry-picked from 478277e5e3fe1a535ea905f8beab42926453825a
---
 .../cache/GridCachePartitionExchangeManager.java   | 21 ++++-
 .../GridDhtPartitionsAbstractMessage.java          |  4 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java | 14 +++-
 .../preloader/GridDhtPartitionsFullMessage.java    | 97 ++++++++++------------
 .../preloader/GridDhtPartitionsSingleMessage.java  | 15 +---
 .../cache/CacheGroupsMetricsRebalanceTest.java     |  4 +-
 6 files changed, 81 insertions(+), 74 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f18b404..20e3c02 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1158,10 +1158,12 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             partsToReload
             );
 
-        m.compress(compress);
+        m.compressed(compress);
 
         final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new 
HashMap<>();
 
+        Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
+
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (!grp.isLocal()) {
                 if (exchId != null) {
@@ -1184,7 +1186,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         affCache.similarAffinityKey());
                 }
 
-                m.addPartitionSizes(grp.groupId(), 
grp.topology().globalPartSizes());
+                Map<Integer, Long> partSizesMap = 
grp.topology().globalPartSizes();
+
+                if (!partSizesMap.isEmpty())
+                    partsSizes.put(grp.groupId(), partSizesMap);
 
                 if (exchId != null) {
                     CachePartitionFullCountersMap cntrsMap = 
grp.topology().fullUpdateCounters();
@@ -1220,10 +1225,16 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 else
                     m.addPartitionUpdateCounters(top.groupId(), 
CachePartitionFullCountersMap.toCountersMap(cntrsMap));
 
-                m.addPartitionSizes(top.groupId(), top.globalPartSizes());
+                Map<Integer, Long> partSizesMap = top.globalPartSizes();
+
+                if (!partSizesMap.isEmpty())
+                    partsSizes.put(top.groupId(), partSizesMap);
             }
         }
 
+        if (!partsSizes.isEmpty())
+            m.partitionSizes(cctx, partsSizes);
+
         return m;
     }
 
@@ -1535,6 +1546,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                 boolean updated = false;
 
+                Map<Integer, Map<Integer, Long>> partsSizes = 
msg.partitionSizes(cctx);
+
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
msg.partitions().entrySet()) {
                     Integer grpId = entry.getKey();
 
@@ -1552,7 +1565,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             entry.getValue(),
                             null,
                             msg.partsToReload(cctx.localNodeId(), grpId),
-                            msg.partitionSizes(grpId),
+                            partsSizes.getOrDefault(grpId, 
Collections.emptyMap()),
                             msg.topologyVersion());
                     }
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 84cc792..a5eb237 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -118,14 +118,14 @@ public abstract class GridDhtPartitionsAbstractMessage 
extends GridCacheMessage
     /**
      * @return {@code True} if message data is compressed.
      */
-    protected final boolean compressed() {
+    public final boolean compressed() {
         return (flags & COMPRESSED_FLAG_MASK) != 0;
     }
 
     /**
      * @param compressed {@code True} if message data is compressed.
      */
-    protected final void compressed(boolean compressed) {
+    public final void compressed(boolean compressed) {
         flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : 
(byte)(flags & ~COMPRESSED_FLAG_MASK);
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e84b7a0..d9e780a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2331,6 +2331,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         newCrdFut = null;
         exchangeLocE = null;
         exchangeGlobalExceptions.clear();
+        if (finishState != null)
+            finishState.cleanUp();
     }
 
     /**
@@ -3981,6 +3983,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         long time = System.currentTimeMillis();
 
+        Map<Integer, Map<Integer, Long>> partsSizes = msg.partitionSizes(cctx);
+
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
msg.partitions().entrySet()) {
             Integer grpId = entry.getKey();
 
@@ -3994,7 +3998,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     entry.getValue(),
                     cntrMap,
                     msg.partsToReload(cctx.localNodeId(), grpId),
-                    msg.partitionSizes(grpId),
+                    partsSizes.getOrDefault(grpId, Collections.emptyMap()),
                     null);
             }
             else {
@@ -4776,6 +4780,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             this.resTopVer = resTopVer;
             this.msg = msg;
         }
+
+        /**
+         * Cleans up resources to avoid excessive memory usage.
+         */
+        public void cleanUp() {
+            if (msg != null)
+                msg.cleanUp();
+        }
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a63ab70..b9ffd9b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -95,11 +96,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /** Serialized partitions that must be cleared and re-loaded. */
     private byte[] partsToReloadBytes;
 
-    /** Partitions sizes. */
-    @GridToStringInclude
-    @GridDirectTransient
-    private Map<Integer, Map<Integer, Long>> partsSizes;
-
     /** Serialized partitions sizes. */
     private byte[] partsSizesBytes;
 
@@ -115,10 +111,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     private byte[] errsBytes;
 
     /** */
-    @GridDirectTransient
-    private transient boolean compress;
-
-    /** */
     private AffinityTopologyVersion resTopVer;
 
     /** */
@@ -174,12 +166,10 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         cp.partHistSuppliersBytes = partHistSuppliersBytes;
         cp.partsToReload = partsToReload;
         cp.partsToReloadBytes = partsToReloadBytes;
-        cp.partsSizes = partsSizes;
         cp.partsSizesBytes = partsSizesBytes;
         cp.topVer = topVer;
         cp.errs = errs;
         cp.errsBytes = errsBytes;
-        cp.compress = compress;
         cp.resTopVer = resTopVer;
         cp.joinedNodeAff = joinedNodeAff;
         cp.idealAffDiff = idealAffDiff;
@@ -246,13 +236,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @param compress {@code True} if it is possible to use compression for 
message.
-     */
-    public void compress(boolean compress) {
-        this.compress = compress;
-    }
-
-    /**
      * @return Local partitions.
      */
     public Map<Integer, GridDhtPartitionFullMap> partitions() {
@@ -285,7 +268,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             parts.put(grpId, fullMap);
 
             if (dupDataCache != null) {
-                assert compress;
+                assert compressed();
                 assert parts.containsKey(dupDataCache);
 
                 if (dupPartsData == null)
@@ -356,32 +339,43 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * Adds partition sizes map for specified {@code grpId} to the current 
message.
+     * Supplies partition sizes map for all cache groups.
      *
-     * @param grpId Group id.
-     * @param partSizesMap Partition sizes map.
+     * @param ctx Cache context.
+     * @param partsSizes Partitions sizes map.
      */
-    public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) {
-        if (partSizesMap.isEmpty())
-            return;
+    public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, 
Map<Integer, Long>> partsSizes) {
+        try {
+            byte[] marshalled = U.marshal(ctx, partsSizes);
 
-        if (partsSizes == null)
-            partsSizes = new HashMap<>();
+            if (compressed())
+                marshalled = U.zip(marshalled);
 
-        partsSizes.put(grpId, partSizesMap);
+            partsSizesBytes = marshalled;
+        }
+        catch (IgniteCheckedException ex) {
+            throw new IgniteException(ex);
+        }
     }
 
     /**
-     * Returns partition sizes map for specified {@code grpId}.
+     * Returns partition sizes map for all cache groups.
      *
-     * @param grpId Group id.
-     * @return Partition sizes map (partId, partSize).
+     * @param ctx Cache context.
+     * @return Partition sizes map (grpId, (partId, partSize)).
      */
-    public Map<Integer, Long> partitionSizes(int grpId) {
-        if (partsSizes == null)
+    public Map<Integer, Map<Integer, Long>> 
partitionSizes(GridCacheSharedContext ctx) {
+        if (partsSizesBytes == null)
             return Collections.emptyMap();
 
-        return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+        try {
+            return compressed()
+                ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
ctx.deploy().globalLoader())
+                : U.unmarshal(ctx, partsSizesBytes, 
ctx.deploy().globalLoader());
+        }
+        catch (IgniteCheckedException ex) {
+            throw new IgniteException(ex);
+        }
     }
 
     /**
@@ -415,7 +409,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             byte[] partCntrsBytes20 = null;
             byte[] partHistSuppliersBytes0 = null;
             byte[] partsToReloadBytes0 = null;
-            byte[] partsSizesBytes0 = null;
             byte[] errsBytes0 = null;
 
             if (!F.isEmpty(parts) && partsBytes == null)
@@ -433,32 +426,23 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             if (partsToReload != null && partsToReloadBytes == null)
                 partsToReloadBytes0 = U.marshal(ctx, partsToReload);
 
-            if (partsSizes != null && partsSizesBytes == null)
-                partsSizesBytes0 = U.marshal(ctx, partsSizes);
-
             if (!F.isEmpty(errs) && errsBytes == null)
                 errsBytes0 = U.marshal(ctx, errs);
 
-            if (compress) {
-                assert !compressed();
-
+            if (compressed()) {
                 try {
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
                     byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20);
                     byte[] partHistSuppliersBytesZip = 
U.zip(partHistSuppliersBytes0);
                     byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
-                    byte[] partsSizesBytesZip = U.zip(partsSizesBytes0);
                     byte[] exsBytesZip = U.zip(errsBytes0);
-
                     partsBytes0 = partsBytesZip;
                     partCntrsBytes0 = partCntrsBytesZip;
                     partCntrsBytes20 = partCntrsBytes2Zip;
                     partHistSuppliersBytes0 = partHistSuppliersBytesZip;
                     partsToReloadBytes0 = partsToReloadBytesZip;
-                    partsSizesBytes0 = partsSizesBytesZip;
                     errsBytes0 = exsBytesZip;
-
                     compressed(true);
                 }
                 catch (IgniteCheckedException e) {
@@ -471,7 +455,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             partCntrsBytes2 = partCntrsBytes20;
             partHistSuppliersBytes = partHistSuppliersBytes0;
             partsToReloadBytes = partsToReloadBytes0;
-            partsSizesBytes = partsSizesBytes0;
             errsBytes = errsBytes0;
         }
     }
@@ -559,13 +542,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 partsToReload = U.unmarshal(ctx, partsToReloadBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
-        if (partsSizesBytes != null && partsSizes == null) {
-            if (compressed())
-                partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                partsSizes = U.unmarshal(ctx, partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-        }
-
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
@@ -575,7 +551,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             else
                 errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
         }
-
         if (errs == null)
             errs = new HashMap<>();
     }
@@ -830,4 +805,18 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             }
         }
     }
+
+    /**
+     * Cleans up resources to avoid excessive memory usage.
+     */
+    public void cleanUp() {
+        partsBytes = null;
+        partCntrs2 = null;
+        partCntrsBytes = null;
+        partCntrsBytes2 = null;
+        partHistSuppliersBytes = null;
+        partsToReloadBytes = null;
+        partsSizesBytes = null;
+        errsBytes = null;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 088fb31..db5d800 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -96,10 +96,6 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     private boolean client;
 
     /** */
-    @GridDirectTransient
-    private transient boolean compress;
-
-    /** */
     @GridDirectCollection(Integer.class)
     private Collection<Integer> grpsAffRequest;
 
@@ -131,8 +127,9 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         boolean compress) {
         super(exchId, lastVer);
 
+        compressed(compress);
+
         this.client = client;
-        this.compress = compress;
     }
 
     /**
@@ -201,7 +198,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         parts.put(cacheId, locMap);
 
         if (dupDataCache != null) {
-            assert compress;
+            assert compressed();
             assert F.isEmpty(locMap.map());
             assert parts.containsKey(dupDataCache);
 
@@ -367,9 +364,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
             if (err != null && errBytes == null)
                 errBytes0 = U.marshal(ctx, err);
 
-            if (compress) {
-                assert !compressed();
-
+            if (compressed()) {
                 try {
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
@@ -382,8 +377,6 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                     partHistCntrsBytes0 = partHistCntrsBytesZip;
                     partsSizesBytes0 = partsSizesBytesZip;
                     errBytes0 = exBytesZip;
-
-                    compressed(true);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(ctx.logger(getClass()), "Failed to compress 
partitions data: " + e, e);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
index af2dc63..3aa07a9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -277,7 +277,7 @@ public class CacheGroupsMetricsRebalanceTest extends 
GridCommonAbstractTest {
             @Override public boolean apply() {
                 return 
ig2.cache(CACHE1).localMetrics().getKeysToRebalanceLeft() == 0;
             }
-        }, timeLeft + 10_000L);
+        }, timeLeft + 12_000L);
 
         log.info("[timePassed=" + timePassed + ", timeLeft=" + timeLeft +
                 ", Time to rebalance=" + (finishTime - startTime) +
@@ -292,7 +292,7 @@ public class CacheGroupsMetricsRebalanceTest extends 
GridCommonAbstractTest {
 
         long diff = finishTime - currTime;
 
-        assertTrue("Expected less than 10000, but actual: " + diff, 
Math.abs(diff) < 10_000L);
+        assertTrue("Expected less than 12000, but actual: " + diff, 
Math.abs(diff) < 12_000L);
     }
 
     /**

Reply via email to