Author: aching Date: Tue Mar 27 20:49:39 2012 New Revision: 1306014 URL: http://svn.apache.org/viewvc?rev=1306014&view=rev Log: GIRAPH-144: GiraphJob should not extend Job (users should not be able to call Job methods like waitForCompletion or setMapper, etc.) (aching).
Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Tue Mar 27 20:49:39 2012 @@ -2,6 +2,10 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-144: GiraphJob should not extend Job (users should not be + able to call Job methods like waitForCompletion or setMapper..etc) + (aching). + GIRAPH-159: Case insensitive file/directory name matching will produce errors on M/R jar unpack (bfem via aching). 
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java Tue Mar 27 20:49:39 2012 @@ -121,7 +121,8 @@ public class GiraphRunner implements Too job.setVertexOutputFormatClass(Class.forName(cmd.getOptionValue("of"))); if (cmd.hasOption("ip")) { - FileInputFormat.addInputPath(job, new Path(cmd.getOptionValue("ip"))); + FileInputFormat.addInputPath(job.getInternalJob(), + new Path(cmd.getOptionValue("ip"))); } else { if (LOG.isInfoEnabled()) { LOG.info("No input path specified. Ensure your InputFormat does" + @@ -130,7 +131,8 @@ public class GiraphRunner implements Too } if (cmd.hasOption("op")) { - FileOutputFormat.setOutputPath(job, new Path(cmd.getOptionValue("op"))); + FileOutputFormat.setOutputPath(job.getInternalJob(), + new Path(cmd.getOptionValue("op"))); } else { if (LOG.isInfoEnabled()) { LOG.info("No output path specified. 
Ensure your OutputFormat does" + Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Mar 27 20:49:39 2012 @@ -226,8 +226,8 @@ public class SimpleCheckpointVertex exte int maxWorkers = Integer.parseInt(cmd.getOptionValue('w')); bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f); - FileOutputFormat.setOutputPath(bspJob, - new Path(cmd.getOptionValue('o'))); + FileOutputFormat.setOutputPath(bspJob.getInternalJob(), + new Path(cmd.getOptionValue('o'))); boolean verbose = false; if (cmd.hasOption('v')) { verbose = true; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Tue Mar 27 20:49:39 2012 @@ -262,8 +262,10 @@ public class SimpleShortestPathsVertex e SimpleShortestPathsVertexInputFormat.class); job.setVertexOutputFormatClass( SimpleShortestPathsVertexOutputFormat.class); - FileInputFormat.addInputPath(job, new Path(argArray[0])); - FileOutputFormat.setOutputPath(job, new Path(argArray[1])); + FileInputFormat.addInputPath(job.getInternalJob(), + new 
Path(argArray[0])); + FileOutputFormat.setOutputPath(job.getInternalJob(), + new Path(argArray[1])); job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID, Long.parseLong(argArray[2])); job.setWorkerConfiguration(Integer.parseInt(argArray[3]), Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Mar 27 20:49:39 2012 @@ -29,10 +29,11 @@ import org.apache.log4j.Logger; import java.io.IOException; /** - * Limits the functions that can be called by the user. Job is too flexible - * for our needs. For instance, our job should not have any reduce tasks. + * Generates an appropriate internal {@link Job} for using Giraph in Hadoop. + * Uses composition to avoid unwanted {@link Job} methods from exposure + * to the user. 
*/ -public class GiraphJob extends Job { +public class GiraphJob { static { Configuration.addDefaultResource("giraph-site.xml"); } @@ -345,6 +346,12 @@ public class GiraphJob extends Job { /** Class logger */ private static final Logger LOG = Logger.getLogger(GiraphJob.class); + /** Internal job that actually is submitted */ + private final Job job; + /** Helper configuration from the job */ + private final Configuration conf; + + /** * Constructor that will instantiate the configuration * @@ -352,7 +359,7 @@ public class GiraphJob extends Job { * @throws IOException */ public GiraphJob(String jobName) throws IOException { - super(new Configuration(), jobName); + this(new Configuration(), jobName); } /** @@ -363,10 +370,30 @@ public class GiraphJob extends Job { * @throws IOException */ public GiraphJob(Configuration conf, String jobName) throws IOException { - super(conf, jobName); + job = new Job(conf, jobName); + this.conf = job.getConfiguration(); } /** + * Get the configuration from the internal job. + * + * @return Configuration used by the job. + */ + public Configuration getConfiguration() { + return conf; + } + + /** + * Be very cautious when using this method as it returns the internal job + * of {@link GiraphJob}. This should only be used for methods that require + * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath(). + * + * @return Internal job that will actually be submitted to Hadoop. + */ + public Job getInternalJob() { + return job; + } + /** * Make sure the configuration is set properly by the user prior to * submitting the job. 
*/ @@ -580,7 +607,7 @@ public class GiraphJob extends Job { throws IOException, InterruptedException, ClassNotFoundException { checkConfiguration(); checkLocalJobRunnerConfiguration(conf); - setNumReduceTasks(0); + job.setNumReduceTasks(0); // Most users won't hit this hopefully and can set it higher if desired setIntConfIfDefault("mapreduce.job.counters.limit", 512); @@ -596,16 +623,16 @@ public class GiraphJob extends Job { // (DEFAULT_PING_INTERVAL) Client.setPingInterval(conf, 60000 * 5); - if (getJar() == null) { - setJarByClass(GiraphJob.class); + if (job.getJar() == null) { + job.setJarByClass(GiraphJob.class); } // Should work in MAPREDUCE-1938 to let the user jars/classes // get loaded first conf.setBoolean("mapreduce.user.classpath.first", true); - setMapperClass(GraphMapper.class); - setInputFormatClass(BspInputFormat.class); - setOutputFormatClass(BspOutputFormat.class); - return waitForCompletion(verbose); + job.setMapperClass(GraphMapper.class); + job.setInputFormatClass(BspInputFormat.class); + job.setOutputFormatClass(BspOutputFormat.class); + return job.waitForCompletion(verbose); } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Tue Mar 27 20:49:39 2012 @@ -129,8 +129,10 @@ public class InternalVertexRunner { conf.set(param.getKey(), param.getValue()); } - FileInputFormat.addInputPath(job, new Path(inputFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(outputDir.toString())); + FileInputFormat.addInputPath(job.getInternalJob(), + new 
Path(inputFile.toString())); + FileOutputFormat.setOutputPath(job.getInternalJob(), + new Path(outputDir.toString())); // configure a local zookeeper instance Properties zkProperties = new Properties(); Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Tue Mar 27 20:49:39 2012 @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -41,215 +40,213 @@ import junit.framework.TestCase; * Extended TestCase for making setting up Bsp testing. 
*/ public class BspCase extends TestCase implements Watcher { - /** JobTracker system property */ - private final String jobTracker = - System.getProperty("prop.mapred.job.tracker"); - /** Jar location system property */ - private final String jarLocation = - System.getProperty("prop.jarLocation", ""); - /** Number of actual processes for the BSP application */ - private int numWorkers = 1; - /** ZooKeeper list system property */ - private final String zkList = System.getProperty("prop.zookeeper.list"); - - /** - * Adjust the configuration to the basic test case - */ - public final void setupConfiguration(GiraphJob job) { - Configuration conf = job.getConfiguration(); - conf.set("mapred.jar", getJarLocation()); - - // Allow this test to be run on a real Hadoop setup - if (getJobTracker() != null) { - System.out.println("setup: Sending job to job tracker " + - getJobTracker() + " with jar path " + getJarLocation() - + " for " + getName()); - conf.set("mapred.job.tracker", getJobTracker()); - job.setWorkerConfiguration(getNumWorkers(), - getNumWorkers(), - 100.0f); - } - else { - System.out.println("setup: Using local job runner with " + - "location " + getJarLocation() + " for " - + getName()); - job.setWorkerConfiguration(1, 1, 100.0f); - // Single node testing - conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false); - } - conf.setInt(GiraphJob.POLL_ATTEMPTS, 10); - conf.setInt(GiraphJob.POLL_MSECS, 3*1000); - conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500); - if (getZooKeeperList() != null) { - job.setZooKeeperConfiguration(getZooKeeperList()); - } - // GeneratedInputSplit will generate 5 vertices - conf.setLong(GeneratedVertexReader.READER_VERTICES, 5); - } - - /** - * Create the test case - * - * @param testName name of the test case - */ - public BspCase(String testName) { - super(testName); - - } - - /** - * Get the number of workers used in the BSP application - * - * @param numProcs number of processes to use - */ - public int getNumWorkers() { - 
return numWorkers; - } - - /** - * Get the ZooKeeper list - */ - public String getZooKeeperList() { - return zkList; - } - - /** - * Get the jar location - * - * @return location of the jar file - */ - String getJarLocation() { - return jarLocation; - } - - /** - * Get the job tracker location - * - * @return job tracker location as a string - */ - String getJobTracker() { - return jobTracker; - } - - /** - * Get the single part file status and make sure there is only one part - * - * @param fs Filesystem to look for the part file - * @param partDirPath Directory where the single part file should exist - * @return Single part file status - * @throws IOException - */ - public static FileStatus getSinglePartFileStatus(Job job, - Path partDirPath) - throws IOException { - FileSystem fs = FileSystem.get(job.getConfiguration()); - FileStatus[] statusArray = fs.listStatus(partDirPath); - FileStatus singlePartFileStatus = null; - int partFiles = 0; - for (FileStatus fileStatus : statusArray) { - if (fileStatus.getPath().getName().equals("part-m-00000")) { - singlePartFileStatus = fileStatus; + /** JobTracker system property */ + private final String jobTracker = + System.getProperty("prop.mapred.job.tracker"); + /** Jar location system property */ + private final String jarLocation = + System.getProperty("prop.jarLocation", ""); + /** Number of actual processes for the BSP application */ + private int numWorkers = 1; + /** ZooKeeper list system property */ + private final String zkList = System.getProperty("prop.zookeeper.list"); + + /** + * Adjust the configuration to the basic test case + */ + public final void setupConfiguration(GiraphJob job) { + Configuration conf = job.getConfiguration(); + conf.set("mapred.jar", getJarLocation()); + + // Allow this test to be run on a real Hadoop setup + if (getJobTracker() != null) { + System.out.println("setup: Sending job to job tracker " + + getJobTracker() + " with jar path " + getJarLocation() + + " for " + getName()); + 
conf.set("mapred.job.tracker", getJobTracker()); + job.setWorkerConfiguration(getNumWorkers(), + getNumWorkers(), + 100.0f); + } + else { + System.out.println("setup: Using local job runner with " + + "location " + getJarLocation() + " for " + + getName()); + job.setWorkerConfiguration(1, 1, 100.0f); + // Single node testing + conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false); + } + conf.setInt(GiraphJob.POLL_ATTEMPTS, 10); + conf.setInt(GiraphJob.POLL_MSECS, 3*1000); + conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500); + if (getZooKeeperList() != null) { + job.setZooKeeperConfiguration(getZooKeeperList()); + } + // GeneratedInputSplit will generate 5 vertices + conf.setLong(GeneratedVertexReader.READER_VERTICES, 5); + } + + /** + * Create the test case + * + * @param testName name of the test case + */ + public BspCase(String testName) { + super(testName); + + } + + /** + * Get the number of workers used in the BSP application + * + * @param numProcs number of processes to use + */ + public int getNumWorkers() { + return numWorkers; + } + + /** + * Get the ZooKeeper list + */ + public String getZooKeeperList() { + return zkList; + } + + /** + * Get the jar location + * + * @return location of the jar file + */ + String getJarLocation() { + return jarLocation; + } + + /** + * Get the job tracker location + * + * @return job tracker location as a string + */ + String getJobTracker() { + return jobTracker; + } + + /** + * Get the single part file status and make sure there is only one part + * + * @param job Job to get the file system from + * @param partDirPath Directory where the single part file should exist + * @return Single part file status + * @throws IOException + */ + public static FileStatus getSinglePartFileStatus(GiraphJob job, + Path partDirPath) throws IOException { + FileSystem fs = FileSystem.get(job.getConfiguration()); + FileStatus[] statusArray = fs.listStatus(partDirPath); + FileStatus singlePartFileStatus = null; + int partFiles = 
0; + for (FileStatus fileStatus : statusArray) { + if (fileStatus.getPath().getName().equals("part-m-00000")) { + singlePartFileStatus = fileStatus; + } + if (fileStatus.getPath().getName().startsWith("part-m-")) { + ++partFiles; + } + } + if (partFiles != 1) { + throw new ArithmeticException( + "getSinglePartFile: Part file count should be 1, but is " + + partFiles); + } + return singlePartFileStatus; + } + + @Override + public void setUp() { + if (jobTracker != null) { + System.out.println("Setting tasks to 3 for " + getName() + + " since JobTracker exists..."); + numWorkers = 3; + } + try { + Configuration conf = new Configuration(); + FileSystem hdfs = FileSystem.get(conf); + // Since local jobs always use the same paths, remove them + Path oldLocalJobPaths = new Path( + GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT); + FileStatus[] fileStatusArr; + try { + fileStatusArr = hdfs.listStatus(oldLocalJobPaths); + for (FileStatus fileStatus : fileStatusArr) { + if (fileStatus.isDir() && + fileStatus.getPath().getName().contains("job_local")) { + System.out.println("Cleaning up local job path " + + fileStatus.getPath().getName()); + hdfs.delete(oldLocalJobPaths, true); + } + } + } catch (FileNotFoundException e) { + // ignore this FileNotFound exception and continue. 
+ } + if (zkList == null) { + return; + } + ZooKeeperExt zooKeeperExt = + new ZooKeeperExt(zkList, 30*1000, this); + List<String> rootChildren = zooKeeperExt.getChildren("/", false); + for (String rootChild : rootChildren) { + if (rootChild.startsWith("_hadoopBsp")) { + List<String> children = + zooKeeperExt.getChildren("/" + rootChild, false); + for (String child: children) { + if (child.contains("job_local_")) { + System.out.println("Cleaning up /_hadoopBsp/" + + child); + zooKeeperExt.deleteExt( + "/_hadoopBsp/" + child, -1, true); } - if (fileStatus.getPath().getName().startsWith("part-m-")) { - ++partFiles; - } - } - if (partFiles != 1) { - throw new ArithmeticException( - "getSinglePartFile: Part file count should be 1, but is " + - partFiles); - } - return singlePartFileStatus; - } - - @Override - public void setUp() { - if (jobTracker != null) { - System.out.println("Setting tasks to 3 for " + getName() + - " since JobTracker exists..."); - numWorkers = 3; + } } - try { - Configuration conf = new Configuration(); - FileSystem hdfs = FileSystem.get(conf); - // Since local jobs always use the same paths, remove them - Path oldLocalJobPaths = new Path( - GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT); - FileStatus[] fileStatusArr; - try { - fileStatusArr = hdfs.listStatus(oldLocalJobPaths); - for (FileStatus fileStatus : fileStatusArr) { - if (fileStatus.isDir() && - fileStatus.getPath().getName().contains("job_local")) { - System.out.println("Cleaning up local job path " + - fileStatus.getPath().getName()); - hdfs.delete(oldLocalJobPaths, true); - } - } - } catch (FileNotFoundException e) { - // ignore this FileNotFound exception and continue. 
- } - if (zkList == null) { - return; - } - ZooKeeperExt zooKeeperExt = - new ZooKeeperExt(zkList, 30*1000, this); - List<String> rootChildren = zooKeeperExt.getChildren("/", false); - for (String rootChild : rootChildren) { - if (rootChild.startsWith("_hadoopBsp")) { - List<String> children = - zooKeeperExt.getChildren("/" + rootChild, false); - for (String child: children) { - if (child.contains("job_local_")) { - System.out.println("Cleaning up /_hadoopBsp/" + - child); - zooKeeperExt.deleteExt( - "/_hadoopBsp/" + child, -1, true); - } - } - } - } - zooKeeperExt.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void process(WatchedEvent event) { - // Do nothing - } - - /** - * Helper method to remove an old output directory if it exists, - * and set the output path for any VertexOutputFormat that uses - * FileOutputFormat. - * - * @param job Job to set the output path for - * @param outputPathString Path to output as a string - * @throws IOException - */ - public static void removeAndSetOutput(GiraphJob job, - Path outputPath) - throws IOException { - remove(job.getConfiguration(), outputPath); - FileOutputFormat.setOutputPath(job, outputPath); - } - - /** - * Helper method to remove a path if it exists. 
- * - * @param conf Configuration to load FileSystem from - * @param path Path to remove - * @throws IOException - */ - public static void remove(Configuration conf, Path path) - throws IOException { - FileSystem hdfs = FileSystem.get(conf); - hdfs.delete(path, true); - } - - public static String getCallingMethodName() { - return Thread.currentThread().getStackTrace()[2].getMethodName(); - } + } + zooKeeperExt.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void process(WatchedEvent event) { + // Do nothing + } + + /** + * Helper method to remove an old output directory if it exists, + * and set the output path for any VertexOutputFormat that uses + * FileOutputFormat. + * + * @param job Job to set the output path for + * @param outputPathString Path to output as a string + * @throws IOException + */ + public static void removeAndSetOutput(GiraphJob job, + Path outputPath) throws IOException { + remove(job.getConfiguration(), outputPath); + FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath); + } + + /** + * Helper method to remove a path if it exists. 
+ * + * @param conf Configuration to load FileSystem from + * @param path Path to remove + * @throws IOException + */ + public static void remove(Configuration conf, Path path) + throws IOException { + FileSystem hdfs = FileSystem.get(conf); + hdfs.delete(path, true); + } + + public static String getCallingMethodName() { + return Thread.currentThread().getStackTrace()[2].getMethodName(); + } } Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1306014&r1=1306013&r2=1306014&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Tue Mar 27 20:49:39 2012 @@ -84,7 +84,7 @@ public class TestJsonBase64Format extend job.setVertexInputFormatClass(JsonBase64VertexInputFormat.class); job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class); job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3); - FileInputFormat.setInputPaths(job, outputPath); + FileInputFormat.setInputPaths(job.getInternalJob(), outputPath); Path outputPath2 = new Path("/tmp/" + getCallingMethodName() + "2"); removeAndSetOutput(job, outputPath2); assertTrue(job.run(true));