Repository: ignite Updated Branches: refs/heads/master 889ce79bb -> 70952fe92
IGNITE-9870 GridDhtPartitionsFullMessage#prepareMarshal compression parallelization - Fixes #5330. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70952fe9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70952fe9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70952fe9 Branch: refs/heads/master Commit: 70952fe92d5be7f7c9407783867d524544cd9fec Parents: 889ce79 Author: Pavel Voronkin <pvoron...@gridgain.com> Authored: Fri Nov 9 15:00:03 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Nov 9 15:10:14 2018 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 32 ++- .../cache/CacheAffinitySharedManager.java | 14 +- .../processors/cache/GridCacheProcessor.java | 44 ++- .../GridDhtPartitionsExchangeFuture.java | 8 +- .../preloader/GridDhtPartitionsFullMessage.java | 214 ++++++++------ .../GridCacheDatabaseSharedManager.java | 4 +- .../ignite/internal/util/IgniteUtils.java | 281 +++++++++++-------- .../util/lang/IgniteThrowableConsumer.java | 6 +- .../internal/util/IgniteUtilsSelfTest.java | 122 +++++++- .../ApiParity/IgniteConfigurationParityTest.cs | 3 +- 10 files changed, 483 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 528dbbe..6ece0e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.lang.management.ManagementFactory; import java.util.Map; import java.util.UUID; +import java.util.zip.Deflater; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryListener; import javax.cache.expiry.ExpiryPolicy; @@ -40,7 +41,6 @@ import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeTask; -import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.failure.FailureHandler; @@ -69,6 +69,7 @@ import org.apache.ignite.spi.deployment.DeploymentSpi; import org.apache.ignite.spi.deployment.local.LocalDeploymentSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.eventstorage.EventStorageSpi; import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi; import org.apache.ignite.spi.failover.FailoverSpi; @@ -119,6 +120,9 @@ public class IgniteConfiguration { /** Default maximum timeout to wait for network responses in milliseconds (value is {@code 5,000ms}). */ public static final long DFLT_NETWORK_TIMEOUT = 5000; + /** Default compression level for network messages (value is {@code Deflater.BEST_SPEED}). */ + public static final int DFLT_NETWORK_COMPRESSION = Deflater.BEST_SPEED; + /** Default interval between message send retries. */ public static final long DFLT_SEND_RETRY_DELAY = 1000; @@ -302,6 +306,9 @@ public class IgniteConfiguration { /** Maximum network requests timeout. */ private long netTimeout = DFLT_NETWORK_TIMEOUT; + /** Compression level for network binary messages. 
*/ + private int netCompressionLevel = DFLT_NETWORK_COMPRESSION; + /** Interval between message send retries. */ private long sndRetryDelay = DFLT_SEND_RETRY_DELAY; @@ -1478,6 +1485,29 @@ public class IgniteConfiguration { } /** + * Compression level of internal network messages. + * <p> + * If not provided, then default value + * Deflater.BEST_SPEED is used. + * + * @return Network messages default compression level. + */ + public int getNetworkCompressionLevel() { + return netCompressionLevel; + } + + /** + * Compression level for internal network messages. + * <p> + * If not provided, then default value + * Deflater.BEST_SPEED is used. + * + */ + public void setNetworkCompressionLevel(int netCompressionLevel) { + this.netCompressionLevel = netCompressionLevel; + } + + /** * Interval in milliseconds between message send retries. * <p> * If not provided, then default value http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 1283696..936dc3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -993,6 +993,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap initAffinity(cachesRegistry.group(grp.groupId()), grp.affinity(), fut); } } + + return null; } ); } @@ -1255,7 +1257,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap .collect(Collectors.toList()); try { - U.doInParallel(cctx.kernalContext().getSystemExecutorService(), 
affinityCaches, c::applyx); + U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, t -> { + c.applyx(t); + + return null; + }); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to execute affinity operation on cache groups", e); @@ -1283,7 +1289,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } try { - U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, c::applyx); + U.doInParallel(cctx.kernalContext().getSystemExecutorService(), affinityCaches, t -> { + c.applyx(t); + + return null; + }); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to execute affinity operation on cache groups", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 940e4a6..f4e5b1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2057,19 +2057,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ private void prepareStartCaches( Collection<StartCacheInfo> startCacheInfos, - StartCacheFailHandler<StartCacheInfo> cacheStartFailHandler + StartCacheFailHandler<StartCacheInfo, Void> cacheStartFailHandler ) throws IgniteCheckedException { if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() <= 1) { for (StartCacheInfo startCacheInfo : startCacheInfos) { cacheStartFailHandler.handle( startCacheInfo, - cacheInfo -> prepareCacheStart( - cacheInfo.getCacheDescriptor().cacheConfiguration(), - 
cacheInfo.getCacheDescriptor(), - cacheInfo.getReqNearCfg(), - cacheInfo.getExchangeTopVer(), - cacheInfo.isDisabledAfterStart() - ) + cacheInfo -> { + prepareCacheStart( + cacheInfo.getCacheDescriptor().cacheConfiguration(), + cacheInfo.getCacheDescriptor(), + cacheInfo.getReqNearCfg(), + cacheInfo.getExchangeTopVer(), + cacheInfo.isDisabledAfterStart() + ); + + return null; + } ); context().exchange().exchangerUpdateHeartbeat(); @@ -2087,7 +2091,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { parallelismLvl, sharedCtx.kernalContext().getSystemExecutorService(), startCacheInfos, - startCacheInfo -> + startCacheInfo -> { cacheStartFailHandler.handle( startCacheInfo, cacheInfo -> { @@ -2101,8 +2105,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheContexts.put(cacheInfo, cacheCtx); context().exchange().exchangerUpdateHeartbeat(); + + return null; } - ) + ); + + return null; + } ); /* @@ -2134,6 +2143,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } context().exchange().exchangerUpdateHeartbeat(); + + return null; } ); } @@ -2142,7 +2153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { parallelismLvl, sharedCtx.kernalContext().getSystemExecutorService(), cacheContexts.entrySet(), - cacheCtxEntry -> + cacheCtxEntry -> { cacheStartFailHandler.handle( cacheCtxEntry.getKey(), cacheInfo -> { @@ -2154,8 +2165,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { onCacheStarted(cacheCtxEntry.getValue()); context().exchange().exchangerUpdateHeartbeat(); + + return null; } - ) + ); + + return null; + } ); } } @@ -5447,7 +5463,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param <T> Type of started data. */ - private static interface StartCacheFailHandler<T> { + private static interface StartCacheFailHandler<T, R> { /** * Handle of fail. 
* @@ -5455,7 +5471,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param startCacheOperation Operation for start cache. * @throws IgniteCheckedException if failed. */ - void handle(T data, IgniteThrowableConsumer<T> startCacheOperation) throws IgniteCheckedException; + void handle(T data, IgniteThrowableConsumer<T, R> startCacheOperation) throws IgniteCheckedException; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index fa6f278..3702a51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3491,7 +3491,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE || grpCtx.config().getExpiryPolicyFactory() == null || SKIP_PARTITION_SIZE_VALIDATION) - return; + return null; try { validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs); @@ -3500,6 +3500,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". 
" + ex.getMessage()); // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833 } + + return null; } ); } @@ -3532,6 +3534,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assignPartitionSizes(top); else assignPartitionStates(top); + + return null; } ); } @@ -3988,6 +3992,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte null); } } + + return null; }); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index a63ab70..4e895ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -33,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -410,69 +414,75 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa (!F.isEmpty(errs) && errsBytes == null); if (marshal) { - byte[] partsBytes0 = null; - byte[] partCntrsBytes0 = null; - byte[] partCntrsBytes20 = null; - byte[] partHistSuppliersBytes0 = null; - byte[] partsToReloadBytes0 = null; - byte[] partsSizesBytes0 = null; - byte[] errsBytes0 = null; + // Reserve at least 2 threads for system operations. + int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2); + + Collection<Object> objectsToMarshall = new ArrayList<>(); + + if (!F.isEmpty(parts) && partsBytes == null) + objectsToMarshall.add(parts); + + if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) + objectsToMarshall.add(partCntrs); + + if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null) + objectsToMarshall.add(partCntrs2); + + if (partHistSuppliers != null && partHistSuppliersBytes == null) + objectsToMarshall.add(partHistSuppliers); + + if (partsToReload != null && partsToReloadBytes == null) + objectsToMarshall.add(partsToReload); + + if (partsSizes != null && partsSizesBytes == null) + objectsToMarshall.add(partsSizes); + + if (!F.isEmpty(errs) && errsBytes == null) + objectsToMarshall.add(errs); + + Collection<byte[]> marshalled = U.doInParallel( + parallelismLvl, + ctx.kernalContext().getSystemExecutorService(), + objectsToMarshall, + new IgniteThrowableConsumer<Object, byte[]>() { + @Override public byte[] accept(Object payload) throws IgniteCheckedException { + byte[] marshalled = U.marshal(ctx, payload); + + if(compress) + marshalled = U.zip(marshalled, 
ctx.gridConfig().getNetworkCompressionLevel()); + + return marshalled; + } + }); + + Iterator<byte[]> iterator = marshalled.iterator(); if (!F.isEmpty(parts) && partsBytes == null) - partsBytes0 = U.marshal(ctx, parts); + partsBytes = iterator.next(); if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) - partCntrsBytes0 = U.marshal(ctx, partCntrs); + partCntrsBytes = iterator.next(); if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null) - partCntrsBytes20 = U.marshal(ctx, partCntrs2); + partCntrsBytes2 = iterator.next(); if (partHistSuppliers != null && partHistSuppliersBytes == null) - partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers); + partHistSuppliersBytes = iterator.next(); if (partsToReload != null && partsToReloadBytes == null) - partsToReloadBytes0 = U.marshal(ctx, partsToReload); + partsToReloadBytes = iterator.next(); if (partsSizes != null && partsSizesBytes == null) - partsSizesBytes0 = U.marshal(ctx, partsSizes); + partsSizesBytes = iterator.next(); if (!F.isEmpty(errs) && errsBytes == null) - errsBytes0 = U.marshal(ctx, errs); + errsBytes = iterator.next(); if (compress) { - assert !compressed(); - - try { - byte[] partsBytesZip = U.zip(partsBytes0); - byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); - byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20); - byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); - byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); - byte[] partsSizesBytesZip = U.zip(partsSizesBytes0); - byte[] exsBytesZip = U.zip(errsBytes0); - - partsBytes0 = partsBytesZip; - partCntrsBytes0 = partCntrsBytesZip; - partCntrsBytes20 = partCntrsBytes2Zip; - partHistSuppliersBytes0 = partHistSuppliersBytesZip; - partsToReloadBytes0 = partsToReloadBytesZip; - partsSizesBytes0 = partsSizesBytesZip; - errsBytes0 = exsBytesZip; - - compressed(true); - } - catch (IgniteCheckedException e) { - U.error(ctx.logger(getClass()), "Failed to compress partitions data: " + e, e); - 
} - } + assert !compressed() : "Unexpected compressed state"; - partsBytes = partsBytes0; - partCntrsBytes = partCntrsBytes0; - partCntrsBytes2 = partCntrsBytes20; - partHistSuppliersBytes = partHistSuppliersBytes0; - partsToReloadBytes = partsToReloadBytes0; - partsSizesBytes = partsSizesBytes0; - errsBytes = errsBytes0; + compressed(true); + } } } @@ -494,11 +504,51 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); + ClassLoader classLoader = U.resolveClassLoader(ldr, ctx.gridConfig()); + + Collection<byte[]> objectsToUnmarshall = new ArrayList<>(); + + // Reserve at least 2 threads for system operations. + int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2); + + if (partsBytes != null && parts == null) + objectsToUnmarshall.add(partsBytes); + + if (partCntrsBytes != null && partCntrs == null) + objectsToUnmarshall.add(partCntrsBytes); + + if (partCntrsBytes2 != null && partCntrs2 == null) + objectsToUnmarshall.add(partCntrsBytes2); + + if (partHistSuppliersBytes != null && partHistSuppliers == null) + objectsToUnmarshall.add(partHistSuppliersBytes); + + if (partsToReloadBytes != null && partsToReload == null) + objectsToUnmarshall.add(partsToReloadBytes); + + if (partsSizesBytes != null && partsSizes == null) + objectsToUnmarshall.add(partsSizesBytes); + + if (errsBytes != null && errs == null) + objectsToUnmarshall.add(errsBytes); + + Collection<Object> unmarshalled = U.doInParallel( + parallelismLvl, + ctx.kernalContext().getSystemExecutorService(), + objectsToUnmarshall, + new IgniteThrowableConsumer<byte[], Object>() { + @Override public Object accept(byte[] binary) throws IgniteCheckedException { + return compressed() + ? 
U.unmarshalZip(ctx.marshaller(), binary, classLoader) + : U.unmarshal(ctx, binary, classLoader); + } + } + ); + + Iterator<Object> iterator = unmarshalled.iterator(); + if (partsBytes != null && parts == null) { - if (compressed()) - parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + parts = (Map<Integer, GridDhtPartitionFullMap>)iterator.next(); if (dupPartsData != null) { assert parts != null; @@ -528,53 +578,41 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } } - if (parts == null) - parts = new HashMap<>(); + if (partCntrsBytes != null && partCntrs == null) + partCntrs = (IgniteDhtPartitionCountersMap)iterator.next(); - if (partCntrsBytes != null && partCntrs == null) { - if (compressed()) - partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } + if (partCntrsBytes2 != null && partCntrs2 == null) + partCntrs2 = (IgniteDhtPartitionCountersMap2)iterator.next(); - if (partCntrsBytes2 != null && partCntrs2 == null) { - if (compressed()) - partCntrs2 = U.unmarshalZip(ctx.marshaller(), partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - partCntrs2 = U.unmarshal(ctx, partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig())); - } + if (partHistSuppliersBytes != null && partHistSuppliers == null) + partHistSuppliers = (IgniteDhtPartitionHistorySuppliersMap)iterator.next(); - if (partHistSuppliersBytes != null && partHistSuppliers == null) { - if (compressed()) - partHistSuppliers = U.unmarshalZip(ctx.marshaller(), partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - partHistSuppliers = U.unmarshal(ctx, partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } + if 
(partsToReloadBytes != null && partsToReload == null) + partsToReload = (IgniteDhtPartitionsToReloadMap)iterator.next(); - if (partsToReloadBytes != null && partsToReload == null) { - if (compressed()) - partsToReload = U.unmarshalZip(ctx.marshaller(), partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } + if (partsSizesBytes != null && partsSizes == null) + partsSizes = (Map<Integer, Map<Integer, Long>>)iterator.next(); - if (partsSizesBytes != null && partsSizes == null) { - if (compressed()) - partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - partsSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } + if (errsBytes != null && errs == null) + errs = (Map<UUID, Exception>)iterator.next(); + + if (parts == null) + parts = new HashMap<>(); if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); - if (errsBytes != null && errs == null) { - if (compressed()) - errs = U.unmarshalZip(ctx.marshaller(), errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - else - errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } + if (partCntrs2 == null) + partCntrs2 = new IgniteDhtPartitionCountersMap2(); + + if(partHistSuppliers == null) + partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); + + if(partsToReload == null) + partsToReload = new IgniteDhtPartitionsToReloadMap(); + + if(partsSizes == null) + partsSizes = new HashMap<>(); if (errs == null) errs = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index b517da6..94ac100 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1348,7 +1348,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cctx.cache().cacheGroups(), cacheGroup -> { if (cacheGroup.isLocal()) - return; + return null; cctx.database().checkpointReadLock(); @@ -1361,6 +1361,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan finally { cctx.database().checkpointReadUnlock(); } + + return null; } ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 68219e6..f2be434 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -17,6 +17,113 @@ package org.apache.ignite.internal.util; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteDeploymentException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import 
org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.binary.BinaryRawWriter; +import org.apache.ignite.cluster.ClusterGroupEmptyException; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.ComputeTaskCancelledException; +import org.apache.ignite.compute.ComputeTaskName; +import org.apache.ignite.compute.ComputeTaskTimeoutException; +import org.apache.ignite.configuration.AddressResolver; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; +import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.mxbean.IgniteStandardMXBean; 
+import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker; +import org.apache.ignite.internal.processors.cache.GridCacheAttributes; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cluster.BaselineTopology; +import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.io.GridFilenameUtils; +import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader; +import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.GridPeerDeployAware; +import org.apache.ignite.internal.util.lang.GridTuple; +import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFutureCancelledException; +import org.apache.ignite.lang.IgniteFutureTimeoutException; 
+import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.IgniteSpi; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.transactions.TransactionDeadlockException; +import org.apache.ignite.transactions.TransactionHeuristicException; +import org.apache.ignite.transactions.TransactionOptimisticException; +import org.apache.ignite.transactions.TransactionRollbackException; +import org.apache.ignite.transactions.TransactionTimeoutException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import sun.misc.Unsafe; + +import javax.management.DynamicMBean; +import javax.management.JMException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; @@ -143,116 +250,11 @@ import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import 
java.util.zip.Deflater; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; -import javax.management.DynamicMBean; -import javax.management.JMException; -import javax.management.MBeanRegistrationException; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteClientDisconnectedException; -import org.apache.ignite.IgniteDeploymentException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteIllegalStateException; -import org.apache.ignite.IgniteInterruptedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.binary.BinaryRawReader; -import org.apache.ignite.binary.BinaryRawWriter; -import org.apache.ignite.cluster.ClusterGroupEmptyException; -import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterTopologyException; -import org.apache.ignite.compute.ComputeTask; -import org.apache.ignite.compute.ComputeTaskCancelledException; -import org.apache.ignite.compute.ComputeTaskName; -import org.apache.ignite.compute.ComputeTaskTimeoutException; -import org.apache.ignite.configuration.AddressResolver; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.EventType; -import 
org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; -import org.apache.ignite.internal.IgniteDeploymentCheckedException; -import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; -import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException; -import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.communication.GridIoManager; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.mxbean.IgniteStandardMXBean; -import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker; -import org.apache.ignite.internal.processors.cache.GridCacheAttributes; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cluster.BaselineTopology; -import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; -import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; -import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; -import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; -import 
org.apache.ignite.internal.util.io.GridFilenameUtils; -import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryNativeLoader; -import org.apache.ignite.internal.util.lang.GridClosureException; -import org.apache.ignite.internal.util.lang.GridPeerDeployAware; -import org.apache.ignite.internal.util.lang.GridTuple; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgniteFutureCancelledException; -import org.apache.ignite.lang.IgniteFutureTimeoutException; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; -import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.PluginProvider; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.apache.ignite.spi.IgniteSpi; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; -import org.apache.ignite.transactions.TransactionDeadlockException; -import org.apache.ignite.transactions.TransactionHeuristicException; -import 
org.apache.ignite.transactions.TransactionOptimisticException; -import org.apache.ignite.transactions.TransactionRollbackException; -import org.apache.ignite.transactions.TransactionTimeoutException; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import sun.misc.Unsafe; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_HOSTNAME_VERIFIER; import static org.apache.ignite.IgniteSystemProperties.IGNITE_HOME; @@ -10324,11 +10326,23 @@ public abstract class IgniteUtils { } /** + * Zip binary payload using default compression. + * * @param bytes Byte array to compress. * @return Compressed bytes. * @throws IgniteCheckedException If failed. */ public static byte[] zip(@Nullable byte[] bytes) throws IgniteCheckedException { + return zip(bytes, Deflater.DEFAULT_COMPRESSION); + } + + /** + * @param bytes Byte array to compress. + * @param compressionLevel Level of compression to encode. + * @return Compressed bytes. + * @throws IgniteCheckedException If failed. + */ + public static byte[] zip(@Nullable byte[] bytes, int compressionLevel) throws IgniteCheckedException { try { if (bytes == null) return null; @@ -10336,6 +10350,8 @@ public abstract class IgniteUtils { ByteArrayOutputStream bos = new ByteArrayOutputStream(); try (ZipOutputStream zos = new ZipOutputStream(bos)) { + zos.setLevel(compressionLevel); + ZipEntry entry = new ZipEntry(""); try { @@ -10740,9 +10756,12 @@ public abstract class IgniteUtils { * @param <T> Type of data. * @throws IgniteCheckedException if parallel execution was failed. 
*/ - public static <T> void doInParallel(ExecutorService executorSvc, Collection<T> srcDatas, - IgniteThrowableConsumer<T> operation) throws IgniteCheckedException, IgniteInterruptedCheckedException { - doInParallel(srcDatas.size(), executorSvc, srcDatas, operation); + public static <T, R> Collection<R> doInParallel( + ExecutorService executorSvc, + Collection<T> srcDatas, + IgniteThrowableConsumer<T, R> operation + ) throws IgniteCheckedException, IgniteInterruptedCheckedException { + return doInParallel(srcDatas.size(), executorSvc, srcDatas, operation); } /** @@ -10753,38 +10772,58 @@ public abstract class IgniteUtils { * @param srcDatas List of data for parallelization. * @param operation Logic for execution of on each item of data. * @param <T> Type of data. + * @param <R> Type of return value. * @throws IgniteCheckedException if parallel execution was failed. */ - public static <T> void doInParallel( + public static <T, R> Collection<R> doInParallel( int parallelismLvl, ExecutorService executorSvc, Collection<T> srcDatas, - IgniteThrowableConsumer<T> operation + IgniteThrowableConsumer<T, R> operation ) throws IgniteCheckedException, IgniteInterruptedCheckedException { + if(srcDatas.isEmpty()) + return Collections.emptyList(); + + int batchSize = srcDatas.size() / parallelismLvl; + + final int finalBatchSize = batchSize == 0 ? 
srcDatas.size() : batchSize; + List<List<T>> batches = IntStream.range(0, parallelismLvl) - .mapToObj(i -> new ArrayList<T>()) + .mapToObj(i -> new ArrayList<T>(finalBatchSize)) .collect(Collectors.toList()); - int i = 0; + int batchIndex = 0; + + final int maxBatchIndex = batches.size() -1; - for (T src : srcDatas) - batches.get(i++ % parallelismLvl).add(src); + List<T> currentBatch = batches.get(batchIndex); - List<Future<Object>> consumerFutures = batches.stream() + for (T src : srcDatas) { + currentBatch.add(src); + + if(currentBatch.size() >= batchSize && batchIndex < maxBatchIndex) + currentBatch = batches.get(++batchIndex); + } + + List<Future<Collection<R>>> consumerFutures = batches.stream() .filter(batch -> !batch.isEmpty()) .map(batch -> executorSvc.submit(() -> { + Collection<R> results = new ArrayList<>(batch.size()); + for (T item : batch) - operation.accept(item); + results.add(operation.accept(item)); - return null; + return results; })) .collect(Collectors.toList()); Throwable error =null; - for (Future<Object> future : consumerFutures) { + Collection<R> results = new ArrayList<>(srcDatas.size()); + + for (Future<Collection<R>> future : consumerFutures) { try { - future.get(); + results.addAll(future.get()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -10817,6 +10856,8 @@ public abstract class IgniteUtils { throw new IgniteCheckedException(error); } + + return results; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java index 46813a9..55feed8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java 
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableConsumer.java @@ -25,13 +25,15 @@ import org.apache.ignite.IgniteCheckedException; * interfaces, {@code IgniteThrowableConsumer} is expected to operate via side-effects. * * @param <E> Type of closure parameter. + * @param <R> Type of result value. */ -public interface IgniteThrowableConsumer<E> extends Serializable { +public interface IgniteThrowableConsumer<E, R> extends Serializable { /** * Consumer body. * * @param e Consumer parameter. + * @return Result of consumer operation. * @throws IgniteCheckedException if body execution was failed. */ - public void accept(E e) throws IgniteCheckedException; + public R accept(E e) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java index 5de12a2..070fd4a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java @@ -45,9 +45,11 @@ import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterGroup; @@ -885,18 +887,26 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { public void testDoInParallel() throws Throwable { CyclicBarrier barrier = new 
CyclicBarrier(3); - IgniteUtils.doInParallel(3, - Executors.newFixedThreadPool(3), - Arrays.asList(1, 2, 3), - i -> { - try { - barrier.await(1, TimeUnit.SECONDS); - } - catch (Exception e) { - throw new IgniteCheckedException(e); + ExecutorService executorService = Executors.newFixedThreadPool(3); + + try { + IgniteUtils.doInParallel(3, + executorService, + Arrays.asList(1, 2, 3), + i -> { + try { + barrier.await(1, TimeUnit.SECONDS); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + + return null; } - } - ); + ); + } finally { + executorService.shutdownNow(); + } } /** @@ -905,9 +915,11 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { public void testDoInParallelBatch() { CyclicBarrier barrier = new CyclicBarrier(3); + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { IgniteUtils.doInParallel(2, - Executors.newFixedThreadPool(3), + executorService, Arrays.asList(1, 2, 3), i -> { try { @@ -916,6 +928,8 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { catch (Exception e) { throw new IgniteCheckedException(e); } + + return null; } ); @@ -923,22 +937,98 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { } catch (Exception e) { assertTrue(e.toString(), X.hasCause(e, TimeoutException.class)); + } finally { + executorService.shutdownNow(); } } /** + * Test parallel execution in order. 
+ */ + public void testDoInParallelResultsOrder() throws IgniteCheckedException { + ExecutorService executorService = Executors.newFixedThreadPool(4); + + try { + testOrder(executorService, 1, 1); + testOrder(executorService, 2, 1); + testOrder(executorService, 3, 1); + testOrder(executorService, 9, 1); + testOrder(executorService, 10, 1); + testOrder(executorService, 9999, 1); + + testOrder(executorService, 1, 2); + testOrder(executorService, 2, 2); + testOrder(executorService, 3, 2); + testOrder(executorService, 9, 2); + testOrder(executorService, 10, 2); + testOrder(executorService, 9999, 2); + + testOrder(executorService, 1, 3); + testOrder(executorService, 2, 3); + testOrder(executorService, 3, 3); + testOrder(executorService, 3, 3); + testOrder(executorService, 10, 3); + testOrder(executorService, 9999, 3); + + testOrder(executorService, 1, 4); + testOrder(executorService, 2, 4); + testOrder(executorService, 3, 4); + testOrder(executorService, 9, 4); + testOrder(executorService, 10, 4); + testOrder(executorService, 9999, 4); + } finally { + executorService.shutdownNow(); + } + } + + /** + * Template method to test parallel execution + * @param executorService ExecutorService. + * @param size Size. + * @param parallelism Parallelism. + * @throws IgniteCheckedException Exception. 
+ */ + private void testOrder(ExecutorService executorService, int size, int parallelism) throws IgniteCheckedException { + List<Integer> list = new ArrayList<>(); + for(int i = 0; i < size; i++) + list.add(i); + + Collection<Integer> results = IgniteUtils.doInParallel( + parallelism, + executorService, + list, + i -> i * 2 + ); + + assertEquals(list.size(), results.size()); + + final int[] i = {0}; + results.forEach(new Consumer<Integer>() { + @Override public void accept(Integer integer) { + assertEquals(2 * list.get(i[0]), integer.intValue()); + i[0]++; + } + }); + } + + /** * */ public void testDoInParallelException() { String expectedException = "ExpectedException"; + ExecutorService executorService = Executors.newFixedThreadPool(1); + try { - IgniteUtils.doInParallel(3, - Executors.newFixedThreadPool(1), + IgniteUtils.doInParallel( + 1, + executorService, Arrays.asList(1, 2, 3), i -> { - if (i == 1) + if (Integer.valueOf(1).equals(i)) throw new IgniteCheckedException(expectedException); + + return null; } ); @@ -946,6 +1036,8 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { } catch (IgniteCheckedException e) { assertEquals(expectedException, e.getMessage()); + } finally { + executorService.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/70952fe9/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs index 71d2ba0..a50d48a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/IgniteConfigurationParityTest.cs @@ -81,7 +81,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity "TimeServerPortBase", 
"TimeServerPortRange", "IncludeProperties", - "isAutoActivationEnabled" // IGNITE-7301 + "isAutoActivationEnabled", // IGNITE-7301 + "NetworkCompressionLevel" }; /// <summary>