Repository: ignite
Updated Branches:
  refs/heads/master 889ce79bb -> 70952fe92


IGNITE-9870 GridDhtPartitionsFullMessage#prepareMarshal compression 
parallelization - Fixes #5330.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70952fe9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70952fe9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70952fe9

Branch: refs/heads/master
Commit: 70952fe92d5be7f7c9407783867d524544cd9fec
Parents: 889ce79
Author: Pavel Voronkin <pvoron...@gridgain.com>
Authored: Fri Nov 9 15:00:03 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Nov 9 15:10:14 2018 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  32 ++-
 .../cache/CacheAffinitySharedManager.java       |  14 +-
 .../processors/cache/GridCacheProcessor.java    |  44 ++-
 .../GridDhtPartitionsExchangeFuture.java        |   8 +-
 .../preloader/GridDhtPartitionsFullMessage.java | 214 ++++++++------
 .../GridCacheDatabaseSharedManager.java         |   4 +-
 .../ignite/internal/util/IgniteUtils.java       | 281 +++++++++++--------
 .../util/lang/IgniteThrowableConsumer.java      |   6 +-
 .../internal/util/IgniteUtilsSelfTest.java      | 122 +++++++-
 .../ApiParity/IgniteConfigurationParityTest.cs  |   3 +-
 10 files changed, 483 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 528dbbe..6ece0e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.UUID;
+import java.util.zip.Deflater;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryListener;
 import javax.cache.expiry.ExpiryPolicy;
@@ -40,7 +41,6 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeTask;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.failure.FailureHandler;
@@ -69,6 +69,7 @@ import org.apache.ignite.spi.deployment.DeploymentSpi;
 import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.spi.eventstorage.EventStorageSpi;
 import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
 import org.apache.ignite.spi.failover.FailoverSpi;
