Author: cdouglas
Date: Fri Mar 14 17:41:04 2014
New Revision: 1577620

URL: http://svn.apache.org/r1577620
Log:
YARN-1771. Reduce the number of NameNode operations during localization of public resources using a cache. Contributed by Sangjin Lee
Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java Modified: hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java?rev=1577620&r1=1577619&r2=1577620&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java Fri Mar 14 17:41:04 2014 @@ -18,19 +18,26 @@ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileWriter; import java.io.IOException; import java.net.URI; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import 
org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -38,7 +45,6 @@ import org.apache.hadoop.mapreduce.filec import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -89,8 +95,26 @@ public class TestLocalDistributedCacheMa public void cleanup() throws Exception { delete(localDir); } - - @SuppressWarnings("rawtypes") + + /** + * Mock input stream based on a byte array so that it can be used by a + * FSDataInputStream. + */ + private static class MockInputStream extends ByteArrayInputStream + implements Seekable, PositionedReadable { + public MockInputStream(byte[] buf) { + super(buf); + } + + // empty implementation for unused methods + public int read(long position, byte[] buffer, int offset, int length) { return -1; } + public void readFully(long position, byte[] buffer, int offset, int length) {} + public void readFully(long position, byte[] buffer) {} + public void seek(long position) {} + public long getPos() { return 0; } + public boolean seekToNewSource(long targetPos) { return false; } + } + @Test public void testDownload() throws Exception { JobConf conf = new JobConf(); @@ -123,28 +147,22 @@ public class TestLocalDistributedCacheMa } } }); - - doAnswer(new Answer() { + + when(mockfs.getConf()).thenReturn(conf); + final FSDataInputStream in = + new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes())); + when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() { @Override - public Object answer(InvocationOnMock args) throws Throwable { - //Ignored boolean overwrite = (Boolean) args.getArguments()[0]; - Path src = (Path)args.getArguments()[1]; - Path dst = (Path)args.getArguments()[2]; - if("file.txt".equals(src.getName())) { - File f = new 
File(dst.toUri().getPath()); - FileWriter writer = new FileWriter(f); - try { - writer.append("This is a test file\n"); - } finally { - if(writer != null) writer.close(); - } + public FSDataInputStream answer(InvocationOnMock args) throws Throwable { + Path src = (Path)args.getArguments()[0]; + if ("file.txt".equals(src.getName())) { + return in; } else { throw new FileNotFoundException(src+" not supported by mocking"); } - return null; } - }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class)); - + }); + DistributedCache.addCacheFile(file, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201"); @@ -159,8 +177,7 @@ public class TestLocalDistributedCacheMa } assertFalse(link.exists()); } - - @SuppressWarnings("rawtypes") + @Test public void testEmptyDownload() throws Exception { JobConf conf = new JobConf(); @@ -184,16 +201,16 @@ public class TestLocalDistributedCacheMa throw new FileNotFoundException(p+" not supported by mocking"); } }); - - doAnswer(new Answer() { + + when(mockfs.getConf()).thenReturn(conf); + when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() { @Override - public Object answer(InvocationOnMock args) throws Throwable { - //Ignored boolean overwrite = (Boolean) args.getArguments()[0]; - Path src = (Path)args.getArguments()[1]; + public FSDataInputStream answer(InvocationOnMock args) throws Throwable { + Path src = (Path)args.getArguments()[0]; throw new FileNotFoundException(src+" not supported by mocking"); } - }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class)); - + }); + conf.set(MRJobConfig.CACHE_FILES, ""); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); @@ -203,9 +220,8 @@ public class TestLocalDistributedCacheMa manager.close(); } } - - - @SuppressWarnings("rawtypes") + + @Test public void testDuplicateDownload() throws 
Exception { JobConf conf = new JobConf(); @@ -238,28 +254,22 @@ public class TestLocalDistributedCacheMa } } }); - - doAnswer(new Answer() { + + when(mockfs.getConf()).thenReturn(conf); + final FSDataInputStream in = + new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes())); + when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() { @Override - public Object answer(InvocationOnMock args) throws Throwable { - //Ignored boolean overwrite = (Boolean) args.getArguments()[0]; - Path src = (Path)args.getArguments()[1]; - Path dst = (Path)args.getArguments()[2]; - if("file.txt".equals(src.getName())) { - File f = new File(dst.toUri().getPath()); - FileWriter writer = new FileWriter(f); - try { - writer.append("This is a test file\n"); - } finally { - if(writer != null) writer.close(); - } + public FSDataInputStream answer(InvocationOnMock args) throws Throwable { + Path src = (Path)args.getArguments()[0]; + if ("file.txt".equals(src.getName())) { + return in; } else { throw new FileNotFoundException(src+" not supported by mocking"); } - return null; } - }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class)); - + }); + DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");