I am trying to write a short script in Clojure (calling Mahout classes) to
cluster my data. My input data, which is the output of a [PHP script][1], is in
this format:
format: (tag) (image) (frequency)
tag_sit image_a 0
tag_sit image_b 1
tag_lorem image_a 1
tag_lorem image_b 0
tag_dolor image_a 0
tag_dolor image_b 1
tag_ipsum image_a 1
tag_ipsum image_b 1
tag_amit image_a 1
tag_amit image_b 0
... (more)
Then I write them into a SequenceFile using this Clojure script:
#!./bin/clj
(ns sensei.sequence.core)
(require 'clojure.string)
(require 'clojure.java.io)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.hadoop.io.SequenceFile)
(import org.apache.hadoop.io.Text)
(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)
(let [output_path (new Path "test/sensei")
      ;; Hadoop configuration pointing at the local HDFS namenode
      hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name"
                                   "hdfs://127.0.0.1:9000/")
                                conf)))
      hadoop_fs (FileSystem/get hadoop_configuration)
      writer (SequenceFile/createWriter
               hadoop_fs
               hadoop_configuration
               output_path
               Text
               VectorWritable)
      ;; append one [tag {photo frequency}] pair as a (Text, VectorWritable) record
      write-vector (fn [input]
                     (let [[tag photo_list] input
                           output_vector (new VectorWritable)]
                       (. writer append
                          (new Text tag)
                          (let []
                            (. output_vector set
                               ((fn [row_vector index frequency_list]
                                  (if (empty? frequency_list)
                                    row_vector
                                    (let []
                                      (. row_vector set
                                         index
                                         (Integer/parseInt (first frequency_list)))
                                      (recur
                                        row_vector
                                        (inc index)
                                        (rest frequency_list)))))
                                (new SequentialAccessSparseVector (count (vals photo_list)))
                                0
                                (vals photo_list)))
                            output_vector))))
      ;; group consecutive lines by tag; hand each finished group to processor
      process_lines (fn [lines, processor]
                      (processor
                        (reduce
                          (fn [current next_line]
                            (let [[tag photo frequency]
                                  (clojure.string/split next_line #" ")]
                              (if (nil? (first current))
                                (vector tag {photo frequency})
                                (if (= (first current) tag)
                                  (vector (first current)
                                          (assoc (first (rest current)) photo frequency))
                                  (let [] (processor current) (vector tag {photo frequency}))))))
                          []
                          lines)))
      process_file (fn [file_name]
                     (with-open [rdr (clojure.java.io/reader file_name)]
                       (process_lines (line-seq rdr)
                                      write-vector)))]
  ;; the input lines are read from stdin
  (process_file *in*))
Basically, it turns the input into a sequence file in this format:
key (Text): $tag_uri
value (VectorWritable): a vector (cardinality = number of documents) with
numeric indices and the respective frequencies <0:1 1:0 2:0 3:1 4:0 ...>
EDIT: apparently the cardinality needs to be 1; I still need to figure out how
to do that.
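To make the intended layout concrete, here is a minimal sketch in plain Clojure (no Hadoop or Mahout involved) of the per-tag vectors I am trying to produce. The names `rows->vectors` and `sample` are made up for this illustration and are not part of the script above. Unlike my script, the sketch fixes the column order by sorting the image names, which is an assumption on my part.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper (illustration only): turn "tag image frequency" lines
;; into one dense frequency vector per tag, using the same image order for
;; every tag.
(defn rows->vectors
  [lines]
  (let [rows   (map #(str/split % #" ") lines)
        ;; one column per distinct image, sorted so the order is stable
        images (vec (sort (distinct (map second rows))))
        index  (zipmap images (range))
        zeros  (vec (repeat (count images) 0))]
    (reduce (fn [acc [tag image frequency]]
              ;; start from an all-zero vector the first time a tag is seen
              (update-in acc [tag] (fnil assoc zeros)
                         (index image) (Integer/parseInt frequency)))
            (sorted-map)
            rows)))

(def sample
  ["tag_sit image_a 0"
   "tag_sit image_b 1"
   "tag_lorem image_a 1"
   "tag_lorem image_b 0"])

;; print each tag with its vector and the vector's cardinality
(doseq [[tag v] (rows->vectors sample)]
  (println tag v "cardinality:" (count v)))
```

With this sample, every vector has cardinality 2, one slot per image, which is the "cardinality = number of documents" layout described above.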
Then I proceed to do the actual clustering with this script (written by
referring to this [blog post][2]):
#!./bin/clj
(ns sensei.clustering.fkmeans)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)
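(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name"
                                   "hdfs://127.0.0.1:9000/")
                                conf)))
      input_path (new Path "test/sensei")
      output_path (new Path "test/clusters")
      clusters_in_path (new Path "test/clusters/cluster-0")]
  ;; the argument comments use the names of the fkmeans command-line options
  (FuzzyKMeansDriver/run
    hadoop_configuration
    input_path
    ;; write the random initial clusters and pass their path on
    (RandomSeedGenerator/buildRandom
      hadoop_configuration
      input_path
      clusters_in_path
      (int 2)                        ; numClusters
      (new EuclideanDistanceMeasure))
    output_path
    (new EuclideanDistanceMeasure)   ; distanceMeasure
    (double 0.5)                     ; convergenceDelta
    (int 10)                         ; maxIter
    (float 5.0)                      ; m
    true                             ; clustering
    false                            ; emitMostLikely
    (double 0.0)                     ; threshold
    false))                          ; run sequentially instead of via MapReduce (method)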
However, I am getting output like this:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001
java.lang.IllegalStateException: No clusters found. Check your -c path.
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
11/08/25 15:20:18 INFO mapred.JobClient: map 0% reduce 0%
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
    at clojure.lang.Util.runtimeException(Util.java:153)
    at clojure.lang.Compiler.eval(Compiler.java:6417)
    at clojure.lang.Compiler.load(Compiler.java:6843)
    at clojure.lang.Compiler.loadFile(Compiler.java:6804)
    at clojure.main$load_script.invoke(main.clj:282)
    at clojure.main$script_opt.invoke(main.clj:342)
    at clojure.main$main.doInvoke(main.clj:426)
    at clojure.lang.RestFn.invoke(RestFn.java:436)
    at clojure.lang.Var.invoke(Var.java:409)
    at clojure.lang.AFn.applyToHelper(AFn.java:167)
    at clojure.lang.Var.applyTo(Var.java:518)
    at clojure.main.main(main.java:37)
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
    at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
    at clojure.lang.Compiler.eval(Compiler.java:6406)
    ... 10 more
EDIT: It turns out that clustering via the command-line tool fails too:
$ bin/mahout fkmeans --input test/sensei --output test/clusters \
    --clusters test/clusters/clusters-0 --clustering --overwrite \
    --emitMostLikely false --numClusters 10 --maxIter 10 --m 5
Running on hadoop, using HADOOP_HOME=/home/jeffrey04/Applications/hadoop
HADOOP_CONF_DIR=/home/jeffrey04/Applications/hadoop/conf
MAHOUT-JOB: /home/jeffrey04/Applications/mahout/examples/target/mahout-examples-0.6-SNAPSHOT-job.jar
11/08/25 15:59:43 INFO common.AbstractJob: Command line arguments:
{--clustering=null, --clusters=test/clusters/clusters-0,
--convergenceDelta=0.5,
--distanceMeasure=org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure,
--emitMostLikely=false, --endPhase=2147483647, --input=test/sensei, --m=5,
--maxIter=10, --method=mapreduce, --numClusters=10, --output=test/clusters,
--overwrite=null, --startPhase=0, --tempDir=temp, --threshold=0}
11/08/25 15:59:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/08/25 15:59:44 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/08/25 15:59:44 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:59:44 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:59:44 INFO kmeans.RandomSeedGenerator: Wrote 10 vectors to test/clusters/clusters-0/part-randomSeed
11/08/25 15:59:44 INFO fuzzykmeans.FuzzyKMeansDriver: Fuzzy K-Means Iteration 1
11/08/25 15:59:44 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:59:45 INFO mapred.JobClient: Running job: job_201108241756_0001
11/08/25 15:59:46 INFO mapred.JobClient: map 0% reduce 0%
11/08/25 16:00:02 INFO mapred.JobClient: Task Id : attempt_201108241756_0001_m_000000_0, Status : FAILED
org.apache.mahout.math.CardinalityException: Required cardinality 1 but got 10
    at org.apache.mahout.math.AbstractVector.getDistanceSquared(AbstractVector.java:251)
    at org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure.distance(SquaredEuclideanDistanceMeasure.java:52)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansClusterer.emitPointProbToCluster(FuzzyKMeansClusterer.java:165)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.map(FuzzyKMeansMapper.java:45)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.map(FuzzyKMeansMapper.java:35)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
11/08/25 16:00:08 INFO mapred.JobClient: Task Id : attempt_201108241756_0001_m_000000_1, Status : FAILED
org.apache.mahout.math.CardinalityException: Required cardinality 1 but got 10
    at org.apache.mahout.math.AbstractVector.getDistanceSquared(AbstractVector.java:251)
    at org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure.distance(SquaredEuclideanDistanceMeasure.java:52)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansClusterer.emitPointProbToCluster(FuzzyKMeansClusterer.java:165)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.map(FuzzyKMeansMapper.java:45)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.map(FuzzyKMeansMapper.java:35)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
11/08/25 16:00:14 INFO mapred.JobClient: Task Id : attempt_201108241756_0001_m_000000_2, Status : FAILED
org.apache.mahout.math.CardinalityException: Required cardinality 1 but got 10
    at org.apache.mahout.math.AbstractVector.getDistanceSquared(AbstractVector.java:251)
    at org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure.distance(SquaredEuclideanDistanceMeasure.java:52)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansClusterer.emitPointProbToCluster(FuzzyKMeansClusterer.java:165)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.map(FuzzyKMeansMapper.java:45)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.map(FuzzyKMeansMapper.java:35)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
11/08/25 16:00:26 INFO mapred.JobClient: Job complete: job_201108241756_0001
11/08/25 16:00:26 INFO mapred.JobClient: Counters: 7
11/08/25 16:00:26 INFO mapred.JobClient: Job Counters
11/08/25 16:00:26 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=26728
11/08/25 16:00:26 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
11/08/25 16:00:26 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
11/08/25 16:00:26 INFO mapred.JobClient: Launched map tasks=4
11/08/25 16:00:26 INFO mapred.JobClient: Data-local map tasks=4
11/08/25 16:00:26 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
11/08/25 16:00:26 INFO mapred.JobClient: Failed map tasks=1
Exception in thread "main" java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/clusters-0/part-randomSeed
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:125)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.main(FuzzyKMeansDriver.java:66)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:68)
    at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:139)
    at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:188)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Please let me know if I have left anything out.
[1]: http://stackoverflow.com/questions/7075045/manipulating-data-to-matrix-like-format-in-php
[2]: http://dedcode.wordpress.com/2010/11/20/k-means-clustering-with-hadoop-and-mahout/
Cross-posted to Stack Overflow:
<http://stackoverflow.com/questions/7186663/clustering-fkmeans-with-mahout-using-clojure>