IGNITE-4001: Timeouts for threads in Ignite pools. This closes #1130.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23461b8d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23461b8d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23461b8d Branch: refs/heads/ignite-ssl-hotfix Commit: 23461b8d33922772ef8e7217e9e87b3f3b0b37b1 Parents: a92f20b Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Oct 6 10:14:59 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Oct 6 10:14:59 2016 +0300 ---------------------------------------------------------------------- .../configuration/IgniteConfiguration.java | 18 +++-- .../org/apache/ignite/internal/IgnitionEx.java | 71 ++++++++++---------- .../processors/igfs/IgfsThreadFactory.java | 61 +++++++++++++++++ 3 files changed, 112 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/23461b8d/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 19b9a4d..73de470 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -149,29 +149,38 @@ public class IgniteConfiguration { public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2; /** Default keep alive time for public thread pool. */ + @Deprecated public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0; /** Default limit of threads used for rebalance. */ public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1; /** Default max queue capacity of public thread pool. */ + @Deprecated public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE; /** Default size of system thread pool. */ public static final int DFLT_SYSTEM_CORE_THREAD_CNT = DFLT_PUBLIC_THREAD_CNT; /** Default max size of system thread pool. */ + @Deprecated public static final int DFLT_SYSTEM_MAX_THREAD_CNT = DFLT_PUBLIC_THREAD_CNT; /** Default keep alive time for system thread pool. */ + @Deprecated public static final long DFLT_SYSTEM_KEEP_ALIVE_TIME = 0; /** Default keep alive time for utility thread pool. */ + @Deprecated public static final long DFLT_UTILITY_KEEP_ALIVE_TIME = 10_000; /** Default max queue capacity of system thread pool. */ + @Deprecated public static final int DFLT_SYSTEM_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE; + /** Default Ignite thread keep alive time. */ + public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L; + /** Default size of peer class loading thread pool. */ public static final int DFLT_P2P_THREAD_CNT = 2; @@ -240,13 +249,13 @@ public class IgniteConfiguration { private int utilityCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; /** Utility cache pool keep alive time. */ - private long utilityCacheKeepAliveTime = DFLT_UTILITY_KEEP_ALIVE_TIME; + private long utilityCacheKeepAliveTime = DFLT_THREAD_KEEP_ALIVE_TIME; /** Marshaller pool size. */ private int marshCachePoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; /** Marshaller pool keep alive time. */ - private long marshCacheKeepAliveTime = DFLT_UTILITY_KEEP_ALIVE_TIME; + private long marshCacheKeepAliveTime = DFLT_THREAD_KEEP_ALIVE_TIME; /** P2P pool size. */ private int p2pPoolSize = DFLT_P2P_THREAD_CNT; @@ -492,6 +501,7 @@ public class IgniteConfiguration { cacheCfg = cfg.getCacheConfiguration(); cacheKeyCfg = cfg.getCacheKeyConfiguration(); cacheSanityCheckEnabled = cfg.isCacheSanityCheckEnabled(); + callbackPoolSize = cfg.getAsyncCallbackPoolSize(); connectorCfg = cfg.getConnectorConfiguration(); classLdr = cfg.getClassLoader(); clientMode = cfg.isClientMode(); @@ -792,7 +802,7 @@ public class IgniteConfiguration { /** * Keep alive time of thread pool that is in charge of processing utility cache messages. * <p> - * If not provided, executor service will have keep alive time {@link #DFLT_UTILITY_KEEP_ALIVE_TIME}. + * If not provided, executor service will have keep alive time {@link #DFLT_THREAD_KEEP_ALIVE_TIME}. * * @return Thread pool keep alive time (in milliseconds) to be used in grid for utility cache messages. */ @@ -814,7 +824,7 @@ public class IgniteConfiguration { /** * Keep alive time of thread pool that is in charge of processing marshaller messages. * <p> - * If not provided, executor service will have keep alive time {@link #DFLT_UTILITY_KEEP_ALIVE_TIME}. + * If not provided, executor service will have keep alive time {@link #DFLT_THREAD_KEEP_ALIVE_TIME}. * * @return Thread pool keep alive time (in milliseconds) to be used in grid for marshaller messages. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/23461b8d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 2914c7c..001f599 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -36,7 +36,6 @@ import java.util.Map.Entry; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +61,7 @@ import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.GridConcurrentHashSet; @@ -119,10 +119,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_KEEP_ALIVE_TIME; -import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_THREADPOOL_QUEUE_CAP; -import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SYSTEM_KEEP_ALIVE_TIME; -import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SYSTEM_THREADPOOL_QUEUE_CAP; +import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME; import static org.apache.ignite.internal.IgniteComponentType.SPRING; import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_JVM; @@ -1457,28 +1454,28 @@ public class IgnitionEx { private volatile IgniteKernal grid; /** Executor service. */ - private ExecutorService execSvc; + private ThreadPoolExecutor execSvc; /** System executor service. */ - private ExecutorService sysExecSvc; + private ThreadPoolExecutor sysExecSvc; /** Management executor service. */ - private ExecutorService mgmtExecSvc; + private ThreadPoolExecutor mgmtExecSvc; /** P2P executor service. */ - private ExecutorService p2pExecSvc; + private ThreadPoolExecutor p2pExecSvc; /** IGFS executor service. */ - private ExecutorService igfsExecSvc; + private ThreadPoolExecutor igfsExecSvc; /** REST requests executor service. */ - private ExecutorService restExecSvc; + private ThreadPoolExecutor restExecSvc; /** Utility cache executor service. */ - private ExecutorService utilityCacheExecSvc; + private ThreadPoolExecutor utilityCacheExecSvc; /** Marshaller cache executor service. */ - private ExecutorService marshCacheExecSvc; + private ThreadPoolExecutor marshCacheExecSvc; /** Continuous query executor service. */ private IgniteStripedThreadPoolExecutor callbackExecSvc; @@ -1642,12 +1639,10 @@ public class IgnitionEx { cfg.getGridName(), cfg.getPublicThreadPoolSize(), cfg.getPublicThreadPoolSize(), - DFLT_PUBLIC_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>()); - if (!myCfg.isClientMode()) - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads(); + execSvc.allowCoreThreadTimeOut(true); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1656,11 +1651,10 @@ public class IgnitionEx { cfg.getGridName(), cfg.getSystemThreadPoolSize(), cfg.getSystemThreadPoolSize(), - DFLT_SYSTEM_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>()); - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads(); + sysExecSvc.allowCoreThreadTimeOut(true); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1671,9 +1665,11 @@ public class IgnitionEx { cfg.getGridName(), cfg.getManagementThreadPoolSize(), cfg.getManagementThreadPoolSize(), - 0, + DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>()); + mgmtExecSvc.allowCoreThreadTimeOut(true); + // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. // Note, that we do not pre-start threads here as class loading pool may @@ -1683,20 +1679,21 @@ public class IgnitionEx { cfg.getGridName(), cfg.getPeerClassLoadingThreadPoolSize(), cfg.getPeerClassLoadingThreadPoolSize(), - 0, + DFLT_THREAD_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>()); + p2pExecSvc.allowCoreThreadTimeOut(true); + // Note that we do not pre-start threads here as igfs pool may not be needed. igfsExecSvc = new IgniteThreadPoolExecutor( - "igfs", - cfg.getGridName(), cfg.getIgfsThreadPoolSize(), cfg.getIgfsThreadPoolSize(), - 0, - new LinkedBlockingQueue<Runnable>()); + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(), + new IgfsThreadFactory(cfg.getGridName(), "igfs"), + null /* Abort policy will be used. */); - // Pre-start all threads to avoid HadoopClassLoader leaks. - ((ThreadPoolExecutor)igfsExecSvc).prestartAllCoreThreads(); + igfsExecSvc.allowCoreThreadTimeOut(true); // Note that we do not pre-start threads here as this pool may not be needed. callbackExecSvc = new IgniteStripedThreadPoolExecutor( @@ -1710,9 +1707,11 @@ public class IgnitionEx { myCfg.getGridName(), myCfg.getConnectorConfiguration().getThreadPoolSize(), myCfg.getConnectorConfiguration().getThreadPoolSize(), - ConnectorConfiguration.DFLT_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(ConnectorConfiguration.DFLT_THREADPOOL_QUEUE_CAP) + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>() ); + + restExecSvc.allowCoreThreadTimeOut(true); } utilityCacheExecSvc = new IgniteThreadPoolExecutor( @@ -1721,7 +1720,9 @@ public class IgnitionEx { myCfg.getUtilityCacheThreadPoolSize(), myCfg.getUtilityCacheThreadPoolSize(), myCfg.getUtilityCacheKeepAliveTime(), - new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + new LinkedBlockingQueue<Runnable>()); + + utilityCacheExecSvc.allowCoreThreadTimeOut(true); marshCacheExecSvc = new IgniteThreadPoolExecutor( "marshaller-cache", @@ -1729,7 +1730,9 @@ public class IgnitionEx { myCfg.getMarshallerCacheThreadPoolSize(), myCfg.getMarshallerCacheThreadPoolSize(), myCfg.getMarshallerCacheKeepAliveTime(), - new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + new LinkedBlockingQueue<Runnable>()); + + marshCacheExecSvc.allowCoreThreadTimeOut(true); // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); http://git-wip-us.apache.org/repos/asf/ignite/blob/23461b8d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java new file mode 100644 index 0000000..32cab0f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThreadFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.thread.IgniteThreadFactory; +import org.jetbrains.annotations.NotNull; + +/** + * Special thread factory used only for IGFS pool which prevents {@link HadoopClassLoader} leak into + * {@code Thread.contextClassLoader} field. To achieve this we switch context class loader back and forth when + * creating threads. + */ +public class IgfsThreadFactory extends IgniteThreadFactory { + /** + * Constructor. + * + * @param gridName Grid name. + * @param threadName Thread name. + */ + public IgfsThreadFactory(String gridName, String threadName) { + super(gridName, threadName); + } + + /** {@inheritDoc} */ + @Override public Thread newThread(@NotNull Runnable r) { + Thread curThread = Thread.currentThread(); + + ClassLoader oldLdr = curThread.getContextClassLoader(); + + curThread.setContextClassLoader(IgfsThreadFactory.class.getClassLoader()); + + try { + return super.newThread(r); + } + finally { + curThread.setContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsThreadFactory.class, this); + } +}