IGNITE-4041: Created separate processor for thread pools and refactored IO manager. This closes #1150.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9a0676f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9a0676f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9a0676f Branch: refs/heads/ignite-ssl-hotfix Commit: f9a0676fad7fd6c23e3c91c10d7e0412ccb27c06 Parents: b9c776a Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Oct 11 10:23:01 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Oct 11 10:23:01 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 15 ++ .../ignite/internal/GridKernalContextImpl.java | 24 +++ .../apache/ignite/internal/IgniteKernal.java | 6 + .../org/apache/ignite/internal/IgnitionEx.java | 19 ++- .../managers/communication/GridIoManager.java | 161 ++----------------- .../processors/closure/GridClosurePolicy.java | 51 ------ .../closure/GridClosureProcessor.java | 88 +++------- .../internal/processors/pool/PoolProcessor.java | 149 +++++++++++++++++ .../resources/META-INF/classnames.properties | 1 - .../managers/GridManagerStopSelfTest.java | 2 + .../util/future/GridFutureAdapterSelfTest.java | 3 + .../junits/GridTestKernalContext.java | 1 + 12 files changed, 250 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index b123a4a..e608af2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.odbc.OdbcProcessor; import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; @@ -286,6 +287,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public HadoopProcessorAdapter hadoop(); /** + * Gets pool processor. + * + * @return Pool processor. + */ + public PoolProcessor pools(); + + /** * Gets Hadoop helper. * * @return Hadoop helper. @@ -533,6 +541,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService getRestExecutorService(); /** + * Get affinity executor service. + * + * @return Affinity executor service. + */ + public ExecutorService getAffinityExecutorService(); + + /** * Gets exception registry. * * @return Exception registry. http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index c7e26e9..ddef345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.odbc.OdbcProcessor; import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; @@ -259,6 +260,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + private PoolProcessor poolProc; + + /** */ + @GridToStringExclude private IgnitePluginProcessor pluginProc; /** */ @@ -311,6 +316,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected ExecutorService affExecSvc; + + /** */ + @GridToStringExclude protected IgniteStripedThreadPoolExecutor callbackExecSvc; /** */ @@ -372,6 +381,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. * @param restExecSvc REST executor service. + * @param affExecSvc Affinity executor service. * @param plugins Plugin providers. * @throws IgniteCheckedException In case of error. */ @@ -389,6 +399,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + ExecutorService affExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, List<PluginProvider> plugins) throws IgniteCheckedException { assert grid != null; @@ -406,6 +417,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; this.restExecSvc = restExecSvc; + this.affExecSvc = affExecSvc; this.callbackExecSvc = callbackExecSvc; String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome()); @@ -533,6 +545,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable cluster = (ClusterProcessor)comp; else if (comp instanceof PlatformProcessor) platformProc = (PlatformProcessor)comp; + else if (comp instanceof PoolProcessor) + poolProc = (PoolProcessor) comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor)) assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@ -757,6 +771,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public PoolProcessor pools() { + return poolProc; + } + + /** {@inheritDoc} */ @Override public ExecutorService utilityCachePool() { return utilityCachePool; } @@ -942,6 +961,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService getAffinityExecutorService() { + return affExecSvc; + } + + /** {@inheritDoc} */ @Override public IgniteExceptionRegistry exceptionRegistry() { return IgniteExceptionRegistry.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e0a36a7..02f16af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -126,6 +126,7 @@ import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.processors.query.GridQueryProcessor; @@ -666,6 +667,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. * @param restExecSvc Reset executor service. + * @param affExecSvc Affinity executor service. * @param errHnd Error handler to use for notification about startup problems. * @throws IgniteCheckedException Thrown in case of any errors. */ @@ -679,6 +681,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, ExecutorService restExecSvc, + ExecutorService affExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, GridAbsClosure errHnd) throws IgniteCheckedException @@ -784,6 +787,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { mgmtExecSvc, igfsExecSvc, restExecSvc, + affExecSvc, callbackExecSvc, plugins); @@ -827,6 +831,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins)); + startProcessor(new PoolProcessor(ctx)); + // Off-heap processor has no dependencies. startProcessor(new GridOffHeapProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 001f599..a6860b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1477,6 +1477,9 @@ public class IgnitionEx { /** Marshaller cache executor service. */ private ThreadPoolExecutor marshCacheExecSvc; + /** Affinity executor service. */ + private ThreadPoolExecutor affExecSvc; + /** Continuous query executor service. */ private IgniteStripedThreadPoolExecutor callbackExecSvc; @@ -1734,6 +1737,16 @@ public class IgnitionEx { marshCacheExecSvc.allowCoreThreadTimeOut(true); + affExecSvc = new IgniteThreadPoolExecutor( + "aff", + cfg.getGridName(), + 1, + 1, + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>()); + + affExecSvc.allowCoreThreadTimeOut(true); + // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); @@ -1746,7 +1759,7 @@ public class IgnitionEx { grid = grid0; grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, callbackExecSvc, + igfsExecSvc, restExecSvc, affExecSvc, callbackExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2381,6 +2394,10 @@ public class IgnitionEx { marshCacheExecSvc = null; + U.shutdownNow(getClass(), affExecSvc, log); + + affExecSvc = null; + U.shutdownNow(getClass(), callbackExecSvc, log); callbackExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 4bc2eea..bd285b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -29,9 +29,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,6 +51,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -68,7 +66,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.IoPool; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFormatter; @@ -78,7 +75,6 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; @@ -130,32 +126,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Disconnect listeners. */ private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>(); - /** Map of {@link IoPool}-s injected by Ignite plugins. */ - private final IoPool[] ioPools = new IoPool[128]; - - /** Public pool. */ - private ExecutorService pubPool; - - /** Internal P2P pool. */ - private ExecutorService p2pPool; - - /** Internal system pool. */ - private ExecutorService sysPool; - - /** Internal management pool. */ - private ExecutorService mgmtPool; - - /** Affinity assignment executor service. */ - private ExecutorService affPool; - - /** Utility cache pool. */ - private ExecutorService utilityCachePool; - - /** Marshaller cache pool. */ - private ExecutorService marshCachePool; - - /** IGFS pool. */ - private ExecutorService igfsPool; + /** Pool processor. */ + private PoolProcessor pools; /** Discovery listener. */ private GridLocalEventListener discoLsnr; @@ -210,6 +182,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa public GridIoManager(GridKernalContext ctx) { super(ctx, ctx.config().getCommunicationSpi()); + pools = ctx.pools(); + + assert pools != null; + locNodeId = ctx.localNodeId(); discoDelay = ctx.config().getDiscoveryStartupDelay(); @@ -253,21 +229,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa startSpi(); - pubPool = ctx.getExecutorService(); - p2pPool = ctx.getPeerClassLoadingExecutorService(); - sysPool = ctx.getSystemExecutorService(); - mgmtPool = ctx.getManagementExecutorService(); - utilityCachePool = ctx.utilityCachePool(); - marshCachePool = ctx.marshallerCachePool(); - igfsPool = ctx.getIgfsExecutorService(); - affPool = new IgniteThreadPoolExecutor( - "aff", - ctx.gridName(), - 1, - 1, - 0, - new LinkedBlockingQueue<Runnable>()); - getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() { @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) { try { @@ -335,41 +296,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (log.isDebugEnabled()) log.debug(startInfo()); - - registerIoPoolExtensions(); - } - - /** - * Processes IO messaging pool extensions. - * @throws IgniteCheckedException On error. - */ - private void registerIoPoolExtensions() throws IgniteCheckedException { - // Process custom IO messaging pool extensions: - final IoPool[] executorExtensions = ctx.plugins().extensions(IoPool.class); - - if (executorExtensions != null) { - // Store it into the map and check for duplicates: - for (IoPool ex : executorExtensions) { - final byte id = ex.id(); - - // 1. Check the pool id is non-negative: - if (id < 0) - throw new IgniteCheckedException("Failed to register IO executor pool because its Id is negative " + - "[id=" + id + ']'); - - // 2. Check the pool id is in allowed range: - if (isReservedGridIoPolicy(id)) - throw new IgniteCheckedException("Failed to register IO executor pool because its Id in in the " + - "reserved range (0-31) [id=" + id + ']'); - - // 3. Check the pool for duplicates: - if (ioPools[id] != null) - throw new IgniteCheckedException("Failed to register IO executor pool because its " + - "Id as already used [id=" + id + ']'); - - ioPools[id] = ex; - } - } } /** {@inheritDoc} */ @@ -546,8 +472,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (interrupted) Thread.currentThread().interrupt(); - U.shutdownNow(getClass(), affPool, log); - GridEventStorageManager evtMgr = ctx.event(); if (evtMgr != null && discoLsnr != null) @@ -566,8 +490,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (log.isDebugEnabled()) log.debug(stopInfo()); - - Arrays.fill(ioPools, null); } /** @@ -683,67 +605,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * Gets execution pool for policy. - * - * @param plc Policy. - * @return Execution pool. - * @throws IgniteCheckedException If failed. - */ - private Executor pool(byte plc) throws IgniteCheckedException { - switch (plc) { - case P2P_POOL: - return p2pPool; - case SYSTEM_POOL: - return sysPool; - case PUBLIC_POOL: - return pubPool; - case MANAGEMENT_POOL: - return mgmtPool; - case AFFINITY_POOL: - return affPool; - - case UTILITY_CACHE_POOL: - assert utilityCachePool != null : "Utility cache pool is not configured."; - - return utilityCachePool; - - case MARSH_CACHE_POOL: - assert marshCachePool != null : "Marshaller cache pool is not configured."; - - return marshCachePool; - - case IGFS_POOL: - assert igfsPool != null : "IGFS pool is not configured."; - - return igfsPool; - - default: { - assert plc >= 0 : "Negative policy: " + plc; - - if (isReservedGridIoPolicy(plc)) - throw new IgniteCheckedException("Failed to process message with policy of reserved" + - " range (0-31), [policy=" + plc + ']'); - - IoPool pool = ioPools[plc]; - - if (pool == null) - throw new IgniteCheckedException("Failed to process message because no pool is registered " + - "for policy. [policy=" + plc + ']'); - - assert plc == pool.id(); - - Executor ex = pool.executor(); - - if (ex == null) - throw new IgniteCheckedException("Failed to process message because corresponding executor " + - "is null. [id=" + plc + ']'); - - return ex; - } - } - } - - /** * @param nodeId Node ID. * @param msg Message. * @param msgC Closure to call when message processing finished. @@ -778,7 +639,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; try { - p2pPool.execute(c); + pools.p2pPool().execute(c); } catch (RejectedExecutionException e) { U.error(log, "Failed to process P2P message due to execution rejection. Increase the upper bound " + @@ -818,7 +679,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; try { - pool(plc).execute(c); + pools.poolForPolicy(plc).execute(c); } catch (RejectedExecutionException e) { U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " + @@ -1154,7 +1015,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; try { - pool(plc).execute(c); + pools.poolForPolicy(plc).execute(c); } catch (RejectedExecutionException e) { U.error(log, "Failed to process ordered message due to execution rejection. " + @@ -1781,7 +1642,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { for (final GridCommunicationMessageSet msgSet : msgSets) { - pool(msgSet.policy()).execute( + pools.poolForPolicy(msgSet.policy()).execute( new Runnable() { @Override public void run() { unwindMessageSet(msgSet, lsnrs0); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java deleted file mode 100644 index c17cedd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.closure; - -import org.jetbrains.annotations.Nullable; - -/** - * This enumeration defines different types of closure - * processing by the closure processor. - */ -public enum GridClosurePolicy { - /** Public execution pool. */ - PUBLIC_POOL, - - /** P2P execution pool. */ - P2P_POOL, - - /** System execution pool. */ - SYSTEM_POOL, - - /** IGFS pool. */ - IGFS_POOL; - - /** Enum values. */ - private static final GridClosurePolicy[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static GridClosurePolicy fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 6f878ce..a96d6eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -50,8 +50,10 @@ import org.apache.ignite.internal.GridClosureCallMode; import org.apache.ignite.internal.GridInternalWrapper; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -88,14 +90,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** Ignite version in which binarylizable versions of closures were introduced. */ public static final IgniteProductVersion BINARYLIZABLE_CLOSURES_SINCE = IgniteProductVersion.fromString("1.6.0"); - /** */ - private final Executor sysPool; - - /** */ - private final Executor pubPool; - - /** */ - private final Executor igfsPool; + /** Pool processor. */ + private final PoolProcessor pools; /** Lock to control execution after stop. */ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); @@ -109,9 +105,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { public GridClosureProcessor(GridKernalContext ctx) { super(ctx); - sysPool = ctx.getSystemExecutorService(); - pubPool = ctx.getExecutorService(); - igfsPool = ctx.getIgfsExecutorService(); + pools = ctx.pools(); + + assert pools != null; } /** {@inheritDoc} */ @@ -731,57 +727,13 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** - * Gets pool by execution policy. - * - * @param plc Whether to get system or public pool. - * @return Requested worker pool. - */ - private Executor pool(GridClosurePolicy plc) { - switch (plc) { - case PUBLIC_POOL: - return pubPool; - - case SYSTEM_POOL: - return sysPool; - - case IGFS_POOL: - return igfsPool; - - default: - throw new IllegalArgumentException("Invalid closure execution policy: " + plc); - } - } - - /** - * Gets pool name by execution policy. - * - * @param plc Policy to choose executor pool. - * @return Pool name. - */ - private String poolName(GridClosurePolicy plc) { - switch (plc) { - case PUBLIC_POOL: - return "public"; - - case SYSTEM_POOL: - return "system"; - - case IGFS_POOL: - return "igfs"; - - default: - throw new IllegalArgumentException("Invalid closure execution policy: " + plc); - } - } - - /** * @param c Closure to execute. * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException { - return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return runLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL); } /** @@ -790,7 +742,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ - private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException { + private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) + throws IgniteCheckedException { if (c == null) return new GridFinishedFuture(); @@ -830,11 +783,11 @@ public class GridClosureProcessor extends GridProcessorAdapter { fut.setWorker(w); try { - pool(plc).execute(w); + pools.poolForPolicy(plc).execute(w); } catch (RejectedExecutionException e) { U.error(log, "Failed to execute worker due to execution rejection " + - "(increase upper bound on " + poolName(plc) + " executor service).", e); + "(increase upper bound on executor service) [policy=" + plc + ']', e); w.run(); } @@ -866,7 +819,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. */ public IgniteInternalFuture<?> runLocalSafe(Runnable c, boolean sys) { - return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return runLocalSafe(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL); } /** @@ -877,7 +830,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public IgniteInternalFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) { + public IgniteInternalFuture<?> runLocalSafe(Runnable c, byte plc) { try { return runLocal(c, plc); } @@ -921,7 +874,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException { - return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return callLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL); } /** @@ -931,7 +884,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. */ - private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException { + private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc) + throws IgniteCheckedException { if (c == null) return new GridFinishedFuture<>(); @@ -969,11 +923,11 @@ public class GridClosureProcessor extends GridProcessorAdapter { fut.setWorker(w); try { - pool(plc).execute(w); + pools.poolForPolicy(plc).execute(w); } catch (RejectedExecutionException e) { U.error(log, "Failed to execute worker due to execution rejection " + - "(increase upper bound on " + poolName(plc) + " executor service).", e); + "(increase upper bound on executor service) [policy=" + plc + ']', e); w.run(); } @@ -1005,7 +959,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. */ public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, boolean sys) { - return callLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return callLocalSafe(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL); } /** @@ -1016,7 +970,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) { + private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) { try { return callLocal(c, plc); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java new file mode 100644 index 0000000..41e805e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.pool; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.plugin.extensions.communication.IoPool; + +import java.util.Arrays; +import java.util.concurrent.Executor; + +/** + * Processor which abstracts out thread pool management. + */ +public class PoolProcessor extends GridProcessorAdapter { + /** Map of {@link IoPool}-s injected by Ignite plugins. */ + private final IoPool[] extPools = new IoPool[128]; + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public PoolProcessor(GridKernalContext ctx) { + super(ctx); + + IgnitePluginProcessor plugins = ctx.plugins(); + + if (plugins != null) { + // Process custom IO messaging pool extensions: + final IoPool[] executorExtensions = ctx.plugins().extensions(IoPool.class); + + if (executorExtensions != null) { + // Store it into the map and check for duplicates: + for (IoPool ex : executorExtensions) { + final byte id = ex.id(); + + // 1. Check the pool id is non-negative: + if (id < 0) + throw new IgniteException("Failed to register IO executor pool because its ID is " + + "negative: " + id); + + // 2. Check the pool id is in allowed range: + if (GridIoPolicy.isReservedGridIoPolicy(id)) + throw new IgniteException("Failed to register IO executor pool because its ID in in the " + + "reserved range: " + id); + + // 3. Check the pool for duplicates: + if (extPools[id] != null) + throw new IgniteException("Failed to register IO executor pool because its ID as " + + "already used: " + id); + + extPools[id] = ex; + } + } + } + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + // Avoid external thread pools GC retention. + Arrays.fill(extPools, null); + } + + /** + * @return P2P pool. + */ + public Executor p2pPool() { + return ctx.getPeerClassLoadingExecutorService(); + } + + /** + * Get executor service for policy. + * + * @param plc Policy. + * @return Executor service. + * @throws IgniteCheckedException If failed. + */ + public Executor poolForPolicy(byte plc) throws IgniteCheckedException { + switch (plc) { + case GridIoPolicy.P2P_POOL: + return ctx.getPeerClassLoadingExecutorService(); + case GridIoPolicy.SYSTEM_POOL: + return ctx.getSystemExecutorService(); + case GridIoPolicy.PUBLIC_POOL: + return ctx.getExecutorService(); + case GridIoPolicy.MANAGEMENT_POOL: + return ctx.getManagementExecutorService(); + case GridIoPolicy.AFFINITY_POOL: + return ctx.getAffinityExecutorService(); + + case GridIoPolicy.UTILITY_CACHE_POOL: + assert ctx.utilityCachePool() != null : "Utility cache pool is not configured."; + + return ctx.utilityCachePool(); + + case GridIoPolicy.MARSH_CACHE_POOL: + assert ctx.marshallerCachePool() != null : "Marshaller cache pool is not configured."; + + return ctx.marshallerCachePool(); + + case GridIoPolicy.IGFS_POOL: + assert ctx.getIgfsExecutorService() != null : "IGFS pool is not configured."; + + return ctx.getIgfsExecutorService(); + + default: { + if (plc < 0) + throw new IgniteCheckedException("Policy cannot be negative: " + plc); + + if (GridIoPolicy.isReservedGridIoPolicy(plc)) + throw new IgniteCheckedException("Policy is reserved for internal usage (range 0-31): " + plc); + + IoPool pool = extPools[plc]; + + if (pool == null) + throw new IgniteCheckedException("No pool is registered for policy: " + plc); + + assert plc == pool.id(); + + Executor res = pool.executor(); + + if (res == null) + throw new IgniteCheckedException("Thread pool for policy is null: " + plc); + + return res; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 57762c7..2d8d245 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -948,7 +948,6 @@ org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserKeyCacheObjectImpl org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage org.apache.ignite.internal.processors.clock.GridClockDeltaVersion -org.apache.ignite.internal.processors.closure.GridClosurePolicy org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLAV2 http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java index 04eb8ef..328b775 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.failover.GridFailoverManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.resources.LoggerResource; @@ -68,6 +69,7 @@ public class GridManagerStopSelfTest extends GridCommonAbstractTest { ctx.config().setPeerClassLoadingEnabled(true); + ctx.add(new PoolProcessor(ctx)); ctx.add(new GridResourceProcessor(ctx)); ctx.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java index f44d282..adcd144 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; +import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.testframework.GridTestUtils; @@ -174,6 +175,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { ctx.setExecutorService(Executors.newFixedThreadPool(1)); ctx.setSystemExecutorService(Executors.newFixedThreadPool(1)); + ctx.add(new PoolProcessor(ctx)); ctx.add(new GridClosureProcessor(ctx)); ctx.start(); @@ -238,6 +240,7 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { ctx.setExecutorService(Executors.newFixedThreadPool(1)); ctx.setSystemExecutorService(Executors.newFixedThreadPool(1)); + ctx.add(new PoolProcessor(ctx)); ctx.add(new GridClosureProcessor(ctx)); ctx.start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index e4afe73..cba67e0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -61,6 +61,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, U.allPluginProviders()); GridTestUtils.setFieldValue(grid(), "cfg", config());