IGNITE-6892 OOM should be covered by failure handling

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1be9b85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1be9b85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1be9b85

Branch: refs/heads/ignite-8201
Commit: d1be9b85507eb3358327e93b81031f92e660531b
Parents: 32fc6c3
Author: Aleksey Plekhanov <plehanov.a...@gmail.com>
Authored: Wed Apr 11 18:24:51 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Apr 11 18:24:51 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   8 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  50 +++-
 .../discovery/GridDiscoveryManager.java         |   3 +
 .../processors/cache/WalStateManager.java       |   8 +-
 .../continuous/GridContinuousProcessor.java     |   3 +
 .../datastreamer/DataStreamProcessor.java       |   3 +
 .../processors/failure/FailureProcessor.java    |  11 +
 .../internal/processors/job/GridJobWorker.java  |   8 +-
 .../service/GridServiceProcessor.java           |  15 +-
 .../thread/IgniteStripedThreadPoolExecutor.java |   8 +-
 .../ignite/thread/IgniteThreadFactory.java      |  30 ++-
 .../ignite/thread/IgniteThreadPoolExecutor.java |  12 +-
 .../ignite/thread/OomExceptionHandler.java      |  44 ++++
 .../ignite/failure/OomFailureHandlerTest.java   | 255 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 15 files changed, 430 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 662338c..437f49f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -863,6 +863,14 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_BPLUS_TREE_LOCK_RETRIES = 
