From: [email protected]
Date: July 13, 2009 12:38:52 PM EDT
To: [email protected]
Subject: svn commit: r793620 - /lucene/mahout/trunk/core/src/main/
java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
Reply-To: [email protected]
Author: jeastman
Date: Mon Jul 13 16:38:52 2009
New Revision: 793620
URL: http://svn.apache.org/viewvc?rev=793620&view=rev
Log:
- modified KMeansDriver.isConverged() to iterate over all part
files in the clusters directories
- removed '/part-00000' append from runIteration()
- unit test no longer throws exceptions
- example synthetic control job runs
- still some formatting differences between Eclipse and JBuilder
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/
clustering/kmeans/KMeansDriver.java
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/
clustering/kmeans/KMeansDriver.java
URL:
http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=793620&r1=793619&r2=793620&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/
clustering/kmeans/KMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/
clustering/kmeans/KMeansDriver.java Mon Jul 13 16:38:52 2009
@@ -16,6 +16,8 @@
*/
package org.apache.mahout.clustering.kmeans;
+import java.io.IOException;
+
import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
@@ -24,6 +26,7 @@
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -43,8 +46,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class KMeansDriver {
/** The name of the directory used to output final results. */
@@ -56,58 +57,68 @@
}
/** @param args Expects 7 args and they all correspond to the
order of the params in {@link #runJob} */
- public static void main(String[] args) throws
ClassNotFoundException, IOException, IllegalAccessException,
InstantiationException {
+ public static void main(String[] args) throws
ClassNotFoundException, IOException, IllegalAccessException,
+ InstantiationException {
DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
ArgumentBuilder abuilder = new ArgumentBuilder();
GroupBuilder gbuilder = new GroupBuilder();
Option inputOpt =
obuilder.withLongName("input").withRequired(true).withArgument(
-
abuilder.withName("input").withMinimum(1).withMaximum(1).create()).
- withDescription("The Path for input Vectors. Must be a
SequenceFile of Writable, Vector").withShortName("i").create();
+
abuilder
.withName
("input").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Path for input Vectors. Must be a SequenceFile of
Writable, Vector").withShortName("i").create();
- Option clustersOpt =
obuilder.withLongName("clusters").withRequired(true).withArgument(
-
abuilder.withName("clusters").withMinimum(1).withMaximum(1).create()).
- withDescription("The input centroids, as Vectors. Must be
a SequenceFile of Writable, Cluster/Canopy. " +
- "If k is also specified, then a random set of vectors
will be selected and written out to this path
first").withShortName("c").create();
-
- Option kOpt =
obuilder.withLongName("k").withRequired(false).withArgument(
-
abuilder.withName("k").withMinimum(1).withMaximum(1).create()).
- withDescription("The k in k-Means. If specified, then a
random selection of k Vectors will be chosen as the Centroid and
written to the clusters output path.").withShortName("k").create();
+ Option clustersOpt = obuilder
+ .withLongName("clusters")
+ .withRequired(true)
+
.withArgument
(abuilder.withName("clusters").withMinimum(1).withMaximum(1).create())
+ .withDescription(
+ "The input centroids, as Vectors. Must be a
SequenceFile of Writable, Cluster/Canopy. "
+ + "If k is also specified, then a random set of
vectors will be selected and written out to this path first")
+ .withShortName("c").create();
+
+ Option kOpt = obuilder
+ .withLongName("k")
+ .withRequired(false)
+
.withArgument
(abuilder.withName("k").withMinimum(1).withMaximum(1).create())
+ .withDescription(
+ "The k in k-Means. If specified, then a random
selection of k Vectors will be chosen as the Centroid and written to
the clusters output path.")
+ .withShortName("k").create();
Option outputOpt =
obuilder.withLongName("output").withRequired(true).withArgument(
-
abuilder.withName("output").withMinimum(1).withMaximum(1).create()).
- withDescription("The Path to put the output
in").withShortName("o").create();
+
abuilder
.withName
("output").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Path to put the output in").withShortName("o").create();
- Option overwriteOutput =
obuilder.withLongName("overwrite").withRequired(false).
- withDescription("If set, overwrite the output
directory").withShortName("w").create();
+ Option overwriteOutput =
obuilder
.withLongName("overwrite").withRequired(false).withDescription(
+ "If set, overwrite the output
directory").withShortName("w").create();
Option measureClassOpt =
obuilder.withLongName("distance").withRequired(false).withArgument(
-
abuilder.withName("distance").withMinimum(1).withMaximum(1).create()).
- withDescription("The Distance Measure to use. Default is
SquaredEuclidean").withShortName("m").create();
+
abuilder
.withName
("distance").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Distance Measure to use. Default is
SquaredEuclidean").withShortName("m").create();
Option convergenceDeltaOpt =
obuilder.withLongName("convergence").withRequired(false).withArgument(
-
abuilder
.withName("convergence").withMinimum(1).withMaximum(1).create()).
- withDescription("The threshold below which the clusters are
considered to be converged. Default is
0.5").withShortName("d").create();
+
abuilder
.withName
("convergence
").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The threshold below which the clusters are considered to
be converged. Default is 0.5").withShortName("d")
+ .create();
Option maxIterationsOpt =
obuilder.withLongName("max").withRequired(false).withArgument(
-
abuilder.withName("max").withMinimum(1).withMaximum(1).create()).
- withDescription("The maximum number of iterations to
perform. Default is 20").withShortName("x").create();
+
abuilder
.withName
("max").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The maximum number of iterations to perform. Default is
20").withShortName("x").create();
Option vectorClassOpt =
obuilder.withLongName("vectorClass").withRequired(false).withArgument(
-
abuilder
.withName("vectorClass").withMinimum(1).withMaximum(1).create()).
- withDescription("The Vector implementation class name.
Default is SparseVector.class").withShortName("v").create();
+
abuilder
.withName
("vectorClass
").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Vector implementation class name. Default is
SparseVector.class").withShortName("v").create();
Option numReduceTasksOpt =
obuilder.withLongName("numReduce").withRequired(false).withArgument(
-
abuilder
.withName("numReduce").withMinimum(1).withMaximum(1).create()).
- withDescription("The number of reduce
tasks").withShortName("r").create();
+
abuilder
.withName
("numReduce").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The number of reduce tasks").withShortName("r").create();
- Option helpOpt = obuilder.withLongName("help").
- withDescription("Print out
help").withShortName("h").create();
+ Option helpOpt =
obuilder.withLongName("help").withDescription("Print out
help").withShortName("h").create();
- Group group =
gbuilder
.withName
("Options
").withOption
(inputOpt
).withOption
(clustersOpt).withOption(outputOpt).withOption(measureClassOpt)
- .withOption
(convergenceDeltaOpt
).withOption
(maxIterationsOpt).withOption(numReduceTasksOpt).withOption(kOpt)
- .withOption
(vectorClassOpt
).withOption(overwriteOutput).withOption(helpOpt).create();
+ Group group =
gbuilder
.withName
("Options
").withOption(inputOpt).withOption(clustersOpt).withOption(outputOpt)
+
.withOption
(measureClassOpt
).withOption
(convergenceDeltaOpt).withOption(maxIterationsOpt).withOption(
+
numReduceTasksOpt
).withOption
(kOpt
).withOption(vectorClassOpt).withOption(overwriteOutput).withOption(
+ helpOpt).create();
try {
Parser parser = new Parser();
parser.setGroup(group);
@@ -129,11 +140,9 @@
convergenceDelta =
Double.parseDouble(cmdLine.getValue(convergenceDeltaOpt).toString());
}
- Class<? extends Vector> vectorClass =
cmdLine.hasOption(vectorClassOpt) == false ?
- SparseVector.class
+ Class<? extends Vector> vectorClass =
cmdLine.hasOption(vectorClassOpt) == false ? SparseVector.class
: (Class<? extends Vector>)
Class.forName(cmdLine.getValue(vectorClassOpt).toString());
-
int maxIterations = 20;
if (cmdLine.hasOption(maxIterationsOpt)) {
maxIterations =
Integer.parseInt(cmdLine.getValue(maxIterationsOpt).toString());
@@ -146,36 +155,35 @@
HadoopUtil.overwriteOutput(output);
}
if (cmdLine.hasOption(kOpt)) {
- clusters = RandomSeedGenerator.buildRandom(input, clusters,
Integer.parseInt(cmdLine.getValue(kOpt).toString())).toString();
+ clusters = RandomSeedGenerator
+ .buildRandom(input, clusters,
Integer.parseInt(cmdLine.getValue(kOpt).toString())).toString();
}
- runJob(input, clusters, output, measureClass, convergenceDelta,
- maxIterations, numReduceTasks, vectorClass);
+ runJob(input, clusters, output, measureClass,
convergenceDelta, maxIterations, numReduceTasks, vectorClass);
} catch (OptionException e) {
log.error("Exception", e);
CommandLineUtil.printHelp(group);
}
}
-
/**
* Run the job using supplied arguments
- *
- * @param input the directory pathname for input points
- * @param clustersIn the directory pathname for initial &
computed clusters
- * @param output the directory pathname for output points
- * @param measureClass the classname of the DistanceMeasure
+ *
+ * @param input the directory pathname for input points
+ * @param clustersIn the directory pathname for initial &
computed clusters
+ * @param output the directory pathname for output points
+ * @param measureClass the classname of the DistanceMeasure
* @param convergenceDelta the convergence delta value
- * @param maxIterations the maximum number of iterations
- * @param numReduceTasks the number of reducers
+ * @param maxIterations the maximum number of iterations
+ * @param numReduceTasks the number of reducers
*/
- public static void runJob(String input, String clustersIn, String
output,
- String measureClass, double
convergenceDelta, int maxIterations,
- int numReduceTasks, Class<? extends
Vector> vectorClass) {
+ public static void runJob(String input, String clustersIn, String
output, String measureClass,
+ double convergenceDelta, int maxIterations, int
numReduceTasks, Class<? extends Vector> vectorClass) {
// iterate until the clusters converge
String delta = Double.toString(convergenceDelta);
if (log.isInfoEnabled()) {
log.info("Input: " + input + " Clusters In: " + clustersIn + "
Out: " + output + " Distance: " + measureClass);
- log.info("convergence: " + convergenceDelta + " max
Iterations: " + maxIterations + " num Reduce Tasks: " +
numReduceTasks + " Input Vectors: " + vectorClass.getName());
+ log.info("convergence: " + convergenceDelta + " max
Iterations: " + maxIterations + " num Reduce Tasks: "
+ + numReduceTasks + " Input Vectors: " +
vectorClass.getName());
}
boolean converged = false;
int iteration = 0;
@@ -183,8 +191,7 @@
log.info("Iteration {}", iteration);
// point the output to a new directory per iteration
String clustersOut = output + "/clusters-" + iteration;
- converged = runIteration(input, clustersIn, clustersOut,
measureClass,
- delta, numReduceTasks, iteration);
+ converged = runIteration(input, clustersIn, clustersOut,
measureClass, delta, numReduceTasks, iteration);
// now point the input to the old output directory
clustersIn = output + "/clusters-" + iteration;
iteration++;
@@ -196,19 +203,18 @@
/**
* Run the job using supplied arguments
- *
- * @param input the directory pathname for input points
- * @param clustersIn the directory pathname for input
clusters
- * @param clustersOut the directory pathname for output
clusters
- * @param measureClass the classname of the DistanceMeasure
+ *
+ * @param input the directory pathname for input points
+ * @param clustersIn the directory pathname for input clusters
+ * @param clustersOut the directory pathname for output clusters
+ * @param measureClass the classname of the DistanceMeasure
* @param convergenceDelta the convergence delta value
- * @param numReduceTasks the number of reducer tasks
- * @param iteration The iteration number
+ * @param numReduceTasks the number of reducer tasks
+ * @param iteration The iteration number
* @return true if the iteration successfully runs
*/
- private static boolean runIteration(String input, String
clustersIn,
- String clustersOut, String
measureClass, String convergenceDelta,
- int numReduceTasks, int
iteration) {
+ private static boolean runIteration(String input, String
clustersIn, String clustersOut, String measureClass,
+ String convergenceDelta, int numReduceTasks, int iteration) {
JobConf conf = new JobConf(KMeansDriver.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(KMeansInfo.class);
@@ -229,11 +235,10 @@
conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
conf.setInt(Cluster.ITERATION_NUMBER, iteration);
-
try {
JobClient.runJob(conf);
FileSystem fs = FileSystem.get(outPath.toUri(), conf);
- return isConverged(clustersOut + "/part-00000", conf, fs);
+ return isConverged(clustersOut, conf, fs);
} catch (IOException e) {
log.warn(e.toString(), e);
return true;
@@ -242,15 +247,15 @@
/**
* Run the job using supplied arguments
- *
- * @param input the directory pathname for input points
- * @param clustersIn the directory pathname for input
clusters
- * @param output the directory pathname for output points
- * @param measureClass the classname of the DistanceMeasure
+ *
+ * @param input the directory pathname for input points
+ * @param clustersIn the directory pathname for input clusters
+ * @param output the directory pathname for output points
+ * @param measureClass the classname of the DistanceMeasure
* @param convergenceDelta the convergence delta value
*/
- private static void runClustering(String input, String clustersIn,
- String output, String
measureClass, String convergenceDelta, Class<? extends Vector>
vectorClass) {
+ private static void runClustering(String input, String
clustersIn, String output, String measureClass,
+ String convergenceDelta, Class<? extends Vector> vectorClass) {
if (log.isInfoEnabled()) {
log.info("Running Clustering");
log.info("Input: " + input + " Clusters In: " + clustersIn + "
Out: " + output + " Distance: " + measureClass);
@@ -263,7 +268,7 @@
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(vectorClass);
conf.setOutputKeyClass(Text.class);
- //the output is the cluster id
+ // the output is the cluster id
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, new Path(input));
@@ -284,33 +289,36 @@
}
/**
- * Return if all of the Clusters in the filePath have converged
or not
- *
+ * Return if all of the Clusters in the parts in the filePath
have converged or not
+ *
* @param filePath the file path to the single file containing the
clusters
- * @param conf the JobConf
- * @param fs the FileSystem
+ * @param conf the JobConf
+ * @param fs the FileSystem
* @return true if all Clusters are converged
* @throws IOException if there was an IO error
*/
- private static boolean isConverged(String filePath, JobConf conf,
FileSystem fs)
- throws IOException {
- Path outPart = new Path(filePath + "/*");
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,
outPart, conf);
- Writable key;
- try {
- key = (Writable) reader.getKeyClass().newInstance();
- } catch (InstantiationException e) {//shouldn't happen
- log.error("Exception", e);
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- log.error("Exception", e);
- throw new RuntimeException(e);
- }
- Cluster value = new Cluster();
- boolean converged = true;
- while (converged && reader.next(key, value)) {
- converged = value.isConverged();
- }
- return converged;
+ private static boolean isConverged(String filePath, JobConf conf,
FileSystem fs) throws IOException {
+ FileStatus[] parts = fs.listStatus(new Path(filePath));
+ for (FileStatus part : parts)
+ if (!part.getPath().getName().endsWith(".crc")) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
part.getPath(), conf);
+ Writable key;
+ try {
+ key = (Writable) reader.getKeyClass().newInstance();
+ } catch (InstantiationException e) {// shouldn't happen
+ log.error("Exception", e);
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ log.error("Exception", e);
+ throw new RuntimeException(e);
+ }
+ Cluster value = new Cluster();
+ while (reader.next(key, value)) {
+ if (value.isConverged() == false) {
+ return false;
+ }
+ }
+ }
+ return true;
}
}