IGNITE-4041: Created separate processor for thread pools and refactored IO 
manager. This closes #1150.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9a0676f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9a0676f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9a0676f

Branch: refs/heads/ignite-ssl-hotfix
Commit: f9a0676fad7fd6c23e3c91c10d7e0412ccb27c06
Parents: b9c776a
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Tue Oct 11 10:23:01 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Tue Oct 11 10:23:01 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |  15 ++
 .../ignite/internal/GridKernalContextImpl.java  |  24 +++
 .../apache/ignite/internal/IgniteKernal.java    |   6 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  19 ++-
 .../managers/communication/GridIoManager.java   | 161 ++-----------------
 .../processors/closure/GridClosurePolicy.java   |  51 ------
 .../closure/GridClosureProcessor.java           |  88 +++-------
 .../internal/processors/pool/PoolProcessor.java | 149 +++++++++++++++++
 .../resources/META-INF/classnames.properties    |   1 -
 .../managers/GridManagerStopSelfTest.java       |   2 +
 .../util/future/GridFutureAdapterSelfTest.java  |   3 +
 .../junits/GridTestKernalContext.java           |   1 +
 12 files changed, 250 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index b123a4a..e608af2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.processors.odbc.OdbcProcessor;
 import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.port.GridPortProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
@@ -286,6 +287,13 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     public HadoopProcessorAdapter hadoop();
 
     /**
+     * Gets pool processor.
+     *
+     * @return Pool processor.
+     */
+    public PoolProcessor pools();
+
+    /**
      * Gets Hadoop helper.
      *
      * @return Hadoop helper.
@@ -533,6 +541,13 @@ public interface GridKernalContext extends 
Iterable<GridComponent> {
     public ExecutorService getRestExecutorService();
 
     /**
+     * Get affinity executor service.
+     *
+     * @return Affinity executor service.
+     */
+    public ExecutorService getAffinityExecutorService();
+
+    /**
      * Gets exception registry.
      *
      * @return Exception registry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index c7e26e9..ddef345 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -70,6 +70,7 @@ import 
org.apache.ignite.internal.processors.odbc.OdbcProcessor;
 import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.port.GridPortProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
@@ -259,6 +260,10 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    private PoolProcessor poolProc;
+
+    /** */
+    @GridToStringExclude
     private IgnitePluginProcessor pluginProc;
 
     /** */
@@ -311,6 +316,10 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    protected ExecutorService affExecSvc;
+
+    /** */
+    @GridToStringExclude
     protected IgniteStripedThreadPoolExecutor callbackExecSvc;
 
     /** */
@@ -372,6 +381,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
      * @param restExecSvc REST executor service.
+     * @param affExecSvc Affinity executor service.
      * @param plugins Plugin providers.
      * @throws IgniteCheckedException In case of error.
      */
@@ -389,6 +399,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        ExecutorService affExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
         List<PluginProvider> plugins) throws IgniteCheckedException {
         assert grid != null;
@@ -406,6 +417,7 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
         this.restExecSvc = restExecSvc;
+        this.affExecSvc = affExecSvc;
         this.callbackExecSvc = callbackExecSvc;
 
         String workDir = U.workDirectory(cfg.getWorkDirectory(), 
cfg.getIgniteHome());
@@ -533,6 +545,8 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
             cluster = (ClusterProcessor)comp;
         else if (comp instanceof PlatformProcessor)
             platformProc = (PlatformProcessor)comp;
+        else if (comp instanceof PoolProcessor)
+            poolProc = (PoolProcessor) comp;
         else if (!(comp instanceof DiscoveryNodeValidationProcessor))
             assert (comp instanceof GridPluginComponent) : "Unknown manager 
class: " + comp.getClass();
 
@@ -757,6 +771,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public PoolProcessor pools() {
+        return poolProc;
+    }
+
+    /** {@inheritDoc} */
     @Override public ExecutorService utilityCachePool() {
         return utilityCachePool;
     }
@@ -942,6 +961,11 @@ public class GridKernalContextImpl implements 
GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public ExecutorService getAffinityExecutorService() {
+        return affExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteExceptionRegistry exceptionRegistry() {
         return IgniteExceptionRegistry.get();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e0a36a7..02f16af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -126,6 +126,7 @@ import 
org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.port.GridPortProcessor;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -666,6 +667,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
      * @param restExecSvc Reset executor service.
+     * @param affExecSvc Affinity executor service.
      * @param errHnd Error handler to use for notification about startup 
problems.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
@@ -679,6 +681,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
         ExecutorService restExecSvc,
+        ExecutorService affExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
         GridAbsClosure errHnd)
         throws IgniteCheckedException
@@ -784,6 +787,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                 mgmtExecSvc,
                 igfsExecSvc,
                 restExecSvc,
+                affExecSvc,
                 callbackExecSvc,
                 plugins);
 
@@ -827,6 +831,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
 
             startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins));
 
