Author: shv
Date: Thu Jun 28 23:47:04 2012
New Revision: 1355197

URL: http://svn.apache.org/viewvc?rev=1355197&view=rev
Log:
MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files get deleted from task tracker. Contributed by Mayank Bansal.

Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1355197&r1=1355196&r2=1355197&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Thu Jun 28 23:47:04 2012 @@ -78,6 +78,11 @@ Release 0.22.1 - Unreleased MAPREDUCE-4318. TestRecoveryManager should not use raw configuration keys. (Benoy Antony via shv) + END OF HADOOP-8357 SUBTASKS + + MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files + get deleted from task tracker. 
(Mayank Bansal via shv) + Release 0.22.0 - 2011-11-29 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1355197&r1=1355196&r2=1355197&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Thu Jun 28 23:47:04 2012 @@ -163,7 +163,13 @@ public class TrackerDistributedCacheMana Path localPath = null; synchronized (cachedArchives) { lcacheStatus = cachedArchives.get(key); - if (lcacheStatus == null) { + if (lcacheStatus == null + || !checkPathExists(lcacheStatus.localizedLoadPath, conf)) { + if (lcacheStatus != null) { + LOG.warn("Key: " + key + " is not valid removing it from the cache"); + LOG.warn("Local Cache has been deleted... Downloading the cache again"); + cachedArchives.remove(key); + } // was never localized String uniqueString = String.valueOf(random.nextLong()); String cachePath = new Path (subDir, @@ -222,7 +228,25 @@ public class TrackerDistributedCacheMana } return localizedPath; } - + + /** + * This module checks whether file is present or not. + * + * @param cachePath + * @param conf + * @return + * @throws IOException + */ + boolean checkPathExists(Path cachePath, Configuration conf) + throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + boolean isPresent = true; + if (!localFs.exists(cachePath)) { + isPresent = false; + } + return isPresent; + } + /** * This is the opposite of getlocalcache. 
When you are done with * using the cache, you need to release the cache Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1355197&r1=1355196&r2=1355197&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Thu Jun 28 23:47:04 2012 @@ -218,6 +218,80 @@ public class TestTrackerDistributedCache manager.purgeCache(); assertFalse(pathToFile(cachedFirstFile).exists()); } + + /** + * This is when somebody deletes the cache + * + * @throws IOException + * @throws LoginException + * @throws InterruptedException + */ + @SuppressWarnings("deprecation") + public void testCacheConsistency() throws IOException, LoginException, InterruptedException { + if (!canRun()) { + return; + } + // ****** Imitate JobClient code + // Configures a task/job with both a regular file and a "classpath" file. 
+ Configuration subConf = new Configuration(conf); + String userName = getJobOwnerName(); + subConf.set(MRJobConfig.USER_NAME, userName); + JobID jobid = new JobID("jt", 1); + DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); + TrackerDistributedCacheManager.determineTimestamps(subConf); + TrackerDistributedCacheManager.determineCacheVisibilities(subConf); + // ****** End of imitating JobClient code + + Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); + FileOutputStream os = new FileOutputStream(new File(jobFile.toString())); + subConf.writeXml(os); + os.close(); + + // ****** Imitate TaskRunner code. + TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager( + conf); + TaskDistributedCacheManager handle = manager + .newTaskDistributedCacheManager(jobid, subConf); + assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); + handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(subConf); + // ****** End of imitating TaskRunner code + + Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); + assertNotNull(null, localCacheFiles); + assertEquals(1, localCacheFiles.length); + Path cachedFirstFile = localCacheFiles[0]; + assertFileLengthEquals(firstCacheFile, cachedFirstFile); + assertFalse("Paths should be different.", + firstCacheFile.equals(cachedFirstFile)); + + File f1 = new File(cachedFirstFile.toString()); + assertTrue(f1.delete()); + + // ****** Imitate JobClient code + // Configures a task/job with both a regular file and a "classpath" file. 
+ subConf = new Configuration(conf); + userName = getJobOwnerName(); + subConf.set(MRJobConfig.USER_NAME, userName); + DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); + TrackerDistributedCacheManager.determineTimestamps(subConf); + TrackerDistributedCacheManager.determineCacheVisibilities(subConf); + JobID jobidnew = new JobID("jt", 2); + handle = manager.newTaskDistributedCacheManager(jobidnew, subConf); + assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); + handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), + TaskTracker.getPrivateDistributedCacheDir(userName)); + JobLocalizer.downloadPrivateCache(subConf); + // ****** End of imitating TaskRunner code + Path[] localCacheFilesagain = DistributedCache.getLocalCacheFiles(subConf); + assertNotNull(null, localCacheFilesagain); + assertEquals(1, localCacheFilesagain.length); + Path cachedFirstFileAgain = localCacheFilesagain[0]; + assertFileLengthEquals(firstCacheFile, cachedFirstFileAgain); + assertFalse("Paths should be different.", + firstCacheFile.equals(cachedFirstFileAgain)); + } /** * This DistributedCacheManager fails in localizing firstCacheFile.