@@ -119,6 +120,9 @@ public class IgniteConfiguration {
     /** Default maximum timeout to wait for network responses in milliseconds 
(value is {@code 5,000ms}). */
     public static final long DFLT_NETWORK_TIMEOUT = 5000;
 
+    /** Default compression level for network messages (value is 
Deflater.BEST_SPEED). */
+    public static final int DFLT_NETWORK_COMPRESSION = Deflater.BEST_SPEED;
+
     /** Default interval between message send retries. */
     public static final long DFLT_SEND_RETRY_DELAY = 1000;
 
@@ -302,6 +306,9 @@ public class IgniteConfiguration {
     /** Maximum network requests timeout. */
     private long netTimeout = DFLT_NETWORK_TIMEOUT;
 
+    /** Compression level for network binary messages. */
+    private int netCompressionLevel = DFLT_NETWORK_COMPRESSION;
+
     /** Interval between message send retries. */
     private long sndRetryDelay = DFLT_SEND_RETRY_DELAY;
 
@@ -1478,6 +1485,29 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Compression level of internal network messages.
+     * <p>
+     * If not provided, then default value
+     * Deflater.BEST_SPEED is used.
+     *
+     * @return Network messages default compression level.
+     */
+    public int getNetworkCompressionLevel() {
+        return netCompressionLevel;
+    }
+
+    /**
+     * Compression level for internal network messages.
+     * <p>
+     * If not provided, then default value
+     * Deflater.BEST_SPEED is used.
+     * @param netCompressionLevel Compression level for internal network messages.
+     */
+    public void setNetworkCompressionLevel(int netCompressionLevel) {
+        this.netCompressionLevel = netCompressionLevel;
+    }
+
+    /**
      * Interval in milliseconds between message send retries.
      * <p>
      * If not provided, then default value

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1283696..936dc3e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -993,6 +993,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         initAffinity(cachesRegistry.group(grp.groupId()), 
grp.affinity(), fut);
                     }
                 }
+
+                return null;
             }
         );
     }
@@ -1255,7 +1257,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             .collect(Collectors.toList());
 
         try {
-            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), 
affinityCaches, c::applyx);
+            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), 
affinityCaches, t -> {
+                c.applyx(t);
+
+                return null;
+            });
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to execute affinity operation on 
cache groups", e);
@@ -1283,7 +1289,11 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         }
 
         try {
-            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), 
affinityCaches, c::applyx);
+            U.doInParallel(cctx.kernalContext().getSystemExecutorService(), 
affinityCaches, t -> {
+                c.applyx(t);
+
+                return null;
+            });
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to execute affinity operation on 
cache groups", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 940e4a6..f4e5b1b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2057,19 +2057,23 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      */
     private void prepareStartCaches(
         Collection<StartCacheInfo> startCacheInfos,
-        StartCacheFailHandler<StartCacheInfo> cacheStartFailHandler
+        StartCacheFailHandler<StartCacheInfo, Void> cacheStartFailHandler
     ) throws IgniteCheckedException {
         if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() 
<= 1) {
             for (StartCacheInfo startCacheInfo : startCacheInfos) {
                 cacheStartFailHandler.handle(
                     startCacheInfo,
-                    cacheInfo -> prepareCacheStart(
-                        cacheInfo.getCacheDescriptor().cacheConfiguration(),
-                        cacheInfo.getCacheDescriptor(),
-                        cacheInfo.getReqNearCfg(),
-                        cacheInfo.getExchangeTopVer(),
-                        cacheInfo.isDisabledAfterStart()
-                    )
+                    cacheInfo -> {
+                        prepareCacheStart(
+                            
cacheInfo.getCacheDescriptor().cacheConfiguration(),
+                            cacheInfo.getCacheDescriptor(),
+                            cacheInfo.getReqNearCfg(),
+                            cacheInfo.getExchangeTopVer(),
+                            cacheInfo.isDisabledAfterStart()
+                        );
+
+                        return null;
+                    }
                 );
 
                 context().exchange().exchangerUpdateHeartbeat();
@@ -2087,7 +2091,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 parallelismLvl,
                 sharedCtx.kernalContext().getSystemExecutorService(),
                 startCacheInfos,
-                startCacheInfo ->
+                startCacheInfo -> {
                     cacheStartFailHandler.handle(
                         startCacheInfo,
                         cacheInfo -> {
@@ -2101,8 +2105,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                             cacheContexts.put(cacheInfo, cacheCtx);
 
                             context().exchange().exchangerUpdateHeartbeat();
+
+                            return null;
                         }
-                    )
+                    );
+
+                    return null;
+                }
             );
 
             /*
@@ -2134,6 +2143,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                         }
 
                         context().exchange().exchangerUpdateHeartbeat();
+
+                        return null;
                     }
                 );
             }
@@ -2142,7 +2153,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 parallelismLvl,
                 sharedCtx.kernalContext().getSystemExecutorService(),
                 cacheContexts.entrySet(),
-                cacheCtxEntry ->
+                cacheCtxEntry -> {
                     cacheStartFailHandler.handle(
                         cacheCtxEntry.getKey(),
                         cacheInfo -> {
@@ -2154,8 +2165,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                                 onCacheStarted(cacheCtxEntry.getValue());
 
                             context().exchange().exchangerUpdateHeartbeat();
+
+                            return null;
                         }
-                    )
+                    );
+
+                    return null;
+                }
             );
         }
     }
@@ -5447,7 +5463,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      *
      * @param <T> Type of started data.
      */
-    private static interface StartCacheFailHandler<T> {
+    private static interface StartCacheFailHandler<T, R> {
         /**
          * Handle of fail.
          *
@@ -5455,7 +5471,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
          * @param startCacheOperation Operation for start cache.
          * @throws IgniteCheckedException if failed.
          */
-        void handle(T data, IgniteThrowableConsumer<T> startCacheOperation) 
throws IgniteCheckedException;
+        void handle(T data, IgniteThrowableConsumer<T, R> startCacheOperation) 
throws IgniteCheckedException;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index fa6f278..3702a51 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3491,7 +3491,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         || grpCtx.config().getRebalanceMode() == 
CacheRebalanceMode.NONE
                         || grpCtx.config().getExpiryPolicyFactory() == null
                         || SKIP_PARTITION_SIZE_VALIDATION)
-                        return;
+                        return null;
 
                     try {
                         
validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this,
 top, msgs);
@@ -3500,6 +3500,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         log.warning("Partition states validation has failed 
for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage());
                         // TODO: Handle such errors 
https://issues.apache.org/jira/browse/IGNITE-7833
                     }
+
+                    return null;
                 }
             );
         }