+            startProcessor(new PoolProcessor(ctx));
+
             // Off-heap processor has no dependencies.
             startProcessor(new GridOffHeapProcessor(ctx));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 001f599..a6860b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1477,6 +1477,9 @@ public class IgnitionEx {
         /** Marshaller cache executor service. */
         private ThreadPoolExecutor marshCacheExecSvc;
 
+        /** Affinity executor service. */
+        private ThreadPoolExecutor affExecSvc;
+
         /** Continuous query executor service. */
         private IgniteStripedThreadPoolExecutor callbackExecSvc;
 
@@ -1734,6 +1737,16 @@ public class IgnitionEx {
 
             marshCacheExecSvc.allowCoreThreadTimeOut(true);
 
+            affExecSvc = new IgniteThreadPoolExecutor(
+                "aff",
+                cfg.getGridName(),
+                1,
+                1,
+                DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>());
+
+            affExecSvc.allowCoreThreadTimeOut(true);
+
             // Register Ignite MBean for current grid instance.
             registerFactoryMbean(myCfg.getMBeanServer());
 
@@ -1746,7 +1759,7 @@ public class IgnitionEx {
                 grid = grid0;
 
                 grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, 
execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc, callbackExecSvc,
+                    igfsExecSvc, restExecSvc, affExecSvc, callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
@@ -2381,6 +2394,10 @@ public class IgnitionEx {
 
             marshCacheExecSvc = null;
 
+            U.shutdownNow(getClass(), affExecSvc, log);
+
+            affExecSvc = null;
+
             U.shutdownNow(getClass(), callbackExecSvc, log);
 
             callbackExecSvc = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4bc2eea..bd285b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -29,9 +29,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,6 +51,7 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeployment;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import 
org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -68,7 +66,6 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.IoPool;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
@@ -78,7 +75,6 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -130,32 +126,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     /** Disconnect listeners. */
     private final Collection<GridDisconnectListener> disconnectLsnrs = new 
ConcurrentLinkedQueue<>();
 
-    /** Map of {@link IoPool}-s injected by Ignite plugins. */
-    private final IoPool[] ioPools = new IoPool[128];
-
-    /** Public pool. */
-    private ExecutorService pubPool;
-
-    /** Internal P2P pool. */
-    private ExecutorService p2pPool;
-
-    /** Internal system pool. */
-    private ExecutorService sysPool;
-
-    /** Internal management pool. */
-    private ExecutorService mgmtPool;
-
-    /** Affinity assignment executor service. */
-    private ExecutorService affPool;
-
-    /** Utility cache pool. */
-    private ExecutorService utilityCachePool;
-
-    /** Marshaller cache pool. */
-    private ExecutorService marshCachePool;
-
-    /** IGFS pool. */
-    private ExecutorService igfsPool;
+    /** Pool processor. */
+    private PoolProcessor pools;
 
     /** Discovery listener. */
     private GridLocalEventListener discoLsnr;
@@ -210,6 +182,10 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     public GridIoManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getCommunicationSpi());
 
+        pools = ctx.pools();
+
+        assert pools != null;
+
         locNodeId = ctx.localNodeId();
 
         discoDelay = ctx.config().getDiscoveryStartupDelay();
@@ -253,21 +229,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         startSpi();
 
-        pubPool = ctx.getExecutorService();
-        p2pPool = ctx.getPeerClassLoadingExecutorService();
-        sysPool = ctx.getSystemExecutorService();
-        mgmtPool = ctx.getManagementExecutorService();
-        utilityCachePool = ctx.utilityCachePool();
-        marshCachePool = ctx.marshallerCachePool();
-        igfsPool = ctx.getIgfsExecutorService();
-        affPool = new IgniteThreadPoolExecutor(
-            "aff",
-            ctx.gridName(),
-            1,
-            1,
-            0,
-            new LinkedBlockingQueue<Runnable>());
-
         getSpi().setListener(commLsnr = new 
CommunicationListener<Serializable>() {
             @Override public void onMessage(UUID nodeId, Serializable msg, 
IgniteRunnable msgC) {
                 try {
@@ -335,41 +296,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         if (log.isDebugEnabled())
             log.debug(startInfo());
-
-        registerIoPoolExtensions();
-    }
-
-    /**
-     * Processes IO messaging pool extensions.
-     * @throws IgniteCheckedException On error.
-     */
-    private void registerIoPoolExtensions() throws IgniteCheckedException {
-        // Process custom IO messaging pool extensions:
-        final IoPool[] executorExtensions = 
ctx.plugins().extensions(IoPool.class);
-
-        if (executorExtensions != null) {
-            // Store it into the map and check for duplicates:
-            for (IoPool ex : executorExtensions) {
-                final byte id = ex.id();
-
-                // 1. Check the pool id is non-negative:
-                if (id < 0)
-                    throw new IgniteCheckedException("Failed to register IO 
executor pool because its Id is negative " +
-                        "[id=" + id + ']');
-
-                // 2. Check the pool id is in allowed range:
-                if (isReservedGridIoPolicy(id))
-                    throw new IgniteCheckedException("Failed to register IO 
executor pool because its Id in in the " +
-                        "reserved range (0-31) [id=" + id + ']');
-
-                // 3. Check the pool for duplicates:
-                if (ioPools[id] != null)
-                    throw new IgniteCheckedException("Failed to register IO 
executor pool because its " +
-                        "Id as already used [id=" + id + ']');
-
-                ioPools[id] = ex;
-            }
-        }
     }
 
     /** {@inheritDoc} */
@@ -546,8 +472,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             if (interrupted)
                 Thread.currentThread().interrupt();
 
-            U.shutdownNow(getClass(), affPool, log);
-
             GridEventStorageManager evtMgr = ctx.event();
 
             if (evtMgr != null && discoLsnr != null)
@@ -566,8 +490,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         if (log.isDebugEnabled())
             log.debug(stopInfo());
-
-        Arrays.fill(ioPools, null);
     }
 
     /**
@@ -683,67 +605,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * Gets execution pool for policy.
-     *
-     * @param plc Policy.
-     * @return Execution pool.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Executor pool(byte plc) throws IgniteCheckedException {
-        switch (plc) {
-            case P2P_POOL:
-                return p2pPool;
-            case SYSTEM_POOL:
-                return sysPool;
-            case PUBLIC_POOL:
-                return pubPool;
-            case MANAGEMENT_POOL:
-                return mgmtPool;
-            case AFFINITY_POOL:
-                return affPool;
-
-            case UTILITY_CACHE_POOL:
-                assert utilityCachePool != null : "Utility cache pool is not 
configured.";
-
-                return utilityCachePool;
-
-            case MARSH_CACHE_POOL:
-                assert marshCachePool != null : "Marshaller cache pool is not 
configured.";
-
-                return marshCachePool;
-
-            case IGFS_POOL:
-                assert igfsPool != null : "IGFS pool is not configured.";
-
-                return igfsPool;
-
-            default: {
-                assert plc >= 0 : "Negative policy: " + plc;
-
-                if (isReservedGridIoPolicy(plc))
-                    throw new IgniteCheckedException("Failed to process 
message with policy of reserved" +
-                        " range (0-31), [policy=" + plc + ']');
-
-                IoPool pool = ioPools[plc];
-
-                if (pool == null)
-                    throw new IgniteCheckedException("Failed to process 
message because no pool is registered " +
-                        "for policy. [policy=" + plc + ']');
-
-                assert plc == pool.id();
-
-                Executor ex = pool.executor();
-
-                if (ex == null)
-                    throw new IgniteCheckedException("Failed to process 
message because corresponding executor " +
-                        "is null. [id=" + plc + ']');
-
-                return ex;
-            }
-        }
-    }
-
-    /**
      * @param nodeId Node ID.
      * @param msg Message.
      * @param msgC Closure to call when message processing finished.
@@ -778,7 +639,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         };
 
         try {
-            p2pPool.execute(c);
+            pools.p2pPool().execute(c);
         }
         catch (RejectedExecutionException e) {
             U.error(log, "Failed to process P2P message due to execution 
rejection. Increase the upper bound " +
@@ -818,7 +679,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         };
 
         try {
-            pool(plc).execute(c);
+            pools.poolForPolicy(plc).execute(c);
         }
         catch (RejectedExecutionException e) {
             U.error(log, "Failed to process regular message due to execution 
rejection. Increase the upper bound " +
@@ -1154,7 +1015,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         };
 
         try {
-            pool(plc).execute(c);
+            pools.poolForPolicy(plc).execute(c);
         }
         catch (RejectedExecutionException e) {
             U.error(log, "Failed to process ordered message due to execution 
rejection. " +
@@ -1781,7 +1642,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
             try {
                 for (final GridCommunicationMessageSet msgSet : msgSets) {
-                    pool(msgSet.policy()).execute(
+                    pools.poolForPolicy(msgSet.policy()).execute(
                         new Runnable() {
                             @Override public void run() {
                                 unwindMessageSet(msgSet, lsnrs0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
deleted file mode 100644
index c17cedd..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.closure;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This enumeration defines different types of closure
- * processing by the closure processor.
- */
-public enum GridClosurePolicy {
-    /** Public execution pool. */
-    PUBLIC_POOL,
-
-    /** P2P execution pool. */
-    P2P_POOL,
-
-    /** System execution pool. */
-    SYSTEM_POOL,
-
-    /** IGFS pool. */
-    IGFS_POOL;
-
-    /** Enum values. */
-    private static final GridClosurePolicy[] VALS = values();
-
-    /**
-     * Efficiently gets enumerated value from its ordinal.
-     *
-     * @param ord Ordinal value.
-     * @return Enumerated value.
-     */
-    @Nullable public static GridClosurePolicy fromOrdinal(int ord) {
-        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 6f878ce..a96d6eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -50,8 +50,10 @@ import org.apache.ignite.internal.GridClosureCallMode;
 import org.apache.ignite.internal.GridInternalWrapper;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -88,14 +90,8 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /** Ignite version in which binarylizable versions of closures were 
introduced. */
     public static final IgniteProductVersion BINARYLIZABLE_CLOSURES_SINCE = 
IgniteProductVersion.fromString("1.6.0");
 
-    /** */
-    private final Executor sysPool;
-
-    /** */
-    private final Executor pubPool;
-
-    /** */
-    private final Executor igfsPool;
+    /** Pool processor. */
+    private final PoolProcessor pools;
 
     /** Lock to control execution after stop. */
     private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
@@ -109,9 +105,9 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     public GridClosureProcessor(GridKernalContext ctx) {
         super(ctx);
 
-        sysPool = ctx.getSystemExecutorService();
-        pubPool = ctx.getExecutorService();
-        igfsPool = ctx.getIgfsExecutorService();
+        pools = ctx.pools();
+
+        assert pools != null;
     }
 
     /** {@inheritDoc} */
@@ -731,57 +727,13 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Gets pool by execution policy.
-     *
-     * @param plc Whether to get system or public pool.
-     * @return Requested worker pool.
-     */
-    private Executor pool(GridClosurePolicy plc) {
-        switch (plc) {
-            case PUBLIC_POOL:
-                return pubPool;
-
-            case SYSTEM_POOL:
-                return sysPool;
-
-            case IGFS_POOL:
-                return igfsPool;
-
-            default:
-                throw new IllegalArgumentException("Invalid closure execution 
policy: " + plc);
-        }
-    }
-
-    /**
-     * Gets pool name by execution policy.
-     *
-     * @param plc Policy to choose executor pool.
-     * @return Pool name.
-     */
-    private String poolName(GridClosurePolicy plc) {
-        switch (plc) {
-            case PUBLIC_POOL:
-                return "public";
-
-            case SYSTEM_POOL:
-                return "system";
-
-            case IGFS_POOL:
-                return "igfs";
-
-            default:
-                throw new IllegalArgumentException("Invalid closure execution 
policy: " + plc);
-        }
-    }
-
-    /**
      * @param c Closure to execute.
      * @param sys If {@code true}, then system pool will be used, otherwise 
public pool will be used.
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, 
boolean sys) throws IgniteCheckedException {
-        return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+        return runLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : 
GridIoPolicy.PUBLIC_POOL);
     }
 
     /**
@@ -790,7 +742,8 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, 
GridClosurePolicy plc) throws IgniteCheckedException {
+    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte 
plc)
+        throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture();
 
@@ -830,11 +783,11 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             fut.setWorker(w);
 
             try {
-                pool(plc).execute(w);
+                pools.poolForPolicy(plc).execute(w);
             }
             catch (RejectedExecutionException e) {
                 U.error(log, "Failed to execute worker due to execution 
rejection " +
-                    "(increase upper bound on " + poolName(plc) + " executor 
service).", e);
+                    "(increase upper bound on executor service) [policy=" + 
plc + ']', e);
 
                 w.run();
             }
@@ -866,7 +819,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @return Future.
      */
     public IgniteInternalFuture<?> runLocalSafe(Runnable c, boolean sys) {
-        return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+        return runLocalSafe(c, sys ? GridIoPolicy.SYSTEM_POOL : 
GridIoPolicy.PUBLIC_POOL);
     }
 
     /**
@@ -877,7 +830,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    public IgniteInternalFuture<?> runLocalSafe(Runnable c, GridClosurePolicy 
plc) {
+    public IgniteInternalFuture<?> runLocalSafe(Runnable c, byte plc) {
         try {
             return runLocal(c, plc);
         }
@@ -921,7 +874,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> 
c, boolean sys) throws IgniteCheckedException {
-        return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+        return callLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : 
GridIoPolicy.PUBLIC_POOL);
     }
 
     /**
@@ -931,7 +884,8 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> 
c, GridClosurePolicy plc) throws IgniteCheckedException {
+    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> 
c, byte plc)
+        throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture<>();
 
@@ -969,11 +923,11 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             fut.setWorker(w);
 
             try {
-                pool(plc).execute(w);
+                pools.poolForPolicy(plc).execute(w);
             }
             catch (RejectedExecutionException e) {
                 U.error(log, "Failed to execute worker due to execution 
rejection " +
-                    "(increase upper bound on " + poolName(plc) + " executor 
service).", e);
+                    "(increase upper bound on executor service) [policy=" + 
plc + ']', e);
 
                 w.run();
             }
@@ -1005,7 +959,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @return Future.
      */
     public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, boolean 
sys) {
-        return callLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : 
GridClosurePolicy.PUBLIC_POOL);
+        return callLocalSafe(c, sys ? GridIoPolicy.SYSTEM_POOL : 
GridIoPolicy.PUBLIC_POOL);
     }
 
     /**
@@ -1016,7 +970,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, 
GridClosurePolicy plc) {
+    private <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) 
{
         try {
             return callLocal(c, plc);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
new file mode 100644
index 0000000..41e805e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.pool;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.plugin.extensions.communication.IoPool;
+
+import java.util.Arrays;
+import java.util.concurrent.Executor;
+
+/**
+ * Processor which abstracts out thread pool management.
+ */
+public class PoolProcessor extends GridProcessorAdapter {
+    /** Map of {@link IoPool}-s injected by Ignite plugins. */
+    private final IoPool[] extPools = new IoPool[128];
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public PoolProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        IgnitePluginProcessor plugins = ctx.plugins();
+
+        if (plugins != null) {
+            // Process custom IO messaging pool extensions:
+            final IoPool[] executorExtensions = 
ctx.plugins().extensions(IoPool.class);
+
+            if (executorExtensions != null) {
+                // Store it into the map and check for duplicates:
+                for (IoPool ex : executorExtensions) {
+                    final byte id = ex.id();
+
+                    // 1. Check the pool id is non-negative:
+                    if (id < 0)
+                        throw new IgniteException("Failed to register IO 
executor pool because its ID is " +
+                            "negative: " + id);
+
+                    // 2. Check the pool id is in allowed range:
+                    if (GridIoPolicy.isReservedGridIoPolicy(id))
+                        throw new IgniteException("Failed to register IO 
executor pool because its ID in in the " +
+                            "reserved range: " + id);
+
+                    // 3. Check the pool for duplicates:
+                    if (extPools[id] != null)
+                        throw new IgniteException("Failed to register IO 
executor pool because its ID as " +
+                            "already used: " + id);
+
+                    extPools[id] = ex;
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        // Avoid external thread pools GC retention.
+        Arrays.fill(extPools, null);
+    }
+
+    /**
+     * @return P2P pool.
+     */
+    public Executor p2pPool() {
+        return ctx.getPeerClassLoadingExecutorService();
+    }
+
+    /**
+     * Get executor service for policy.
+     *
+     * @param plc Policy.
+     * @return Executor service.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Executor poolForPolicy(byte plc) throws IgniteCheckedException {
+        switch (plc) {
+            case GridIoPolicy.P2P_POOL:
+                return ctx.getPeerClassLoadingExecutorService();
+            case GridIoPolicy.SYSTEM_POOL:
+                return ctx.getSystemExecutorService();
+            case GridIoPolicy.PUBLIC_POOL:
+                return ctx.getExecutorService();
+            case GridIoPolicy.MANAGEMENT_POOL:
+                return ctx.getManagementExecutorService();
+            case GridIoPolicy.AFFINITY_POOL:
+                return ctx.getAffinityExecutorService();
+
+            case GridIoPolicy.UTILITY_CACHE_POOL:
+                assert ctx.utilityCachePool() != null : "Utility cache pool is 
not configured.";
+
+                return ctx.utilityCachePool();
+
+            case GridIoPolicy.MARSH_CACHE_POOL:
+                assert ctx.marshallerCachePool() != null : "Marshaller cache 
pool is not configured.";
+
+                return ctx.marshallerCachePool();
+
+            case GridIoPolicy.IGFS_POOL:
+                assert ctx.getIgfsExecutorService() != null : "IGFS pool is 
not configured.";
+
+                return ctx.getIgfsExecutorService();
+
+            default: {
+                if (plc < 0)
+                    throw new IgniteCheckedException("Policy cannot be 
negative: " + plc);
+
+                if (GridIoPolicy.isReservedGridIoPolicy(plc))
+                    throw new IgniteCheckedException("Policy is reserved for 
internal usage (range 0-31): " + plc);
+
+                IoPool pool = extPools[plc];
+
+                if (pool == null)
+                    throw new IgniteCheckedException("No pool is registered 
for policy: " + plc);
+
+                assert plc == pool.id();
+
+                Executor res = pool.executor();
+
+                if (res == null)
+                    throw new IgniteCheckedException("Thread pool for policy 
is null: " + plc);
+
+                return res;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 57762c7..2d8d245 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -948,7 +948,6 @@ 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl
 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl$UserKeyCacheObjectImpl
 org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage
 org.apache.ignite.internal.processors.clock.GridClockDeltaVersion
-org.apache.ignite.internal.processors.closure.GridClosurePolicy
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA
 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLAV2

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java
index 04eb8ef..328b775 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerStopSelfTest.java
@@ -27,6 +27,7 @@ import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.failover.GridFailoverManager;
 import 
org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.resources.LoggerResource;
@@ -68,6 +69,7 @@ public class GridManagerStopSelfTest extends 
GridCommonAbstractTest {
 
         ctx.config().setPeerClassLoadingEnabled(true);
 
+        ctx.add(new PoolProcessor(ctx));
         ctx.add(new GridResourceProcessor(ctx));
 
         ctx.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index f44d282..adcd144 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -28,6 +28,7 @@ import 
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -174,6 +175,7 @@ public class GridFutureAdapterSelfTest extends 
GridCommonAbstractTest {
         ctx.setExecutorService(Executors.newFixedThreadPool(1));
         ctx.setSystemExecutorService(Executors.newFixedThreadPool(1));
 
+        ctx.add(new PoolProcessor(ctx));
         ctx.add(new GridClosureProcessor(ctx));
 
         ctx.start();
@@ -238,6 +240,7 @@ public class GridFutureAdapterSelfTest extends 
GridCommonAbstractTest {
         ctx.setExecutorService(Executors.newFixedThreadPool(1));
         ctx.setSystemExecutorService(Executors.newFixedThreadPool(1));
 
+        ctx.add(new PoolProcessor(ctx));
         ctx.add(new GridClosureProcessor(ctx));
 
         ctx.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a0676f/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index e4afe73..cba67e0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -61,6 +61,7 @@ public class GridTestKernalContext extends 
GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 U.allPluginProviders());
 
         GridTestUtils.setFieldValue(grid(), "cfg", config());

Reply via email to