"IGNITE_BPLUS_TREE_LOCK_RETRIES";
 
     /**
+     * Amount of memory reserved in the heap at node start, which can be 
dropped to increase the chances of success when
+     * handling OutOfMemoryError.
+     *
+     * Default is {@code 64kb}.
+     */
+    public static final String IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE = 
"IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE";
+
+    /**
      * The threshold of uneven distribution above which partition distribution 
will be logged.
      *
      * The default is '50', that means: warn about nodes with 50+% difference.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 10a0752..b3c3ee8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.net.MalformedURLException;
@@ -88,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1764,6 +1766,13 @@ public class IgnitionEx {
 
             validateThreadPoolSize(cfg.getPublicThreadPoolSize(), "public");
 
+            UncaughtExceptionHandler oomeHnd = new UncaughtExceptionHandler() {
+                @Override public void uncaughtException(Thread t, Throwable e) 
{
+                    if (grid != null && X.hasCause(e, OutOfMemoryError.class))
+                        grid.context().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+            };
+
             execSvc = new IgniteThreadPoolExecutor(
                 "pub",
                 cfg.getIgniteInstanceName(),
@@ -1771,7 +1780,8 @@ public class IgnitionEx {
                 cfg.getPublicThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.PUBLIC_POOL);
+                GridIoPolicy.PUBLIC_POOL,
+                oomeHnd);
 
             execSvc.allowCoreThreadTimeOut(true);
 
@@ -1784,7 +1794,8 @@ public class IgnitionEx {
                 cfg.getServiceThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.SERVICE_POOL);
+                GridIoPolicy.SERVICE_POOL,
+                oomeHnd);
 
             svcExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1797,7 +1808,8 @@ public class IgnitionEx {
                 cfg.getSystemThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.SYSTEM_POOL);
+                GridIoPolicy.SYSTEM_POOL,
+                oomeHnd);
 
             sysExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1828,7 +1840,8 @@ public class IgnitionEx {
                 cfg.getManagementThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.MANAGEMENT_POOL);
+                GridIoPolicy.MANAGEMENT_POOL,
+                oomeHnd);
 
             mgmtExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1844,7 +1857,8 @@ public class IgnitionEx {
                 cfg.getPeerClassLoadingThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.P2P_POOL);
+                GridIoPolicy.P2P_POOL,
+                oomeHnd);
 
             p2pExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1879,7 +1893,8 @@ public class IgnitionEx {
             callbackExecSvc = new IgniteStripedThreadPoolExecutor(
                 cfg.getAsyncCallbackPoolSize(),
                 cfg.getIgniteInstanceName(),
-                "callback");
+                "callback",
+                oomeHnd);
 
             if (myCfg.getConnectorConfiguration() != null) {
                 
validateThreadPoolSize(myCfg.getConnectorConfiguration().getThreadPoolSize(), 
"connector");
@@ -1890,7 +1905,9 @@ public class IgnitionEx {
                     myCfg.getConnectorConfiguration().getThreadPoolSize(),
                     myCfg.getConnectorConfiguration().getThreadPoolSize(),
                     DFLT_THREAD_KEEP_ALIVE_TIME,
-                    new LinkedBlockingQueue<Runnable>()
+                    new LinkedBlockingQueue<Runnable>(),
+                    GridIoPolicy.UNDEFINED,
+                    oomeHnd
                 );
 
                 restExecSvc.allowCoreThreadTimeOut(true);
@@ -1905,7 +1922,8 @@ public class IgnitionEx {
                 myCfg.getUtilityCacheThreadPoolSize(),
                 myCfg.getUtilityCacheKeepAliveTime(),
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.UTILITY_CACHE_POOL);
+                GridIoPolicy.UTILITY_CACHE_POOL,
+                oomeHnd);
 
             utilityCacheExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1916,7 +1934,8 @@ public class IgnitionEx {
                 1,
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.AFFINITY_POOL);
+                GridIoPolicy.AFFINITY_POOL,
+                oomeHnd);
 
             affExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1930,7 +1949,8 @@ public class IgnitionEx {
                     cpus * 2,
                     3000L,
                     new LinkedBlockingQueue<Runnable>(1000),
-                    GridIoPolicy.IDX_POOL
+                    GridIoPolicy.IDX_POOL,
+                    oomeHnd
                 );
             }
 
@@ -1943,7 +1963,8 @@ public class IgnitionEx {
                 cfg.getQueryThreadPoolSize(),
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.QUERY_POOL);
+                GridIoPolicy.QUERY_POOL,
+                oomeHnd);
 
             qryExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1954,7 +1975,8 @@ public class IgnitionEx {
                 2,
                 DFLT_THREAD_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(),
-                GridIoPolicy.SCHEMA_POOL);
+                GridIoPolicy.SCHEMA_POOL,
+                oomeHnd);
 
             schemaExecSvc.allowCoreThreadTimeOut(true);
 
@@ -1970,7 +1992,9 @@ public class IgnitionEx {
                         execCfg.getSize(),
                         execCfg.getSize(),
                         DFLT_THREAD_KEEP_ALIVE_TIME,
-                        new LinkedBlockingQueue<Runnable>());
+                        new LinkedBlockingQueue<Runnable>(),
+                        GridIoPolicy.UNDEFINED,
+                        oomeHnd);
 
                     customExecSvcs.put(execCfg.getName(), exec);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 400bb5f..77c9657 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -130,6 +130,7 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.OomExceptionHandler;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -924,6 +925,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
             segChkThread = new IgniteThread(segChkWrk);
 
+            segChkThread.setUncaughtExceptionHandler(new 
OomExceptionHandler(ctx));
+
             segChkThread.start();
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 0ac699f..64a6819 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.OomExceptionHandler;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
@@ -473,7 +474,12 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
                                     // not-yet-flushed dirty pages have been 
logged.
                                     WalStateChangeWorker worker = new 
WalStateChangeWorker(msg, cpFut);
 
-                                    new IgniteThread(worker).start();
+                                    IgniteThread thread = new 
IgniteThread(worker);
+
+                                    thread.setUncaughtExceptionHandler(new 
OomExceptionHandler(
+                                        cctx.kernalContext()));
+
+                                    thread.start();
                                 }
                                 else {
                                     // Disable: not-yet-flushed operations are 
not logged, so wait for them

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index cebe4b1..2d48b7d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -88,6 +88,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import 
org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.OomExceptionHandler;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -1727,6 +1728,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     }
                 });
 
+                checker.setUncaughtExceptionHandler(new 
OomExceptionHandler(ctx));
+
                 bufCheckThreads.put(routineId, checker);
 
                 checker.start();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 8b984c0..e63d7d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.stream.StreamReceiver;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.OomExceptionHandler;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.Collection;
@@ -125,6 +126,8 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
             }
         });
 
+        flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));
+
         flusher.start();
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
index 615fb9f..0234e84 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.failure;
 
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -40,6 +42,9 @@ public class FailureProcessor extends GridProcessorAdapter {
     /** Failure context. */
     private volatile FailureContext failureCtx;
 
+    /** Reserve buffer, which can be dropped to handle OOME. */
+    private volatile byte[] reserveBuf;
+
     /**
      * @param ctx Context.
      */