@@ -3532,6 +3534,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         assignPartitionSizes(top);
                     else
                         assignPartitionStates(top);
+
+                    return null;
                 }
             );
         }
@@ -3988,6 +3992,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                                 null);
                         }
                     }
+
+                    return null;
                 });
         }
         catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index a63ab70..4e895ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,8 +19,11 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -33,6 +36,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -410,69 +414,75 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             (!F.isEmpty(errs) && errsBytes == null);
 
         if (marshal) {
-            byte[] partsBytes0 = null;
-            byte[] partCntrsBytes0 = null;
-            byte[] partCntrsBytes20 = null;
-            byte[] partHistSuppliersBytes0 = null;
-            byte[] partsToReloadBytes0 = null;
-            byte[] partsSizesBytes0 = null;
-            byte[] errsBytes0 = null;
+            // Leave 2 system pool threads free for other system operations, but always use at least one.
+            int parallelismLvl = Math.max(1, 
ctx.kernalContext().config().getSystemThreadPoolSize() - 2);
+
+            Collection<Object> objectsToMarshall = new ArrayList<>();
+
+            if (!F.isEmpty(parts) && partsBytes == null)
+                objectsToMarshall.add(parts);
+
+            if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == 
null)
+                objectsToMarshall.add(partCntrs);
+
+            if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 
== null)
+                objectsToMarshall.add(partCntrs2);
+
+            if (partHistSuppliers != null && partHistSuppliersBytes == null)
+                objectsToMarshall.add(partHistSuppliers);
+
+            if (partsToReload != null && partsToReloadBytes == null)
+                objectsToMarshall.add(partsToReload);
+
+            if (partsSizes != null && partsSizesBytes == null)
+                objectsToMarshall.add(partsSizes);
+
+            if (!F.isEmpty(errs) && errsBytes == null)
+                objectsToMarshall.add(errs);
+
+            Collection<byte[]> marshalled = U.doInParallel(
+                parallelismLvl,
+                ctx.kernalContext().getSystemExecutorService(),
+                objectsToMarshall,
+                new IgniteThrowableConsumer<Object, byte[]>() {
+                    @Override public byte[] accept(Object payload) throws 
IgniteCheckedException {
+                        byte[] marshalled = U.marshal(ctx, payload);
+
+                        if(compress)
+                            marshalled = U.zip(marshalled, 
ctx.gridConfig().getNetworkCompressionLevel());
+
+                        return marshalled;
+                    }
+            });
+
+            Iterator<byte[]> iterator = marshalled.iterator();
 
             if (!F.isEmpty(parts) && partsBytes == null)
-                partsBytes0 = U.marshal(ctx, parts);
+                partsBytes = iterator.next();
 
             if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == 
null)
-                partCntrsBytes0 = U.marshal(ctx, partCntrs);
+                partCntrsBytes = iterator.next();
 
             if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 
== null)
-                partCntrsBytes20 = U.marshal(ctx, partCntrs2);
+                partCntrsBytes2 = iterator.next();
 
             if (partHistSuppliers != null && partHistSuppliersBytes == null)
-                partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers);
+                partHistSuppliersBytes = iterator.next();
 
             if (partsToReload != null && partsToReloadBytes == null)
-                partsToReloadBytes0 = U.marshal(ctx, partsToReload);
+                partsToReloadBytes = iterator.next();
 
             if (partsSizes != null && partsSizesBytes == null)
-                partsSizesBytes0 = U.marshal(ctx, partsSizes);
+                partsSizesBytes = iterator.next();
 
             if (!F.isEmpty(errs) && errsBytes == null)
