Author: cdouglas Date: Mon Jun 6 13:44:06 2011 New Revision: 1132638 URL: http://svn.apache.org/viewvc?rev=1132638&view=rev Log: Add deletion of distributed cache resources.
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original) +++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon Jun 6 13:44:06 2011 @@ -4,6 +4,9 @@ Trunk (unreleased changes) MAPREDUCE-279 + + Add deletion of distributed cache resources. (cdouglas) + Fix class cast exception in Task abort for old mapreduce apis. (sharad) Fix various issues with Web UI's. 
(Luke Lu) Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Mon Jun 6 13:44:06 2011 @@ -92,6 +92,7 @@ public class ConverterUtils { } // TODO: Why thread local? + // ^ NumberFormat instances are not threadsafe private static final ThreadLocal<NumberFormat> appIdFormat = new ThreadLocal<NumberFormat>() { @Override @@ -104,6 +105,7 @@ public class ConverterUtils { }; // TODO: Why thread local? + // ^ NumberFormat instances are not threadsafe private static final ThreadLocal<NumberFormat> containerIdFormat = new ThreadLocal<NumberFormat>() { @Override @@ -159,4 +161,4 @@ public class ConverterUtils { containerId.setId(Integer.parseInt(it.next())); return containerId; } -} \ No newline at end of file +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original) +++ 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Mon Jun 6 13:44:06 2011 @@ -89,4 +89,15 @@ public class NMConfig { NM_PREFIX + "container.manager.threads"; public static final int DEFAULT_NM_CONTAINER_MGR_THREADS = 5; + + public static final String NM_TARGET_CACHE_MB = + NM_PREFIX + "target.cache.size"; + + public static final long DEFAULT_NM_TARGET_CACHE_MB = 10 * 1024; + + public static final String NM_CACHE_CLEANUP_MS = + NM_PREFIX + "target.cache.cleanup.period.ms"; + + public static final long DEFAULT_NM_CACHE_CLEANUP_MS = 10 * 60 * 1000; + } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Jun 6 13:44:06 2011 @@ -140,6 +140,9 @@ public class ApplicationImpl implements // Transitions from APPLICATION_RESOURCES_CLEANINGUP state .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, + ApplicationEventType.APPLICATION_CONTAINER_FINISHED) + .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, ApplicationState.FINISHED, 
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP, new AppCompletelyDoneTransition()) Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Mon Jun 6 13:44:06 2011 @@ -194,6 +194,10 @@ public class ContainerImpl implements Co ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new ContainerKilledTransition()) + .addTransition(ContainerState.KILLING, + ContainerState.KILLING, + ContainerEventType.RESOURCE_LOCALIZED, + new LocalizedResourceDuringKillTransition()) .addTransition(ContainerState.KILLING, ContainerState.KILLING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -205,6 +209,10 @@ public class ContainerImpl implements Co .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new ExitedWithFailureTransition()) + .addTransition(ContainerState.KILLING, + ContainerState.DONE, + ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, + CONTAINER_DONE_TRANSITION) // From CONTAINER_CLEANEDUP_AFTER_KILL State. 
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, @@ -568,6 +576,24 @@ public class ContainerImpl implements Co } @SuppressWarnings("unchecked") // dispatcher not typed + static class LocalizedResourceDuringKillTransition implements + SingleArcTransition<ContainerImpl, ContainerEvent> { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; + String sym = container.pendingResources.remove(rsrcEvent.getResource()); + if (null == sym) { + LOG.warn("Localized unknown resource " + rsrcEvent.getResource() + + " for container " + container.getContainerID()); + assert false; + // fail container? + return; + } + container.localizedResources.put(rsrcEvent.getLocation(), sym); + } + } + + @SuppressWarnings("unchecked") // dispatcher not typed static class KillTransition implements SingleArcTransition<ContainerImpl, ContainerEvent> { @Override Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Mon Jun 6 13:44:06 2011 @@ -41,9 +41,15 @@ public class LocalResourceRequest */ public 
LocalResourceRequest(LocalResource resource) throws URISyntaxException { - this.loc = ConverterUtils.getPathFromYarnURL(resource.getResource()); - this.timestamp = resource.getTimestamp(); - this.type = resource.getType(); + this(ConverterUtils.getPathFromYarnURL(resource.getResource()), + resource.getTimestamp(), + resource.getType()); + } + + LocalResourceRequest(Path loc, long timestamp, LocalResourceType type) { + this.loc = loc; + this.timestamp = timestamp; + this.type = type; } @Override Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Mon Jun 6 13:44:06 2011 @@ -20,15 +20,21 @@ package org.apache.hadoop.yarn.server.no import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; /** * Component tracking resources all of the same {@link LocalResourceVisibility} * */ -interface LocalResourcesTracker extends EventHandler<ResourceEvent> { +interface LocalResourcesTracker + extends 
EventHandler<ResourceEvent>, Iterable<LocalizedResource> { // TODO: Not used at all!! boolean contains(LocalResourceRequest resource); + boolean remove(LocalizedResource req, DeletionService delService); + + String getUser(); + } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Jun 6 13:44:06 2011 @@ -17,12 +17,15 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; /** @@ -34,12 +37,20 @@ class LocalResourcesTrackerImpl implemen static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class); + private final String user; private final Dispatcher dispatcher; - private final 
ConcurrentHashMap<LocalResourceRequest,LocalizedResource> - localrsrc = new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>(); + private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc; - public LocalResourcesTrackerImpl(Dispatcher dispatcher) { + public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) { + this(user, dispatcher, + new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>()); + } + + LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) { + this.user = user; this.dispatcher = dispatcher; + this.localrsrc = localrsrc; } @Override @@ -66,7 +77,40 @@ class LocalResourcesTrackerImpl implemen @Override public boolean contains(LocalResourceRequest resource) { - return localrsrc.contains(resource); + return localrsrc.containsKey(resource); + } + + @Override + public boolean remove(LocalizedResource rem, DeletionService delService) { + // current synchronization guaranteed by crude RLS event for cleanup + LocalizedResource rsrc = localrsrc.remove(rem.getRequest()); + if (null == rsrc) { + LOG.error("Attempt to remove absent resource: " + rem.getRequest() + + " from " + getUser()); + return true; + } + if (rsrc.getRefCount() > 0 + || ResourceState.DOWNLOADING.equals(rsrc.getState()) + || rsrc != rem) { + // internal error + LOG.error("Attempt to remove resource with non-zero refcount"); + assert false; + return false; + } + if (ResourceState.LOCALIZED.equals(rsrc.getState())) { + delService.delete(getUser(), rsrc.getLocalPath()); + } + return true; + } + + @Override + public String getUser() { + return user; + } + + @Override + public Iterator<LocalizedResource> iterator() { + return localrsrc.values().iterator(); } } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Mon Jun 6 13:44:06 2011 @@ -94,7 +94,7 @@ public class LocalizedResource implement .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.REQUEST, new LocalizedResourceTransition()) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, - ResourceEventType.LOCALIZED, new LocalizedResourceTransition()) + ResourceEventType.LOCALIZED) .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, ResourceEventType.RELEASE, new ReleaseTransition()) .installTopology(); @@ -115,12 +115,12 @@ public class LocalizedResource implement StringBuilder sb = new StringBuilder(); sb.append("{ ").append(rsrc.toString()).append(",") .append(getState() == ResourceState.LOCALIZED - ? localPath + ? 
getLocalPath() + "," + getSize() : "pending").append(",["); for (ContainerId c : ref) { sb.append("(").append(c.toString()).append(")"); } - sb.append("],").append(timestamp.get()).append("}"); + sb.append("],").append(getTimestamp()).append("}"); return sb.toString(); } @@ -150,6 +150,22 @@ public class LocalizedResource implement return rsrc; } + public Path getLocalPath() { + return localPath; + } + + public long getTimestamp() { + return timestamp.get(); + } + + public long getSize() { + return size; + } + + public int getRefCount() { + return ref.size(); + } + public boolean tryAcquire() { return sem.tryAcquire(); } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Mon Jun 6 13:44:06 2011 @@ -34,24 +34,32 @@ import java.util.concurrent.ExecutorComp import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.FileUtil; import 
org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_MAX_PUBLIC_FETCH_THREADS; +import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_CACHE_CLEANUP_MS; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR; +import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_TARGET_CACHE_MB; +import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CACHE_CLEANUP_MS; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_MAX_PUBLIC_FETCH_THREADS; +import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_TARGET_CACHE_MB; import java.io.IOException; import java.net.InetSocketAddress; @@ -99,6 +107,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; @@ -118,6 +127,8 @@ public class ResourceLocalizationService private Server server; private InetSocketAddress localizationServerAddress; + private long cacheTargetSize; + private long cacheCleanupPeriod; private List<Path> logDirs; private List<Path> localDirs; private List<Path> sysDirs; @@ -127,6 +138,7 @@ public class ResourceLocalizationService private LocalizerTracker localizerTracker; private RecordFactory recordFactory; private final LocalDirAllocator localDirsSelector; + private final ScheduledExecutorService cacheCleanup; private final LocalResourcesTracker publicRsrc; private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc = @@ -141,7 +153,8 @@ public class ResourceLocalizationService this.dispatcher = dispatcher; this.delService = delService; this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR); - this.publicRsrc = new LocalResourcesTrackerImpl(dispatcher); + this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher); + this.cacheCleanup = new ScheduledThreadPoolExecutor(1); } FileContext getLocalFileContext(Configuration conf) { @@ -190,10 +203,16 @@ public class ResourceLocalizationService localDirs = Collections.unmodifiableList(localDirs); logDirs = Collections.unmodifiableList(logDirs); sysDirs = Collections.unmodifiableList(sysDirs); + cacheTargetSize = + conf.getLong(NM_TARGET_CACHE_MB, DEFAULT_NM_TARGET_CACHE_MB) << 20; + cacheCleanupPeriod = + conf.getLong(NM_CACHE_CLEANUP_MS, DEFAULT_NM_CACHE_CLEANUP_MS); localizationServerAddress = NetUtils.createSocketAddr( conf.get(NM_LOCALIZER_BIND_ADDRESS, 
DEFAULT_NM_LOCALIZER_BIND_ADDRESS)); localizerTracker = new LocalizerTracker(conf); dispatcher.register(LocalizerEventType.class, localizerTracker); + cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), + cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS); super.init(conf); } @@ -250,12 +269,16 @@ public class ResourceLocalizationService Application app = ((ApplicationLocalizationEvent)event).getApplication(); // 0) Create application tracking structs - privateRsrc.putIfAbsent(app.getUser(), - new LocalResourcesTrackerImpl(dispatcher)); + userName = app.getUser(); + privateRsrc.putIfAbsent(userName, + new LocalResourcesTrackerImpl(userName, dispatcher)); if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()), - new LocalResourcesTrackerImpl(dispatcher))) { + new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) { LOG.warn("Initializing application " + app + " already present"); assert false; // TODO: FIXME assert doesn't help + // ^ The condition is benign. 
Tests should fail and it + // should appear in logs, but it's an internal error + // that should have no effect on applications } // 1) Signal container init dispatcher.getEventHandler().handle(new ApplicationInitedEvent( @@ -288,6 +311,16 @@ public class ResourceLocalizationService tracker.handle(new ResourceRequestEvent(req, vis, ctxt)); } break; + case CACHE_CLEANUP: + ResourceRetentionSet retain = + new ResourceRetentionSet(delService, cacheTargetSize); + retain.addResources(publicRsrc); + LOG.debug("Resource cleanup (public) " + retain); + for (LocalResourcesTracker t : privateRsrc.values()) { + retain.addResources(t); + LOG.debug("Resource cleanup " + t.getUser() + ":" + retain); + } + break; case CLEANUP_CONTAINER_RESOURCES: Container container = ((ContainerLocalizationEvent)event).getContainer(); @@ -536,8 +569,9 @@ public class ResourceLocalizationService final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled; final List<LocalizerResourceRequestEvent> pending; + // TODO: threadsafe, use outer? private final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); + RecordFactoryProvider.getRecordFactory(getConfig()); LocalizerRunner(LocalizerContext context, String localizerId) { this.context = context; @@ -702,17 +736,37 @@ public class ResourceLocalizationService context.getUser(), ConverterUtils.toString(context.getContainerId().getAppId()), localizerId, localDirs); + // TODO handle ExitCodeException separately? 
} catch (Exception e) { + LOG.info("Localizer failed", e); // 3) on error, report failure to Container and signal ABORT // 3.1) notify resource of failed localization + ContainerId cId = context.getContainerId(); + dispatcher.getEventHandler().handle( + new ContainerResourceFailedEvent(cId, null, e)); + } finally { for (LocalizerResourceRequestEvent event : scheduled.values()) { event.getResource().unlock(); } - //dispatcher.getEventHandler().handle( - // new ContainerResourceFailedEvent(current.getContainer(), - // current.getResource().getRequest(), e)); } } } + + static class CacheCleanup extends Thread { + + private final Dispatcher dispatcher; + + public CacheCleanup(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + @Override + public void run() { + dispatcher.getEventHandler().handle( + new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP)); + } + + } + } Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java?rev=1132638&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java Mon Jun 6 13:44:06 2011 @@ -0,0 +1,78 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import 
java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; + +public class ResourceRetentionSet { + + private long delSize; + private long currentSize; + private final long targetSize; + private final DeletionService delService; + private final SortedMap<LocalizedResource,LocalResourcesTracker> retain; + + ResourceRetentionSet(DeletionService delService, long targetSize) { + this(delService, targetSize, new LRUComparator()); + } + + ResourceRetentionSet(DeletionService delService, long targetSize, + Comparator<? super LocalizedResource> cmp) { + this(delService, targetSize, + new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp)); + } + + ResourceRetentionSet(DeletionService delService, long targetSize, + SortedMap<LocalizedResource,LocalResourcesTracker> retain) { + this.retain = retain; + this.delService = delService; + this.targetSize = targetSize; + } + + public void addResources(LocalResourcesTracker newTracker) { + for (LocalizedResource resource : newTracker) { + currentSize += resource.getSize(); + if (resource.getRefCount() > 0) { + // always retain resources in use + continue; + } + retain.put(resource, newTracker); + } + for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i = + retain.entrySet().iterator(); + currentSize - delSize > targetSize && i.hasNext();) { + Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next(); + LocalizedResource resource = rsrc.getKey(); + LocalResourcesTracker tracker = rsrc.getValue(); + if (tracker.remove(resource, delService)) { + delSize += resource.getSize(); + i.remove(); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Cache: ").append(currentSize).append(", "); + sb.append("Deleted: ").append(delSize); + return sb.toString(); + } + + static class LRUComparator implements Comparator<LocalizedResource> { + public int compare(LocalizedResource r1, LocalizedResource r2) { + 
long ret = r1.getTimestamp() - r2.getTimestamp(); + if (0 == ret) { + return System.identityHashCode(r1) - System.identityHashCode(r2); + } + return ret > 0 ? 1 : -1; + } + public boolean equals(Object other) { + return this == other; + } + } +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Mon Jun 6 13:44:06 2011 @@ -19,9 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.Event; -public abstract class LocalizationEvent extends AbstractEvent<LocalizationEventType> { +public class LocalizationEvent extends AbstractEvent<LocalizationEventType> { public LocalizationEvent(LocalizationEventType event) { super(event); Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java Mon Jun 6 13:44:06 2011 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no public enum LocalizationEventType { INIT_APPLICATION_RESOURCES, INIT_CONTAINER_RESOURCES, + CACHE_CLEANUP, CLEANUP_CONTAINER_RESOURCES, DESTROY_APPLICATION_RESOURCES, } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Mon Jun 6 13:44:06 2011 @@ -37,7 +37,6 @@ import java.io.IOException; import 
java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; @@ -281,7 +280,8 @@ public class TestContainerLocalizer { return rsrc; } - static DataInputBuffer createFakeCredentials(Random r, int nTok) + @SuppressWarnings({ "rawtypes", "unchecked" }) +static DataInputBuffer createFakeCredentials(Random r, int nTok) throws IOException { Credentials creds = new Credentials(); byte[] password = new byte[20]; Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1132638&r1=1132637&r2=1132638&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Mon Jun 6 13:44:06 2011 @@ -1,19 +1,57 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import java.net.InetSocketAddress; + import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; import java.util.List; +import java.util.Random; +import org.apache.avro.ipc.Server; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import 
org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; +import static org.junit.Assert.*; +import org.mockito.ArgumentMatcher; import static org.mockito.Mockito.*; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR; @@ -75,4 +113,178 @@ public class TestResourceLocalizationSer } } + @Test + @SuppressWarnings("unchecked") // mocked generics + public void testLocalizationHeartbeat() throws Exception { + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + doNothing().when(spylfs).mkdir( + isA(Path.class), isA(FsPermission.class), anyBoolean()); + + List<Path> localDirs = new ArrayList<Path>(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(NM_LOCAL_DIR, sDirs); + + Server ignore = mock(Server.class); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler<ContainerEvent> containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = new DeletionService(exec); + 
delService.init(null); + delService.start(); + + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService); + ResourceLocalizationService spyService = spy(rawService); + doReturn(ignore).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); + try { + spyService.init(conf); + spyService.start(); + + // init application + final Application app = mock(Application.class); + final ApplicationId appId = mock(ApplicationId.class); + when(appId.getClusterTimestamp()).thenReturn(314159265358979L); + when(appId.getId()).thenReturn(3); + when(app.getUser()).thenReturn("user0"); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + ArgumentMatcher<ApplicationEvent> matchesAppInit = + new ArgumentMatcher<ApplicationEvent>() { + @Override + public boolean matches(Object o) { + ApplicationEvent evt = (ApplicationEvent) o; + return evt.getType() == ApplicationEventType.APPLICATION_INITED + && appId == evt.getApplicationID(); + } + }; + dispatcher.await(); + verify(applicationBus).handle(argThat(matchesAppInit)); + + // init container rsrc, localizer + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + final Container c = getMockContainer(appId, 42); + FSDataOutputStream out = + new FSDataOutputStream(new DataOutputBuffer(), null); + doReturn(out).when(spylfs).createInternal(isA(Path.class), + isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(), + anyLong(), isA(Progressable.class), anyInt(), anyBoolean()); + final LocalResource resource = getMockResource(r); + final LocalResourceRequest req = new LocalResourceRequest(resource); + spyService.handle(new ContainerLocalizationRequestEvent( + c, Collections.singletonList(req), + LocalResourceVisibility.PRIVATE)); + // Sigh. 
Thread init of private localizer not accessible + Thread.sleep(500); + dispatcher.await(); + String appStr = ConverterUtils.toString(appId); + String ctnrStr = ConverterUtils.toString(c.getContainerID()); + verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class), + eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class)); + + // heartbeat from localizer + LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class); + LocalizerStatus stat = mock(LocalizerStatus.class); + when(stat.getLocalizerId()).thenReturn(ctnrStr); + when(rsrcStat.getResource()).thenReturn(resource); + when(rsrcStat.getLocalSize()).thenReturn(4344L); + URL locPath = getPath("/cache/private/blah"); + when(rsrcStat.getLocalPath()).thenReturn(locPath); + when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS); + when(stat.getResources()) + .thenReturn(Collections.<LocalResourceStatus>emptyList()) + .thenReturn(Collections.singletonList(rsrcStat)) + .thenReturn(Collections.<LocalResourceStatus>emptyList()); + + // get rsrc + LocalizerHeartbeatResponse response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + assertEquals(req, new LocalResourceRequest(response.getLocalResource(0))); + + // empty rsrc + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + assertEquals(0, response.getAllResources().size()); + + // get shutdown + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); + + // verify container notification + ArgumentMatcher<ContainerEvent> matchesContainerLoc = + new ArgumentMatcher<ContainerEvent>() { + @Override + public boolean matches(Object o) { + ContainerEvent evt = (ContainerEvent) o; + return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED + && c.getContainerID() == evt.getContainerID(); + } + }; + dispatcher.await(); + verify(containerBus).handle(argThat(matchesContainerLoc)); + } 
finally { + delService.stop(); + dispatcher.stop(); + spyService.stop(); + } + } + + static URL getPath(String path) { + URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class); + when(uri.getScheme()).thenReturn("file"); + when(uri.getHost()).thenReturn(null); + when(uri.getFile()).thenReturn(path); + return uri; + } + + static LocalResource getMockResource(Random r) { + LocalResource rsrc = mock(LocalResource.class); + + String name = Long.toHexString(r.nextLong()); + URL uri = getPath("/local/PRIVATE/" + name); + + when(rsrc.getResource()).thenReturn(uri); + when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L); + when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L); + when(rsrc.getType()).thenReturn(LocalResourceType.FILE); + when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE); + return rsrc; + } + + static Container getMockContainer(ApplicationId appId, int id) { + Container c = mock(Container.class); + ContainerId cId = mock(ContainerId.class); + when(cId.getAppId()).thenReturn(appId); + when(cId.getId()).thenReturn(id); + when(c.getUser()).thenReturn("user0"); + when(c.getContainerID()).thenReturn(cId); + Credentials creds = new Credentials(); + creds.addToken(new Text("tok" + id), getToken(id)); + when(c.getCredentials()).thenReturn(creds); + return c; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + static Token<? 
extends TokenIdentifier> getToken(int id) { + return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(), + new Text("kind" + id), new Text("service" + id)); + } + } Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1132638&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Mon Jun 6 13:44:06 2011 @@ -0,0 +1,83 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; + +import org.junit.Test; +import static org.junit.Assert.*; + +import org.mockito.ArgumentCaptor; +import static org.mockito.Mockito.*; +
/**
 * Checks that {@link ResourceRetentionSet} evicts enough unreferenced
 * resources to reach its target size without evicting far more than needed.
 */
