Author: vinodkv
Date: Tue Mar 11 23:33:56 2014
New Revision: 1576545
URL: http://svn.apache.org/r1576545
Log:
YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException in
the public-localizer thread-pool. Contributed by Varun Vasudev.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1576545&r1=1576544&r2=1576545&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Mar 11 23:33:56 2014
@@ -446,6 +446,9 @@ Release 2.4.0 - UNRELEASED
YARN-1821. NPE on registerNodeManager if the request has containers for
UnmanagedAMs. (kasha)
+ YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException
+ in the public-localizer thread-pool. (Varun Vasudev via vinodkv)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1576545&r1=1576544&r2=1576545&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Tue Mar 11 23:33:56 2014
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorComp
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -683,9 +684,16 @@ public class ResourceLocalizationService
}
} catch (IOException e) {
rsrc.unlock();
- // TODO Need to Fix IO Exceptions - Notifying resource
+ publicRsrc.handle(new ResourceFailedLocalizationEvent(request
+ .getResource().getRequest(), e.getMessage()));
LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e);
+ } catch (RejectedExecutionException re) {
+ rsrc.unlock();
+ publicRsrc.handle(new ResourceFailedLocalizationEvent(request
+ .getResource().getRequest(), re.getMessage()));
+ LOG.error("Failed to submit rsrc " + rsrc + " for download."
+ + " Either queue is full or threadpool is shutdown.", re);
}
} else {
rsrc.unlock();
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1576545&r1=1576544&r2=1576545&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Tue Mar 11 23:33:56 2014
@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.server.nod
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
@@ -126,6 +128,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -681,6 +684,121 @@ public class TestResourceLocalizationSer
dispatcher.stop();
}
}
+
+ /*
+ * Test case for handling RejectedExecutionException and IOException which can
+ * be thrown when adding public resources to the pending queue.
+ * RejectedExecutionException can be thrown either due to the incoming queue
+ * being full or if the ExecutorCompletionService threadpool is shutdown.
+ * Since it's hard to simulate the queue being full, this test just shuts down
+ * the threadpool and makes sure the exception is handled. If anything is
+ * messed up the async dispatcher thread will cause a system exit causing the
+ * test to fail.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPublicResourceAddResourceExceptions() throws Exception {
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = mock(DeletionService.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
+ dirsHandlerSpy.init(conf);
+
+ dispatcher.init(conf);
+ dispatcher.start();
+
+ try {
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService,
+ dirsHandlerSpy);
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(mockServer).when(spyService).createServer();
+ doReturn(lfs).when(spyService).getLocalFileContext(
+ isA(Configuration.class));
+
+ spyService.init(conf);
+ spyService.start();
+
+ final String user = "user0";
+ // init application
+ final Application app = mock(Application.class);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
+ when(app.getUser()).thenReturn(user);
+ when(app.getAppId()).thenReturn(appId);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+ dispatcher.await();
+
+ // init resources
+ Random r = new Random();
+ r.setSeed(r.nextLong());
+
+ // Queue localization request for the public resource
+ final LocalResource pubResource = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+ req
+ .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
+
+ // init container.
+ final Container c = getMockContainer(appId, 42);
+
+ // first test ioexception
+ Mockito
+ .doThrow(new IOException())
+ .when(dirsHandlerSpy)
+ .getLocalPathForWrite(isA(String.class), Mockito.anyLong(),
+ Mockito.anyBoolean());
+ // send request
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+ dispatcher.await();
+ LocalResourcesTracker tracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+ user, appId);
+ Assert.assertNull(tracker.getLocalizedResource(pubReq));
+
+ // test RejectedExecutionException
+ Mockito
+ .doCallRealMethod()
+ .when(dirsHandlerSpy)
+ .getLocalPathForWrite(isA(String.class), Mockito.anyLong(),
+ Mockito.anyBoolean());
+
+ // shutdown the thread pool
+ PublicLocalizer publicLocalizer = spyService.getPublicLocalizer();
+ publicLocalizer.threadPool.shutdown();
+
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+ dispatcher.await();
+ tracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+ user, appId);
+ Assert.assertNull(tracker.getLocalizedResource(pubReq));
+
+ } finally {
+ // if we call stop with events in the queue, an InterruptedException gets
+ // thrown resulting in the dispatcher thread causing a system exit
+ dispatcher.await();
+ dispatcher.stop();
+ }
+ }
@Test(timeout = 100000)
@SuppressWarnings("unchecked")
@@ -829,6 +947,8 @@ public class TestResourceLocalizationSer
}
}
}
+
+
@Test(timeout = 10000)
@SuppressWarnings("unchecked")