-                errsBytes0 = U.marshal(ctx, errs);
+                errsBytes = iterator.next();
 
             if (compress) {
-                assert !compressed();
-
-                try {
-                    byte[] partsBytesZip = U.zip(partsBytes0);
-                    byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
-                    byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20);
-                    byte[] partHistSuppliersBytesZip = 
U.zip(partHistSuppliersBytes0);
-                    byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
-                    byte[] partsSizesBytesZip = U.zip(partsSizesBytes0);
-                    byte[] exsBytesZip = U.zip(errsBytes0);
-
-                    partsBytes0 = partsBytesZip;
-                    partCntrsBytes0 = partCntrsBytesZip;
-                    partCntrsBytes20 = partCntrsBytes2Zip;
-                    partHistSuppliersBytes0 = partHistSuppliersBytesZip;
-                    partsToReloadBytes0 = partsToReloadBytesZip;
-                    partsSizesBytes0 = partsSizesBytesZip;
-                    errsBytes0 = exsBytesZip;
-
-                    compressed(true);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(ctx.logger(getClass()), "Failed to compress 
partitions data: " + e, e);
-                }
-            }
+                assert !compressed() : "Unexpected compressed state";
 
-            partsBytes = partsBytes0;
-            partCntrsBytes = partCntrsBytes0;
-            partCntrsBytes2 = partCntrsBytes20;
-            partHistSuppliersBytes = partHistSuppliersBytes0;
-            partsToReloadBytes = partsToReloadBytes0;
-            partsSizesBytes = partsSizesBytes0;
-            errsBytes = errsBytes0;
+                compressed(true);
+            }
         }
     }
 