public class TestResourceRetention {

  /**
   * Feeds four trackers holding 20MB of unreferenced resources into a set
   * with a 10MB target and verifies that between 10MB (inclusive) and 15MB
   * (exclusive) was handed to the trackers for removal.
   */
  @Test
  public void testRsrcUnused() {
    final long mb = 1024L * 1024L;
    final DeletionService deleter = mock(DeletionService.class);
    final long targetSize = 10 << 20;
    final ResourceRetentionSet retention =
        new ResourceRetentionSet(deleter, targetSize);

    // public tracker: two 3MB files, timestamps {10, 15}
    final LocalResourcesTracker pubTracker =
        createMockTracker(null, 3 * mb, 2, 10, 5);
    // user A: four 1MB files, timestamps {3, 6, 9, 12}
    final LocalResourcesTracker trackerA =
        createMockTracker("A", 1 * mb, 4, 3, 3);
    // user B: one 4MB file, timestamp {10}
    final LocalResourcesTracker trackerB =
        createMockTracker("B", 4 * mb, 1, 10, 5);
    // user C: three 2MB files, timestamps {7, 9, 11}
    final LocalResourcesTracker trackerC =
        createMockTracker("C", 2 * mb, 3, 7, 2);

    // Total cache: 20MB; at least 10MB has to go.
    retention.addResources(pubTracker);
    retention.addResources(trackerA);
    retention.addResources(trackerB);
    retention.addResources(trackerC);

    final ArgumentCaptor<LocalizedResource> removed =
        ArgumentCaptor.forClass(LocalizedResource.class);
    captureRemovals(pubTracker, 2, removed);
    captureRemovals(trackerA, 4, removed);
    captureRemovals(trackerB, 1, removed);
    captureRemovals(trackerC, 3, removed);

    long deleted = 0L;
    for (LocalizedResource gone : removed.getAllValues()) {
      deleted += gone.getSize();
    }
    assertTrue(deleted >= 10 * mb);
    assertTrue(deleted < 15 * mb);
  }