@@ -56,6 +61,9 @@ public class FailureProcessor extends GridProcessorAdapter {
         if (hnd == null)
             hnd = getDefaultFailureHandler();
 
+        reserveBuf = new byte[IgniteSystemProperties.getInteger(
+            IgniteSystemProperties.IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE, 
64 * 1024)];
+
         assert hnd != null;
 
         this.hnd = hnd;
@@ -102,6 +110,9 @@ public class FailureProcessor extends GridProcessorAdapter {
         U.error(ignite.log(), "Critical failure. Will be handled accordingly 
to configured handler [hnd=" +
             hnd.getClass() + ", failureCtx=" + failureCtx + ']', 
failureCtx.error());
 
+        if (reserveBuf != null && X.hasCause(failureCtx.error(), 
OutOfMemoryError.class))
+            reserveBuf = null;
+
         boolean invalidated = hnd.onFailure(ignite, failureCtx);
 
         if (invalidated) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 6d2e621..f7c07f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -36,6 +36,8 @@ import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
 import org.apache.ignite.compute.ComputeUserUndeclaredException;
 import org.apache.ignite.events.JobEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.igfs.IgfsOutOfSpaceException;
 import org.apache.ignite.internal.GridInternalException;
 import org.apache.ignite.internal.GridJobContextImpl;
@@ -603,9 +605,13 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                         X.hasCause(e, ClusterTopologyCheckedException.class))
                         // Should be throttled, because GridServiceProxy 
continuously retry getting service.
                         LT.error(log, e, "Failed to execute job [jobId=" + 
ses.getJobId() + ", ses=" + ses + ']');
-                    else
+                    else {
                         U.error(log, "Failed to execute job [jobId=" + 
ses.getJobId() + ", ses=" + ses + ']', e);
 
+                        if (X.hasCause(e, OutOfMemoryError.class))
+                            ctx.failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+                    }
+
                     ex = e;
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index ff68e72..63f5027 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -103,6 +104,7 @@ import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDeploymentException;
 import org.apache.ignite.services.ServiceDescriptor;
 import org.apache.ignite.thread.IgniteThreadFactory;
+import org.apache.ignite.thread.OomExceptionHandler;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -112,7 +114,6 @@ import static 
org.apache.ignite.IgniteSystemProperties.getString;
 import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
 import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -154,8 +155,12 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
     /** Busy lock. */
     private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
+    /** Uncaught exception handler for thread pools. */
+    private final UncaughtExceptionHandler oomeHnd = new 
OomExceptionHandler(ctx);
+
     /** Thread factory. */
-    private ThreadFactory threadFactory = new 
IgniteThreadFactory(ctx.igniteInstanceName(), "service");
+    private ThreadFactory threadFactory = new 
IgniteThreadFactory(ctx.igniteInstanceName(), "service",
+        oomeHnd);
 
     /** Thread local for service name. */
     private ThreadLocal<String> svcName = new ThreadLocal<>();
@@ -175,7 +180,8 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
     public GridServiceProcessor(GridKernalContext ctx) {
         super(ctx);
 
-        depExe = Executors.newSingleThreadExecutor(new 
IgniteThreadFactory(ctx.igniteInstanceName(), "srvc-deploy"));
+        depExe = Executors.newSingleThreadExecutor(new 
IgniteThreadFactory(ctx.igniteInstanceName(),
+            "srvc-deploy", oomeHnd));
 
         String servicesCompatibilityMode = 
getString(IGNITE_SERVICES_COMPATIBILITY_MODE);
 
@@ -373,7 +379,8 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
 
         busyLock = new GridSpinBusyLock();
 
-        depExe = Executors.newSingleThreadExecutor(new 
IgniteThreadFactory(ctx.igniteInstanceName(), "srvc-deploy"));
+        depExe = Executors.newSingleThreadExecutor(new 
IgniteThreadFactory(ctx.igniteInstanceName(),
+            "srvc-deploy", oomeHnd));
 
         start();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 3cd7484..418812f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.thread;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,10 +46,11 @@ public class IgniteStripedThreadPoolExecutor implements 
ExecutorService {
      * @param igniteInstanceName Node name.
      * @param threadNamePrefix Thread name prefix.
      */
-    public IgniteStripedThreadPoolExecutor(int concurrentLvl, String 
igniteInstanceName, String threadNamePrefix) {
+    public IgniteStripedThreadPoolExecutor(int concurrentLvl, String 
igniteInstanceName, String threadNamePrefix,
+        UncaughtExceptionHandler eHnd) {
         execs = new ExecutorService[concurrentLvl];
 
-        ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, 
threadNamePrefix);
+        ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, 
threadNamePrefix, eHnd);
 
         for (int i = 0; i < concurrentLvl; i++)
             execs[i] = Executors.newSingleThreadExecutor(factory);
@@ -173,4 +175,4 @@ public class IgniteStripedThreadPoolExecutor implements 
ExecutorService {
     @Override public String toString() {
         return S.toString(IgniteStripedThreadPoolExecutor.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 062c973..23bf14d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.thread;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
@@ -41,6 +41,9 @@ public class IgniteThreadFactory implements ThreadFactory {
     /** */
     private final byte plc;
 
+    /** Exception handler. */
+    private final UncaughtExceptionHandler eHnd;
+
     /**
      * Constructs new thread factory for given grid. All threads will belong
      * to the same default thread group.
@@ -49,7 +52,19 @@ public class IgniteThreadFactory implements ThreadFactory {
      * @param threadName Thread name.
      */
     public IgniteThreadFactory(String igniteInstanceName, String threadName) {
-        this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED);
+        this(igniteInstanceName, threadName, null);
+    }
+
+    /**
+     * Constructs new thread factory for given grid. All threads will belong
+     * to the same default thread group.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param threadName Thread name.
+     * @param eHnd Uncaught exception handler.
+     */
+    public IgniteThreadFactory(String igniteInstanceName, String threadName, 
UncaughtExceptionHandler eHnd) {
+        this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED, eHnd);
     }
 
     /**
@@ -59,16 +74,23 @@ public class IgniteThreadFactory implements ThreadFactory {
      * @param igniteInstanceName Ignite instance name.
      * @param threadName Thread name.
      * @param plc {@link GridIoPolicy} for thread pool.
+     * @param eHnd Uncaught exception handler.
      */
-    public IgniteThreadFactory(String igniteInstanceName, String threadName, 
byte plc) {
+    public IgniteThreadFactory(String igniteInstanceName, String threadName, 
byte plc, UncaughtExceptionHandler eHnd) {
         this.igniteInstanceName = igniteInstanceName;
         this.threadName = threadName;
         this.plc = plc;
+        this.eHnd = eHnd;
     }
 
     /** {@inheritDoc} */
     @Override public Thread newThread(@NotNull Runnable r) {
-        return new IgniteThread(igniteInstanceName, threadName, r, 
idxGen.incrementAndGet(), -1, plc);
+        Thread thread = new IgniteThread(igniteInstanceName, threadName, r, 
idxGen.incrementAndGet(), -1, plc);
+
+        if (eHnd != null)
+            thread.setUncaughtExceptionHandler(eHnd);
+
+        return thread;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 83c64c3..fed77ad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.thread;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -53,7 +54,8 @@ public class IgniteThreadPoolExecutor extends 
ThreadPoolExecutor {
             maxPoolSize,
             keepAliveTime,
             workQ,
-            GridIoPolicy.UNDEFINED);
+            GridIoPolicy.UNDEFINED,
+            null);
     }
 
     /**
@@ -68,6 +70,7 @@ public class IgniteThreadPoolExecutor extends 
ThreadPoolExecutor {
      * @param workQ The queue to use for holding tasks before they are 
executed. This queue will hold only
      *      runnable tasks submitted by the {@link #execute(Runnable)} method.
      * @param plc {@link GridIoPolicy} for thread pool.
+     * @param eHnd Uncaught exception handler for thread pool.
      */
     public IgniteThreadPoolExecutor(
         String threadNamePrefix,
@@ -76,14 +79,15 @@ public class IgniteThreadPoolExecutor extends 
ThreadPoolExecutor {
         int maxPoolSize,
         long keepAliveTime,
         BlockingQueue<Runnable> workQ,
-        byte plc) {
+        byte plc,
+        UncaughtExceptionHandler eHnd) {
         super(
             corePoolSize,
             maxPoolSize,
             keepAliveTime,
             TimeUnit.MILLISECONDS,
             workQ,
-            new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc)
+            new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc, 
eHnd)
         );
     }
 
@@ -114,4 +118,4 @@ public class IgniteThreadPoolExecutor extends 
ThreadPoolExecutor {
             new AbortPolicy()
         );
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java 
b/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java
new file mode 100644
index 0000000..3a62ad8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.thread;
+
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.X;
+
+/**
+ * OOM exception handler for system threads.
+ */
+public class OomExceptionHandler implements Thread.UncaughtExceptionHandler {
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /**
+     * @param ctx Context.
+     */
+    public OomExceptionHandler(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void uncaughtException(Thread t, Throwable e) {
+        if (X.hasCause(e, OutOfMemoryError.class))
+            ctx.failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
new file mode 100644
index 0000000..2af94b8
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.failure;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Out of memory error failure handler test.
+ */
+public class OomFailureHandlerTest extends AbstractFailureHandlerTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration()
+            .setName(DEFAULT_CACHE_NAME)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setBackups(0)
+        );
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test OOME in IgniteCompute.
+     */
+    public void testComputeOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        try {
+            IgniteFuture<Boolean> res = 
ignite0.compute(ignite0.cluster().forNodeId(ignite1.cluster().localNode().id()))
+                .callAsync(new IgniteCallable<Boolean>() {
+                    @Override public Boolean call() throws Exception {
+                        throw new OutOfMemoryError();
+                    }
+                });
+
+            res.get();
+        }
+        catch (Throwable ignore) {
+            // Expected.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in EntryProcessor.
+     */
+    public void testEntryProcessorOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache0 = 
ignite0.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> cache1 = 
ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        Integer key = primaryKey(cache1);
+
+        cache1.put(key, key);
+
+        try {
+            IgniteFuture fut = cache0.invokeAsync(key, new 
EntryProcessor<Integer, Integer, Object>() {
+                @Override public Object process(MutableEntry<Integer, Integer> 
entry,
+                    Object... arguments) throws EntryProcessorException {
+                    throw new OutOfMemoryError();
+                }
+            });
+
+            fut.get();
+        }
+        catch (Throwable ignore) {
+            // Expected.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in service method invocation.
+     */
+    public void testServiceInvokeOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache1 = 
ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        Integer key = primaryKey(cache1);
+
+        ignite0.services().deployKeyAffinitySingleton("fail-invoke-service", 
new FailServiceImpl(false),
+            DEFAULT_CACHE_NAME, key);
+
+        FailService svc = 
ignite0.services().serviceProxy("fail-invoke-service", FailService.class, 
false);
+
+        try {
+            svc.fail();
+        }
+        catch (Throwable ignore) {
+            // Expected.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in service execute.
+     */
+    public void testServiceExecuteOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache1 = 
ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        Integer key = primaryKey(cache1);
+
+        ignite0.services().deployKeyAffinitySingleton("fail-execute-service", 
new FailServiceImpl(true),
+            DEFAULT_CACHE_NAME, key);
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in event listener.
+     */
+    public void testEventListenerOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache0 = 
ignite0.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> cache1 = 
ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        ignite1.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                throw new OutOfMemoryError();
+            }
+        }, EventType.EVT_CACHE_OBJECT_PUT);
+
+        Integer key = primaryKey(cache1);
+
+        try {
+            cache0.put(key, key);
+        }
+        catch (Throwable ignore) {
+            // Expected.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * @param igniteWork Working ignite instance.
+     * @param igniteFail Failed ignite instance.
+     */
+    private static void assertFailureState(Ignite igniteWork, Ignite 
igniteFail) throws IgniteInterruptedCheckedException {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return dummyFailureHandler(igniteFail).failure();
+            }
+        }, 5000L));
+
+        assertFalse(dummyFailureHandler(igniteWork).failure());
+    }
+
+    /**
+     *
+     */
+    private interface FailService extends Service {
+        /**
+         * Fail.
+         */
+        void fail();
+    }
+
+    /**
+     *
+     */
+    private static class FailServiceImpl implements FailService {
+        /** Fail on execute. */
+        private final boolean failOnExec;
+
+        /**
+         * @param failOnExec Fail on execute.
+         */
+        private FailServiceImpl(boolean failOnExec) {
+            this.failOnExec = failOnExec;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void fail() {
+            throw new OutOfMemoryError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            if (failOnExec)
+                throw new OutOfMemoryError();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index c4b7d92..c388f1d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -22,6 +22,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.GridSuppressedExceptionSelfTest;
 import org.apache.ignite.failure.FailureHandlerTriggeredTest;
 import org.apache.ignite.failure.IoomFailureHandlerTest;
+import org.apache.ignite.failure.OomFailureHandlerTest;
 import org.apache.ignite.failure.StopNodeFailureHandlerTest;
 import org.apache.ignite.failure.StopNodeOrHaltFailureHandlerTest;
 import org.apache.ignite.internal.ClassSetTest;
@@ -199,6 +200,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(StopNodeFailureHandlerTest.class);
         suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class);
         suite.addTestSuite(IoomFailureHandlerTest.class);
+        suite.addTestSuite(OomFailureHandlerTest.class);
 
         return suite;
     }

Reply via email to