@@ -494,11 +504,51 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
+        ClassLoader classLoader = U.resolveClassLoader(ldr, ctx.gridConfig());
+
+        Collection<byte[]> objectsToUnmarshall = new ArrayList<>();
+
+        // Leave 2 system pool threads free for other system operations, but always use at least one.
+        int parallelismLvl = Math.max(1, 
ctx.kernalContext().config().getSystemThreadPoolSize() - 2);
+
+        if (partsBytes != null && parts == null)
+            objectsToUnmarshall.add(partsBytes);
+
+        if (partCntrsBytes != null && partCntrs == null)
+            objectsToUnmarshall.add(partCntrsBytes);
+
+        if (partCntrsBytes2 != null && partCntrs2 == null)
+            objectsToUnmarshall.add(partCntrsBytes2);
+
+        if (partHistSuppliersBytes != null && partHistSuppliers == null)
+            objectsToUnmarshall.add(partHistSuppliersBytes);
+
+        if (partsToReloadBytes != null && partsToReload == null)
+            objectsToUnmarshall.add(partsToReloadBytes);
+
+        if (partsSizesBytes != null && partsSizes == null)
+            objectsToUnmarshall.add(partsSizesBytes);
+
+        if (errsBytes != null && errs == null)
+            objectsToUnmarshall.add(errsBytes);
+
+        Collection<Object> unmarshalled = U.doInParallel(
+            parallelismLvl,
+            ctx.kernalContext().getSystemExecutorService(),
+            objectsToUnmarshall,
+            new IgniteThrowableConsumer<byte[], Object>() {
+                @Override public Object accept(byte[] binary) throws 
IgniteCheckedException {
+                    return compressed()
+                        ? U.unmarshalZip(ctx.marshaller(), binary, classLoader)
+                        : U.unmarshal(ctx, binary, classLoader);
+                }
+            }
+        );
+
+        Iterator<Object> iterator = unmarshalled.iterator();
+
         if (partsBytes != null && parts == null) {
-            if (compressed())
-                parts = U.unmarshalZip(ctx.marshaller(), partsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
+            parts = (Map<Integer, GridDhtPartitionFullMap>)iterator.next();
 
             if (dupPartsData != null) {
                 assert parts != null;
@@ -528,53 +578,41 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             }
         }
 
-        if (parts == null)
-            parts = new HashMap<>();
+        if (partCntrsBytes != null && partCntrs == null)
+            partCntrs = (IgniteDhtPartitionCountersMap)iterator.next();
 
-        if (partCntrsBytes != null && partCntrs == null) {
-            if (compressed())
-                partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                partCntrs = U.unmarshal(ctx, partCntrsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-        }
+        if (partCntrsBytes2 != null && partCntrs2 == null)
+            partCntrs2 = (IgniteDhtPartitionCountersMap2)iterator.next();
 
-        if (partCntrsBytes2 != null && partCntrs2 == null) {
-            if (compressed())
-                partCntrs2 = U.unmarshalZip(ctx.marshaller(), partCntrsBytes2, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                partCntrs2 = U.unmarshal(ctx, partCntrsBytes2, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-        }
+        if (partHistSuppliersBytes != null && partHistSuppliers == null)
+            partHistSuppliers = 
(IgniteDhtPartitionHistorySuppliersMap)iterator.next();
 
-        if (partHistSuppliersBytes != null && partHistSuppliers == null) {
-            if (compressed())
-                partHistSuppliers = U.unmarshalZip(ctx.marshaller(), 
partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                partHistSuppliers = U.unmarshal(ctx, partHistSuppliersBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-        }
+        if (partsToReloadBytes != null && partsToReload == null)
+            partsToReload = (IgniteDhtPartitionsToReloadMap)iterator.next();
 
-        if (partsToReloadBytes != null && partsToReload == null) {
-            if (compressed())
-                partsToReload = U.unmarshalZip(ctx.marshaller(), 
partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                partsToReload = U.unmarshal(ctx, partsToReloadBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-        }
+        if (partsSizesBytes != null && partsSizes == null)
+            partsSizes = (Map<Integer, Map<Integer, Long>>)iterator.next();
 
-        if (partsSizesBytes != null && partsSizes == null) {
-            if (compressed())
-                partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                partsSizes = U.unmarshal(ctx, partsSizesBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-        }
+        if (errsBytes != null && errs == null)
+            errs = (Map<UUID, Exception>)iterator.next();
+
+        if (parts == null)
+            parts = new HashMap<>();
 
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
-        if (errsBytes != null && errs == null) {
-            if (compressed())
-                errs = U.unmarshalZip(ctx.marshaller(), errsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-            else
-                errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
-        }
+        if (partCntrs2 == null)
+            partCntrs2 = new IgniteDhtPartitionCountersMap2();
+
+        if(partHistSuppliers == null)
+            partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
+
+        if(partsToReload == null)
+            partsToReload = new IgniteDhtPartitionsToReloadMap();
+
+        if(partsSizes == null)
+            partsSizes = new HashMap<>();
 
         if (errs == null)
             errs = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index b517da6..94ac100 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1348,7 +1348,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 cctx.cache().cacheGroups(),
                 cacheGroup -> {
                     if (cacheGroup.isLocal())
-                        return;
+                        return null;
 
                     cctx.database().checkpointReadLock();
 
@@ -1361,6 +1361,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                     finally {
                         cctx.database().checkpointReadUnlock();
                     }
+
+                    return null;
                 }
             );
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 68219e6..f2be434 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,6 +17,113 @@
 
 package org.apache.ignite.internal.util;
 
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteDeploymentException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.cluster.ClusterGroupEmptyException;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskCancelledException;
+import org.apache.ignite.compute.ComputeTaskName;
+import org.apache.ignite.compute.ComputeTaskTimeoutException;
+import org.apache.ignite.configuration.AddressResolver;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
+import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
+import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker;
+import org.apache.ignite.internal.processors.cache.GridCacheAttributes;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
+import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.io.GridFilenameUtils;
+import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
+import org.apache.ignite.internal.util.lang.GridTuple;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.lang.IgniteFutureTimeoutException;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.IgniteSpi;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionHeuristicException;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import sun.misc.Unsafe;
+
+import javax.management.DynamicMBean;
+import javax.management.JMException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
@@ -143,116 +250,11 @@ import java.util.logging.Logger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.zip.Deflater;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
-import javax.management.DynamicMBean;
-import javax.management.JMException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteClientDisconnectedException;
-import org.apache.ignite.IgniteDeploymentException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteIllegalStateException;
-import org.apache.ignite.IgniteInterruptedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.cluster.ClusterGroupEmptyException;
-import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.compute.ComputeTask;
-import org.apache.ignite.compute.ComputeTaskCancelledException;
-import org.apache.ignite.compute.ComputeTaskName;
-import org.apache.ignite.compute.ComputeTaskTimeoutException;
-import org.apache.ignite.configuration.AddressResolver;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteDeploymentCheckedException;
-import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
-import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
-import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.communication.GridIoManager;
-import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
-import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
-import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker;
-import org.apache.ignite.internal.processors.cache.GridCacheAttributes;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cluster.BaselineTopology;
-import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
-import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
-import org.apache.ignite.internal.util.io.GridFilenameUtils;
-import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
-import org.apache.ignite.internal.util.lang.GridTuple;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteFutureCancelledException;
-import org.apache.ignite.lang.IgniteFutureTimeoutException;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.lifecycle.LifecycleAware;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.PluginProvider;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.spi.IgniteSpi;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
-import org.apache.ignite.transactions.TransactionDeadlockException;
-import org.apache.ignite.transactions.TransactionHeuristicException;
-import org.apache.ignite.transactions.TransactionOptimisticException;
-import org.apache.ignite.transactions.TransactionRollbackException;
-import org.apache.ignite.transactions.TransactionTimeoutException;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import sun.misc.Unsafe;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_HOSTNAME_VERIFIER;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_HOME;
@@ -10324,11 +10326,23 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Zip binary payload using default compression.
+     *
      * @param bytes Byte array to compress.
      * @return Compressed bytes.
      * @throws IgniteCheckedException If failed.
      */
     public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException {
+        return zip(bytes, Deflater.DEFAULT_COMPRESSION);
+    }
+
+    /**
+     * @param bytes Byte array to compress.
+     * @param compressionLevel Level of compression to encode.
+     * @return Compressed bytes.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static byte[] zip(@Nullable byte[] bytes, int compressionLevel) throws IgniteCheckedException {
         try {
             if (bytes == null)
                 return null;
@@ -10336,6 +10350,8 @@ public abstract class IgniteUtils {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
 
             try (ZipOutputStream zos = new ZipOutputStream(bos)) {
+                zos.setLevel(compressionLevel);
+
                 ZipEntry entry = new ZipEntry("");
 
                 try {
@@ -10740,9 +10756,12 @@ public abstract class IgniteUtils {
      * @param <T> Type of data.
      * @throws IgniteCheckedException if parallel execution was failed.
      */
-    public static <T> void doInParallel(ExecutorService executorSvc, Collection<T> srcDatas,
-        IgniteThrowableConsumer<T> operation) throws IgniteCheckedException, IgniteInterruptedCheckedException {
-        doInParallel(srcDatas.size(), executorSvc, srcDatas, operation);
+    public static <T, R> Collection<R> doInParallel(
+        ExecutorService executorSvc,
+        Collection<T> srcDatas,
+        IgniteThrowableConsumer<T, R> operation
+    ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+        return doInParallel(srcDatas.size(), executorSvc, srcDatas, operation);
     }
 
     /**
@@ -10753,38 +10772,58 @@ public abstract class IgniteUtils {
      * @param srcDatas List of data for parallelization.
      * @param operation Logic for execution of on each item of data.
      * @param <T> Type of data.
+     * @param <R> Type of return value.
      * @throws IgniteCheckedException if parallel execution was failed.
      */
-    public static <T> void doInParallel(
+    public static <T, R> Collection<R> doInParallel(
         int parallelismLvl,
         ExecutorService executorSvc,
         Collection<T> srcDatas,
-        IgniteThrowableConsumer<T> operation
+        IgniteThrowableConsumer<T, R> operation
     ) throws IgniteCheckedException, IgniteInterruptedCheckedException {
+        if(srcDatas.isEmpty())
+            return Collections.emptyList();
+
+        int batchSize = srcDatas.size() / parallelismLvl;
+
+        final int finalBatchSize = batchSize == 0 ? srcDatas.size() : batchSize;
+
         List<List<T>> batches = IntStream.range(0, parallelismLvl)
-            .mapToObj(i -> new ArrayList<T>())
+            .mapToObj(i -> new ArrayList<T>(finalBatchSize))
             .collect(Collectors.toList());
 
-        int i = 0;
+        int batchIndex = 0;
+
+        final int maxBatchIndex = batches.size() -1;
 
-        for (T src : srcDatas)
-            batches.get(i++ % parallelismLvl).add(src);
+        List<T> currentBatch = batches.get(batchIndex);
 
-        List<Future<Object>> consumerFutures = batches.stream()
+        for (T src : srcDatas) {
+            currentBatch.add(src);
+
+            if(currentBatch.size() >= batchSize && batchIndex < maxBatchIndex)
+                currentBatch = batches.get(++batchIndex);
+        }
+
+        List<Future<Collection<R>>> consumerFutures = batches.stream()
             .filter(batch -> !batch.isEmpty())
             .map(batch -> executorSvc.submit(() -> {
+                Collection<R> results = new ArrayList<>(batch.size());
+
                 for (T item : batch)
-                    operation.accept(item);
+                    results.add(operation.accept(item));
 
-                return null;
+                return results;
             }))
             .collect(Collectors.toList());
 
         Throwable error =null;
 
-        for (Future<Object> future : consumerFutures) {
+        Collection<R> results = new ArrayList<>(srcDatas.size());
+
+        for (Future<Collection<R>> future : consumerFutures) {
             try {
-                future.get();
+                results.addAll(future.get());
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -10817,6 +10856,8 @@ public abstract class IgniteUtils {
 
             throw new IgniteCheckedException(error);
         }
+
+        return results;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
index 46813a9..55feed8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java
@@ -25,13 +25,15 @@ import org.apache.ignite.IgniteCheckedException;
  * interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects.
  *
  * @param <E> Type of closure parameter.
+ * @param <R> Type of result value.
  */
-public interface IgniteThrowableConsumer<E> extends Serializable {
+public interface IgniteThrowableConsumer<E, R> extends Serializable {
     /**
      * Consumer body.
      *
      * @param e Consumer parameter.
+     * @return Result of consumer operation.
      * @throws IgniteCheckedException if body execution was failed.
      */
-    public void accept(E e) throws IgniteCheckedException;
+    public R accept(E e) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 5de12a2..070fd4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -45,9 +45,11 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterGroup;
@@ -885,18 +887,26 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
     public void testDoInParallel() throws Throwable {
         CyclicBarrier barrier = new CyclicBarrier(3);
 
-        IgniteUtils.doInParallel(3,
-            Executors.newFixedThreadPool(3),
-            Arrays.asList(1, 2, 3),
-            i -> {
-                try {
-                    barrier.await(1, TimeUnit.SECONDS);
-                }
-                catch (Exception e) {
-                    throw new IgniteCheckedException(e);
+        ExecutorService executorService = Executors.newFixedThreadPool(3);
+
+        try {
+            IgniteUtils.doInParallel(3,
+                executorService,
+                Arrays.asList(1, 2, 3),
+                i -> {
+                    try {
+                        barrier.await(1, TimeUnit.SECONDS);
+                    }
+                    catch (Exception e) {
+                        throw new IgniteCheckedException(e);
+                    }
+
+                    return null;
                 }
-            }
-        );
+            );
+        } finally {
+            executorService.shutdownNow();
+        }
     }
 
     /**
@@ -905,9 +915,11 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
     public void testDoInParallelBatch() {
         CyclicBarrier barrier = new CyclicBarrier(3);
 
+        ExecutorService executorService = Executors.newFixedThreadPool(3);
+
         try {
             IgniteUtils.doInParallel(2,
-                Executors.newFixedThreadPool(3),
+                executorService,
                 Arrays.asList(1, 2, 3),
                 i -> {
                     try {
@@ -916,6 +928,8 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
                     catch (Exception e) {
                         throw new IgniteCheckedException(e);
                     }
+
+                    return null;
                 }
             );
 
@@ -923,22 +937,98 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
         }
         catch (Exception e) {
             assertTrue(e.toString(), X.hasCause(e, TimeoutException.class));
+        } finally {
+            executorService.shutdownNow();
         }
     }
 
     /**
+     * Test parallel execution in order.
+     */
+    public void testDoInParallelResultsOrder() throws IgniteCheckedException {
+        ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+        try {
+            testOrder(executorService, 1, 1);
+            testOrder(executorService, 2, 1);
+            testOrder(executorService, 3, 1);
+            testOrder(executorService, 9, 1);
+            testOrder(executorService, 10, 1);
+            testOrder(executorService, 9999, 1);
+
+            testOrder(executorService, 1, 2);
+            testOrder(executorService, 2, 2);
+            testOrder(executorService, 3, 2);
+            testOrder(executorService, 9, 2);
+            testOrder(executorService, 10, 2);
+            testOrder(executorService, 9999, 2);
+
+            testOrder(executorService, 1, 3);
+            testOrder(executorService, 2, 3);
+            testOrder(executorService, 3, 3);
+            testOrder(executorService, 3, 3);
+            testOrder(executorService, 10, 3);
+            testOrder(executorService, 9999, 3);
+
+            testOrder(executorService, 1, 4);
+            testOrder(executorService, 2, 4);
+            testOrder(executorService, 3, 4);
+            testOrder(executorService, 9, 4);
+            testOrder(executorService, 10, 4);
+            testOrder(executorService, 9999, 4);
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Template method to test parallel execution
+     * @param executorService ExecutorService.
+     * @param size Size.
+     * @param parallelism Parallelism.
+     * @throws IgniteCheckedException Exception.
+     */
+    private void testOrder(ExecutorService executorService, int size, int parallelism) throws IgniteCheckedException {
+        List<Integer> list = new ArrayList<>();
+        for(int i = 0; i < size; i++)
+            list.add(i);
+
+        Collection<Integer> results = IgniteUtils.doInParallel(
+            parallelism,
+            executorService,
+            list,
+            i -> i * 2
+        );
+
+        assertEquals(list.size(), results.size());
+
+        final int[] i = {0};
+        results.forEach(new Consumer<Integer>() {
+            @Override public void accept(Integer integer) {
+                assertEquals(2 * list.get(i[0]), integer.intValue());
+                i[0]++;
+            }
+        });
+    }
+
+    /**
      *
      */
     public void testDoInParallelException() {
         String expectedException = "ExpectedException";
 
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+
         try {
-            IgniteUtils.doInParallel(3,
-                Executors.newFixedThreadPool(1),
+            IgniteUtils.doInParallel(
+                1,
+                executorService,
                 Arrays.asList(1, 2, 3),
                 i -> {
-                    if (i == 1)
+                    if (Integer.valueOf(1).equals(i))
                         throw new IgniteCheckedException(expectedException);
+
+                    return  null;
                 }
             );
 
@@ -946,6 +1036,8 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
         }
         catch (IgniteCheckedException e) {
             assertEquals(expectedException, e.getMessage());
+        } finally {
+            executorService.shutdownNow();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
index 71d2ba0..a50d48a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs
@@ -81,7 +81,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity
             "TimeServerPortBase",
             "TimeServerPortRange",
             "IncludeProperties",
-            "isAutoActivationEnabled"  // IGNITE-7301
+            "isAutoActivationEnabled",  // IGNITE-7301
+            "NetworkCompressionLevel"
         };
 
         /// <summary>

Reply via email to