  /**
   * Verifies that {@code tracker} was asked to remove at most {@code limit}
   * resources and records the removed resources in {@code captor}.
   */
  private static void captureRemovals(LocalResourcesTracker tracker,
      int limit, ArgumentCaptor<LocalizedResource> captor) {
    verify(tracker, atMost(limit))
        .remove(captor.capture(), isA(DeletionService.class));
  }

  /**
   * Creates a spied tracker pre-populated with unreferenced, LOCALIZED
   * resources of identical size.
   *
   * @param user      user passed to the tracker and embedded in the paths
   * @param rsrcSize  size reported by every resource
   * @param nRsrcs    number of resources to create
   * @param timestamp timestamp of the first resource
   * @param tsstep    timestamp increment between consecutive resources
   * @return the spied tracker
   */
  LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
      long nRsrcs, long timestamp, long tsstep) {
    final ConcurrentMap<LocalResourceRequest,LocalizedResource> resources =
        new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
    final LocalResourcesTracker tracker =
        spy(new LocalResourcesTrackerImpl(user, null, resources));
    for (int i = 0; i < nRsrcs; ++i) {
      final long ts = timestamp + i * tsstep;
      final LocalResourceRequest request = new LocalResourceRequest(
          new Path("file:///" + user + "/rsrc" + i), ts,
          LocalResourceType.FILE);
      final Path localPath = new Path("file:///local/" + user + "/rsrc" + i);
      final LocalizedResource localized = new LocalizedResource(request, null) {
        @Override public int getRefCount() { return 0; }
        @Override public long getSize() { return rsrcSize; }
        @Override public Path getLocalPath() { return localPath; }
        @Override public long getTimestamp() { return ts; }
        @Override
        public ResourceState getState() { return ResourceState.LOCALIZED; }
      };
      resources.put(request, localized);
    }
    return tracker;
  }

}