Author: aching
Date: Tue Mar 27 20:49:39 2012
New Revision: 1306014

URL: http://svn.apache.org/viewvc?rev=1306014&view=rev
Log:
GIRAPH-144: GiraphJob should not extend Job (users should not be able
to call Job methods like waitForCompletion or setMapper..etc)
(aching).

Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Tue Mar 27 20:49:39 2012
@@ -2,6 +2,10 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-144: GiraphJob should not extend Job (users should not be
+  able to call Job methods like waitForCompletion or setMapper..etc)
+  (aching).
+
   GIRAPH-159: Case insensitive file/directory name matching will
   produce errors on M/R jar unpack (bfem via aching).
 

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/GiraphRunner.java Tue Mar 27 20:49:39 2012
@@ -121,7 +121,8 @@ public class GiraphRunner implements Too
     job.setVertexOutputFormatClass(Class.forName(cmd.getOptionValue("of")));
 
     if (cmd.hasOption("ip")) {
-      FileInputFormat.addInputPath(job, new Path(cmd.getOptionValue("ip")));
+      FileInputFormat.addInputPath(job.getInternalJob(),
+                                   new Path(cmd.getOptionValue("ip")));
     } else {
       if (LOG.isInfoEnabled()) {
         LOG.info("No input path specified. Ensure your InputFormat does" +
@@ -130,7 +131,8 @@ public class GiraphRunner implements Too
     }
 
     if (cmd.hasOption("op")) {
-      FileOutputFormat.setOutputPath(job, new Path(cmd.getOptionValue("op")));
+      FileOutputFormat.setOutputPath(job.getInternalJob(),
+                                     new Path(cmd.getOptionValue("op")));
     } else {
       if (LOG.isInfoEnabled()) {
         LOG.info("No output path specified. Ensure your OutputFormat does" +

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Mar 27 20:49:39 2012
@@ -226,8 +226,8 @@ public class SimpleCheckpointVertex exte
     int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
     bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
 
-    FileOutputFormat.setOutputPath(bspJob,
-        new Path(cmd.getOptionValue('o')));
+    FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
+                                   new Path(cmd.getOptionValue('o')));
     boolean verbose = false;
     if (cmd.hasOption('v')) {
       verbose = true;

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Tue Mar 27 20:49:39 2012
@@ -262,8 +262,10 @@ public class SimpleShortestPathsVertex e
         SimpleShortestPathsVertexInputFormat.class);
     job.setVertexOutputFormatClass(
         SimpleShortestPathsVertexOutputFormat.class);
-    FileInputFormat.addInputPath(job, new Path(argArray[0]));
-    FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
+    FileInputFormat.addInputPath(job.getInternalJob(),
+                                 new Path(argArray[0]));
+    FileOutputFormat.setOutputPath(job.getInternalJob(),
+                                   new Path(argArray[1]));
     job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
         Long.parseLong(argArray[2]));
     job.setWorkerConfiguration(Integer.parseInt(argArray[3]),

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Mar 27 20:49:39 2012
@@ -29,10 +29,11 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 
 /**
- * Limits the functions that can be called by the user.  Job is too flexible
- * for our needs.  For instance, our job should not have any reduce tasks.
+ * Generates an appropriate internal {@link Job} for using Giraph in Hadoop.
+ * Uses composition to avoid unwanted {@link Job} methods from exposure
+ * to the user.
  */
-public class GiraphJob extends Job {
+public class GiraphJob {
   static {
     Configuration.addDefaultResource("giraph-site.xml");
   }
@@ -345,6 +346,12 @@ public class GiraphJob extends Job {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GiraphJob.class);
 
+  /** Internal job that actually is submitted */
+  private final Job job;
+  /** Helper configuration from the job */
+  private final Configuration conf;
+
+
   /**
    * Constructor that will instantiate the configuration
    *
@@ -352,7 +359,7 @@ public class GiraphJob extends Job {
    * @throws IOException
    */
   public GiraphJob(String jobName) throws IOException {
-    super(new Configuration(), jobName);
+    this(new Configuration(), jobName);
   }
 
   /**
@@ -363,10 +370,30 @@ public class GiraphJob extends Job {
    * @throws IOException
    */
   public GiraphJob(Configuration conf, String jobName) throws IOException {
-    super(conf, jobName);
+    job = new Job(conf, jobName);
+    this.conf = job.getConfiguration();
   }
 
   /**
+   * Get the configuration from the internal job.
+   *
+   * @return Configuration used by the job.
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Be very cautious when using this method as it returns the internal job
+   * of {@link GiraphJob}.  This should only be used for methods that require
+   * access to the actual {@link Job}, i.e. FileInputFormat#addInputPath().
+   *
+   * @return Internal job that will actually be submitted to Hadoop.
+   */
+  public Job getInternalJob() {
+    return job;
+  }
+  /**
    * Make sure the configuration is set properly by the user prior to
    * submitting the job.
    */
@@ -580,7 +607,7 @@ public class GiraphJob extends Job {
     throws IOException, InterruptedException, ClassNotFoundException {
     checkConfiguration();
     checkLocalJobRunnerConfiguration(conf);
-    setNumReduceTasks(0);
+    job.setNumReduceTasks(0);
     // Most users won't hit this hopefully and can set it higher if desired
     setIntConfIfDefault("mapreduce.job.counters.limit", 512);
 
@@ -596,16 +623,16 @@ public class GiraphJob extends Job {
     // (DEFAULT_PING_INTERVAL)
     Client.setPingInterval(conf, 60000 * 5);
 
-    if (getJar() == null) {
-      setJarByClass(GiraphJob.class);
+    if (job.getJar() == null) {
+      job.setJarByClass(GiraphJob.class);
     }
     // Should work in MAPREDUCE-1938 to let the user jars/classes
     // get loaded first
     conf.setBoolean("mapreduce.user.classpath.first", true);
 
-    setMapperClass(GraphMapper.class);
-    setInputFormatClass(BspInputFormat.class);
-    setOutputFormatClass(BspOutputFormat.class);
-    return waitForCompletion(verbose);
+    job.setMapperClass(GraphMapper.class);
+    job.setInputFormatClass(BspInputFormat.class);
+    job.setOutputFormatClass(BspOutputFormat.class);
+    return job.waitForCompletion(verbose);
   }
 }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Tue Mar 27 20:49:39 2012
@@ -129,8 +129,10 @@ public class InternalVertexRunner {
         conf.set(param.getKey(), param.getValue());
       }
 
-      FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
-      FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
+      FileInputFormat.addInputPath(job.getInternalJob(),
+                                   new Path(inputFile.toString()));
+      FileOutputFormat.setOutputPath(job.getInternalJob(),
+                                     new Path(outputDir.toString()));
 
       // configure a local zookeeper instance
       Properties zkProperties = new Properties();

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Tue Mar 27 20:49:39 2012
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -41,215 +40,213 @@ import junit.framework.TestCase;
  * Extended TestCase for making setting up Bsp testing.
  */
 public class BspCase extends TestCase implements Watcher {
-    /** JobTracker system property */
-    private final String jobTracker =
-        System.getProperty("prop.mapred.job.tracker");
-    /** Jar location system property */
-    private final String jarLocation =
-        System.getProperty("prop.jarLocation", "");
-    /** Number of actual processes for the BSP application */
-    private int numWorkers = 1;
-    /** ZooKeeper list system property */
-    private final String zkList = System.getProperty("prop.zookeeper.list");
-
-    /**
-     * Adjust the configuration to the basic test case
-     */
-    public final void setupConfiguration(GiraphJob job) {
-        Configuration conf = job.getConfiguration();
-        conf.set("mapred.jar", getJarLocation());
-
-        // Allow this test to be run on a real Hadoop setup
-        if (getJobTracker() != null) {
-            System.out.println("setup: Sending job to job tracker " +
-                       getJobTracker() + " with jar path " + getJarLocation()
-                       + " for " + getName());
-            conf.set("mapred.job.tracker", getJobTracker());
-            job.setWorkerConfiguration(getNumWorkers(),
-                                       getNumWorkers(),
-                                       100.0f);
-        }
-        else {
-            System.out.println("setup: Using local job runner with " +
-                               "location " + getJarLocation() + " for "
-                               + getName());
-            job.setWorkerConfiguration(1, 1, 100.0f);
-            // Single node testing
-            conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
-        }
-        conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
-        conf.setInt(GiraphJob.POLL_MSECS, 3*1000);
-        conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
-        if (getZooKeeperList() != null) {
-            job.setZooKeeperConfiguration(getZooKeeperList());
-        }
-        // GeneratedInputSplit will generate 5 vertices
-        conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
-    }
-
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public BspCase(String testName) {
-        super(testName);
-
-    }
-
-    /**
-     * Get the number of workers used in the BSP application
-     *
-     * @param numProcs number of processes to use
-     */
-    public int getNumWorkers() {
-        return numWorkers;
-    }
-
-    /**
-     * Get the ZooKeeper list
-     */
-    public String getZooKeeperList() {
-        return zkList;
-    }
-
-    /**
-     * Get the jar location
-     *
-     * @return location of the jar file
-     */
-    String getJarLocation() {
-        return jarLocation;
-    }
-
-    /**
-     * Get the job tracker location
-     *
-     * @return job tracker location as a string
-     */
-    String getJobTracker() {
-        return jobTracker;
-    }
-
-    /**
-     * Get the single part file status and make sure there is only one part
-     *
-     * @param fs Filesystem to look for the part file
-     * @param partDirPath Directory where the single part file should exist
-     * @return Single part file status
-     * @throws IOException
-     */
-    public static FileStatus getSinglePartFileStatus(Job job,
-                                                     Path partDirPath)
-            throws IOException {
-        FileSystem fs = FileSystem.get(job.getConfiguration());
-        FileStatus[] statusArray = fs.listStatus(partDirPath);
-        FileStatus singlePartFileStatus = null;
-        int partFiles = 0;
-        for (FileStatus fileStatus : statusArray) {
-            if (fileStatus.getPath().getName().equals("part-m-00000")) {
-                singlePartFileStatus = fileStatus;
+  /** JobTracker system property */
+  private final String jobTracker =
+      System.getProperty("prop.mapred.job.tracker");
+  /** Jar location system property */
+  private final String jarLocation =
+      System.getProperty("prop.jarLocation", "");
+  /** Number of actual processes for the BSP application */
+  private int numWorkers = 1;
+  /** ZooKeeper list system property */
+  private final String zkList = System.getProperty("prop.zookeeper.list");
+
+  /**
+   * Adjust the configuration to the basic test case
+   */
+  public final void setupConfiguration(GiraphJob job) {
+    Configuration conf = job.getConfiguration();
+    conf.set("mapred.jar", getJarLocation());
+
+    // Allow this test to be run on a real Hadoop setup
+    if (getJobTracker() != null) {
+      System.out.println("setup: Sending job to job tracker " +
+          getJobTracker() + " with jar path " + getJarLocation()
+          + " for " + getName());
+      conf.set("mapred.job.tracker", getJobTracker());
+      job.setWorkerConfiguration(getNumWorkers(),
+          getNumWorkers(),
+          100.0f);
+    }
+    else {
+      System.out.println("setup: Using local job runner with " +
+          "location " + getJarLocation() + " for "
+          + getName());
+      job.setWorkerConfiguration(1, 1, 100.0f);
+      // Single node testing
+      conf.setBoolean(GiraphJob.SPLIT_MASTER_WORKER, false);
+    }
+    conf.setInt(GiraphJob.POLL_ATTEMPTS, 10);
+    conf.setInt(GiraphJob.POLL_MSECS, 3*1000);
+    conf.setInt(GiraphJob.ZOOKEEPER_SERVERLIST_POLL_MSECS, 500);
+    if (getZooKeeperList() != null) {
+      job.setZooKeeperConfiguration(getZooKeeperList());
+    }
+    // GeneratedInputSplit will generate 5 vertices
+    conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
+  }
+
+  /**
+   * Create the test case
+   *
+   * @param testName name of the test case
+   */
+  public BspCase(String testName) {
+    super(testName);
+
+  }
+
+  /**
+   * Get the number of workers used in the BSP application
+   *
+   * @param numProcs number of processes to use
+   */
+  public int getNumWorkers() {
+    return numWorkers;
+  }
+
+  /**
+   * Get the ZooKeeper list
+   */
+  public String getZooKeeperList() {
+    return zkList;
+  }
+
+  /**
+   * Get the jar location
+   *
+   * @return location of the jar file
+   */
+  String getJarLocation() {
+    return jarLocation;
+  }
+
+  /**
+   * Get the job tracker location
+   *
+   * @return job tracker location as a string
+   */
+  String getJobTracker() {
+    return jobTracker;
+  }
+
+  /**
+   * Get the single part file status and make sure there is only one part
+   *
+   * @param job Job to get the file system from
+   * @param partDirPath Directory where the single part file should exist
+   * @return Single part file status
+   * @throws IOException
+   */
+  public static FileStatus getSinglePartFileStatus(GiraphJob job,
+      Path partDirPath) throws IOException {
+    FileSystem fs = FileSystem.get(job.getConfiguration());
+    FileStatus[] statusArray = fs.listStatus(partDirPath);
+    FileStatus singlePartFileStatus = null;
+    int partFiles = 0;
+    for (FileStatus fileStatus : statusArray) {
+      if (fileStatus.getPath().getName().equals("part-m-00000")) {
+        singlePartFileStatus = fileStatus;
+      }
+      if (fileStatus.getPath().getName().startsWith("part-m-")) {
+        ++partFiles;
+      }
+    }
+    if (partFiles != 1) {
+      throw new ArithmeticException(
+          "getSinglePartFile: Part file count should be 1, but is " +
+              partFiles);
+    }
+    return singlePartFileStatus;
+  }
+
+  @Override
+  public void setUp() {
+    if (jobTracker != null) {
+      System.out.println("Setting tasks to 3 for " + getName() +
+          " since JobTracker exists...");
+      numWorkers = 3;
+    }
+    try {
+      Configuration conf = new Configuration();
+      FileSystem hdfs = FileSystem.get(conf);
+      // Since local jobs always use the same paths, remove them
+      Path oldLocalJobPaths = new Path(
+          GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
+      FileStatus[] fileStatusArr;
+      try {
+        fileStatusArr = hdfs.listStatus(oldLocalJobPaths);
+        for (FileStatus fileStatus : fileStatusArr) {
+          if (fileStatus.isDir() &&
+              fileStatus.getPath().getName().contains("job_local")) {
+            System.out.println("Cleaning up local job path " +
+                fileStatus.getPath().getName());
+            hdfs.delete(oldLocalJobPaths, true);
+          }
+        }
+      } catch (FileNotFoundException e) {
+        // ignore this FileNotFound exception and continue.
+      }
+      if (zkList == null) {
+        return;
+      }
+      ZooKeeperExt zooKeeperExt =
+          new ZooKeeperExt(zkList, 30*1000, this);
+      List<String> rootChildren = zooKeeperExt.getChildren("/", false);
+      for (String rootChild : rootChildren) {
+        if (rootChild.startsWith("_hadoopBsp")) {
+          List<String> children =
+              zooKeeperExt.getChildren("/" + rootChild, false);
+          for (String child: children) {
+            if (child.contains("job_local_")) {
+              System.out.println("Cleaning up /_hadoopBsp/" +
+                  child);
+              zooKeeperExt.deleteExt(
+                  "/_hadoopBsp/" + child, -1, true);
             }
-            if (fileStatus.getPath().getName().startsWith("part-m-")) {
-                ++partFiles;
-            }
-        }
-        if (partFiles != 1) {
-            throw new ArithmeticException(
-                "getSinglePartFile: Part file count should be 1, but is " +
-                partFiles);
-        }
-        return singlePartFileStatus;
-    }
-
-    @Override
-    public void setUp() {
-        if (jobTracker != null) {
-            System.out.println("Setting tasks to 3 for " + getName() +
-                               " since JobTracker exists...");
-            numWorkers = 3;
+          }
         }
-        try {
-            Configuration conf = new Configuration();
-            FileSystem hdfs = FileSystem.get(conf);
-            // Since local jobs always use the same paths, remove them
-            Path oldLocalJobPaths = new Path(
-                GiraphJob.ZOOKEEPER_MANAGER_DIR_DEFAULT);
-            FileStatus[] fileStatusArr;
-            try {
-                fileStatusArr = hdfs.listStatus(oldLocalJobPaths);
-                for (FileStatus fileStatus : fileStatusArr) {
-                    if (fileStatus.isDir() &&
-                        fileStatus.getPath().getName().contains("job_local")) {
-                        System.out.println("Cleaning up local job path " +
-                            fileStatus.getPath().getName());
-                        hdfs.delete(oldLocalJobPaths, true);
-                    }
-                }
-            } catch (FileNotFoundException e) {
-                // ignore this FileNotFound exception and continue.
-            }
-            if (zkList == null) {
-                return;
-            }
-            ZooKeeperExt zooKeeperExt =
-                new ZooKeeperExt(zkList, 30*1000, this);
-            List<String> rootChildren = zooKeeperExt.getChildren("/", false);
-            for (String rootChild : rootChildren) {
-                if (rootChild.startsWith("_hadoopBsp")) {
-                    List<String> children =
-                        zooKeeperExt.getChildren("/" + rootChild, false);
-                    for (String child: children) {
-                        if (child.contains("job_local_")) {
-                            System.out.println("Cleaning up /_hadoopBsp/" +
-                                               child);
-                            zooKeeperExt.deleteExt(
-                                "/_hadoopBsp/" + child, -1, true);
-                        }
-                    }
-                }
-            }
-            zooKeeperExt.close();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        // Do nothing
-    }
-
-    /**
-     * Helper method to remove an old output directory if it exists,
-     * and set the output path for any VertexOutputFormat that uses
-     * FileOutputFormat.
-     *
-     * @param job Job to set the output path for
-     * @param outputPathString Path to output as a string
-     * @throws IOException
-     */
-    public static void removeAndSetOutput(GiraphJob job,
-                                          Path outputPath)
-            throws IOException {
-        remove(job.getConfiguration(), outputPath);
-        FileOutputFormat.setOutputPath(job, outputPath);
-    }
-    
-    /**
-     * Helper method to remove a path if it exists.
-     * 
-     * @param conf Configuration to load FileSystem from
-     * @param path Path to remove
-     * @throws IOException
-     */
-    public static void remove(Configuration conf, Path path) 
-            throws IOException {
-        FileSystem hdfs = FileSystem.get(conf);
-        hdfs.delete(path, true);
-    }
-
-    public static String getCallingMethodName() {
-        return Thread.currentThread().getStackTrace()[2].getMethodName();
-    }
+      }
+      zooKeeperExt.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // Do nothing
+  }
+
+  /**
+   * Helper method to remove an old output directory if it exists,
+   * and set the output path for any VertexOutputFormat that uses
+   * FileOutputFormat.
+   *
+   * @param job Job to set the output path for
+   * @param outputPathString Path to output as a string
+   * @throws IOException
+   */
+  public static void removeAndSetOutput(GiraphJob job,
+      Path outputPath) throws IOException {
+    remove(job.getConfiguration(), outputPath);
+    FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void remove(Configuration conf, Path path)
+      throws IOException {
+    FileSystem hdfs = FileSystem.get(conf);
+    hdfs.delete(path, true);
+  }
+
+  public static String getCallingMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
 }

Modified: 
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1306014&r1=1306013&r2=1306014&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Tue Mar 27 20:49:39 2012
@@ -84,7 +84,7 @@ public class TestJsonBase64Format extend
     job.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
     job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
     job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3);
-    FileInputFormat.setInputPaths(job, outputPath);
+    FileInputFormat.setInputPaths(job.getInternalJob(), outputPath);
     Path outputPath2 = new Path("/tmp/" + getCallingMethodName() + "2");
     removeAndSetOutput(job, outputPath2);
     assertTrue(job.run(true));


Reply via email to