Product similarity with TF-IDF and cosine similarity (DIMSUM)
Hi Folks!

I am trying to implement a Spark job to calculate the similarity of my database products, using only their names and descriptions. I would like to use TF-IDF to represent the text data and cosine similarity to compute all pairwise similarities. My goal is, after the job completes, to get all similarities as a list. For example:

Prod1 = ((Prod2, 0.98), (Prod3, 0.88))
Prod2 = ((Prod1, 0.98), (Prod4, 0.53))
Prod3 = ((Prod1, 0.88))
Prod4 = ((Prod2, 0.53))

However, I am new to Spark and I am having trouble understanding what the cosine similarity computation returns! My code:

    import org.apache.spark.mllib.feature.{HashingTF, IDF}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
    import org.apache.spark.rdd.RDD

    // One product per line: tokenize on whitespace.
    val documents: RDD[Seq[String]] = sc.textFile(filename).map(_.split(" ").toSeq)

    // Hash the tokens into term-frequency vectors.
    val hashingTF = new HashingTF()
    val tf: RDD[Vector] = hashingTF.transform(documents)
    tf.cache()

    // Weight by inverse document frequency, ignoring very rare terms.
    val idf = new IDF(minDocFreq = 2).fit(tf)
    val tfidf: RDD[Vector] = idf.transform(tf)

    val mat = new RowMatrix(tfidf)

    // Compute similar columns exactly, with brute force.
    val exact = mat.columnSimilarities()

    // Compute similar columns approximately, with DIMSUM sampling.
    val approx = mat.columnSimilarities(0.1)

    val exactEntries = exact.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) }
    val approxEntries = approx.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) }

The file just has a product's name and description on each row. The return I got:

    scala> approxEntries.first()
    res18: ((Long, Long), Double) = ((1638,966248),0.632455532033676)

How can I figure out which rows this entry refers to?

Thanks in advance! =]
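A note that may explain the confusion, plus a sketch for mapping indices back to products. columnSimilarities() compares the columns of the RowMatrix, and with one document per row the columns are hashed term dimensions, so the (1638, 966248) above are term indices (they fit in HashingTF's default 2^20 feature space), not product IDs. To get product-to-product cosine similarities, the products must be the columns of the matrix. Once the entries do index products, translating them back to names is a matter of keeping a lookup built in the same order as the input. A minimal sketch in PySpark (the Scala version is analogous); `sims`, and the assumption that a product's name is the first whitespace token of its line, are hypothetical:

    # names: (index, name) pairs, built in the same order as the documents RDD.
    # Assumption: the product name is the first whitespace token of each line.
    names = (sc.textFile(filename)
               .map(lambda line: line.split(" ")[0])
               .zipWithIndex()
               .map(lambda p: (p[1], p[0])))

    # sims: a hypothetical RDD of ((i, j), similarity) pairs, shaped like
    # approxEntries above but with indices that refer to products.
    named = (sims.map(lambda e: (e[0][0], (e[0][1], e[1])))          # (i, (j, sim))
                 .join(names)                                        # attach name for i
                 .map(lambda t: (t[1][0][0], (t[1][1], t[1][0][1]))) # (j, (name_i, sim))
                 .join(names)                                        # attach name for j
                 .map(lambda t: (t[1][0][0], t[1][1], t[1][0][1])))  # (name_i, name_j, sim)

Grouping `named` by its first element then yields exactly the per-product lists from the example above.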
Spark saveAsTextFile file size
Hi Folks!

I'm running a Spark job on a cluster with 9 slaves and 1 master (250 GB RAM, 32 cores and 1 TB of storage each). This job generates 1.2 TB of data in an RDD with 1,200 partitions.

When I call saveAsTextFile(hdfs://...), Spark creates 1,200 files named part-000* in the HDFS folder. However, only about 450 of them have content (~2.3 GB each), and all the others are empty (0 bytes).

Is there any explanation for this file size (2.3 GB)? Shouldn't Spark save 1,200 files of ~1 GB each?

Thanks in advance.

---
Regards,
Alan Vidotti Prando.
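For anyone hitting the same thing: saveAsTextFile writes exactly one part-000* file per partition, so the 0-byte files are simply empty partitions. The data ended up skewed onto roughly 450 of the 1,200 partitions (450 x 2.3 GB is about 1 TB, roughly the job's output), not lost. A minimal sketch of the usual remedy, assuming the RDD from the post is called `rdd` (a hypothetical name): force a shuffle that spreads records evenly just before saving.

    # rdd: hypothetical name for the ~1.2 TB RDD described above.
    # repartition() does a full shuffle and spreads records evenly,
    # so each of the 1,200 output files should land near 1 GB.
    balanced = rdd.repartition(1200)
    balanced.saveAsTextFile("hdfs://...")

The shuffle is not free at this scale, so it is only worth doing if evenly sized files matter downstream; coalesce() to fewer partitions is the cheaper option when the goal is just fewer, larger files.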
MLlib KMeans exception
Hi Folks!

I'm running a Python Spark job on a cluster with 1 master and 10 slaves (64 GB RAM and 32 cores per machine). The job reads a 1.2 TB file with 1,128,201,847 lines from HDFS and calls KMeans as follows:

    # SLAVE CODE - Reading features from HDFS
    def get_features_from_images_hdfs(self, timestamp):
        def shallow(lista):
            # Flatten a partition of rows into a stream of columns.
            for row in lista:
                for col in row:
                    yield col

        features = self.sc.textFile("hdfs://999.999.99:/FOLDER/")
        return features.map(lambda row: eval(row)[1]).mapPartitions(shallow)

    # SLAVE CODE - Extract centroids with KMeans
    def extract_centroids_on_slaves(self, features, kmeans_clusters,
                                    kmeans_max_iterations, kmeans_mode):
        # Error line
        clusters = KMeans.train(
            features,
            kmeans_clusters,
            maxIterations=kmeans_max_iterations,
            runs=1,
            initializationMode=kmeans_mode
        )
        return clusters.clusterCenters

    # MASTER CODE - Main
    features = get_features_from_images_hdfs(kwargs.get("timestamp"))

    kmeans_clusters = 1
    kmeans_max_iterations = 13
    kmeans_mode = "random"

    centroids = extract_centroids_on_slaves(
        features,
        kmeans_clusters,
        kmeans_max_iterations,
        kmeans_mode
    )

    centroids_rdd = sc.parallelize(centroids)

I'm getting the following exception when I call KMeans.train:

    14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0 (TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes)
    14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID 2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed
        org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
        org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
        org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
        java.io.DataInputStream.read(DataInputStream.java:100)
        org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
        org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
        org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220)
        org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189)
        org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB)
    14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0 (TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes)
    14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in stage 0.0 (TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB)
    14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB)
    14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on ip-172-31-7-122.ec2.internal:37861
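Two guesses, offered as assumptions rather than a diagnosis. First, the Hadoop FileSystem client is cached and shared per JVM, and "java.io.IOException: Filesystem closed" typically means something closed that shared client while other readers were still using it; a commonly cited workaround is disabling the cache. Second, KMeans.train makes a pass over the data per iteration, so persisting the features RDD avoids re-reading the 1.2 TB file. A sketch of both, reusing names from the post:

    from pyspark import SparkConf, SparkContext, StorageLevel

    # Workaround often suggested for "Filesystem closed": stop sharing
    # one cached DFSClient across all consumers in the JVM. Whether it
    # cures this particular failure is an assumption.
    conf = SparkConf().set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")
    sc = SparkContext(conf=conf)

    # KMeans iterates over the data several times; persist so the
    # 1.2 TB input is read from HDFS once instead of on every pass.
    features = get_features_from_images_hdfs(kwargs.get("timestamp"))
    features.persist(StorageLevel.MEMORY_AND_DISK)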
Re: Spark on YARN
Hi all! Thanks for answering!

@Sean, I tried running with 30 executor-cores, and 1 machine was still not processing.

@Vanzin, I checked the RM's web UI, and all nodes were detected and RUNNING. The interesting fact is that the available memory and cores of 1 node differed from the other 2, with just 1 available core and 1 GB of available RAM.

@All, I created a new cluster with 10 slaves and 1 master, and now 9 of my slaves are working and 1 is still not processing. It's fine by me! I'm just wondering why YARN is doing it... Does anyone know the answer?

2014-11-18 16:18 GMT-02:00 Sean Owen so...@cloudera.com:

> My guess is you're asking for all cores of all machines, but the driver
> needs at least one core, so one executor is unable to find a machine to
> fit on.
>
> On Nov 18, 2014 7:04 PM, Alan Prando a...@scanboo.com.br wrote:
>
>> Hi Folks!
>>
>> I'm running Spark on a YARN cluster installed with Cloudera Manager
>> Express. The cluster has 1 master and 3 slaves, each machine with 32
>> cores and 64 GB RAM.
>>
>> My Spark job is working fine; however, it seems that just 2 of the 3
>> slaves are working (htop shows 2 slaves at 100% on 32 cores, and 1
>> slave without any processing).
>>
>> I'm using this command:
>> ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390
>>
>> Additionally, the Spark log reports communication with 2 slaves only:
>>
>> 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1
>> 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default
>> 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2
>> 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default
>> 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM
>> 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM
>> 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms)
>>
>> Is there a configuration to run a Spark job on a YARN cluster using all slaves?
>>
>> Thanks in advance! =]
>>
>> ---
>> Regards,
>> Alan Vidotti Prando.
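For the archive: following Sean's explanation, the usual fix is to request a bit less than the whole machine so YARN has headroom to place the application master (which needs its own core and memory) next to the executors. A hedged variant of the submit command from this thread; the exact numbers are assumptions to tune, not a verified fix:

    ./spark-submit --master yarn --num-executors 3 --executor-cores 31 \
        --executor-memory 30g feature_extractor.py -r 390

Per-executor memory overhead also counts against the node's capacity, which is why the executor memory is lowered here as well.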
Spark on YARN
Hi Folks!

I'm running Spark on a YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64 GB RAM.

My Spark job is working fine; however, it seems that just 2 of the 3 slaves are working (htop shows 2 slaves at 100% on 32 cores, and 1 slave without any processing).

I'm using this command:

    ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390

Additionally, the Spark log reports communication with 2 slaves only:

    14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1
    14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default
    14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2
    14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default
    14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM
    14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM
    14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms)

Is there a configuration to run a Spark job on a YARN cluster using all slaves?

Thanks in advance! =]

---
Regards,
Alan Vidotti Prando.
Reading from HBase using Python
Hi all,

I'm trying to read an HBase table using the example from GitHub (https://github.com/apache/spark/blob/master/examples/src/main/python/hbase_inputformat.py); however, I have two qualifiers in one column family. Ex.:

    ROW    COLUMN+CELL
    row1   column=f1:1, timestamp=1401883411986, value=value1
    row1   column=f1:2, timestamp=1401883415212, value=value2
    row2   column=f1:1, timestamp=1401883417858, value=value3
    row3   column=f1:1, timestamp=1401883420805, value=value4

When I run hbase_inputformat.py, the following loop prints row1 just once:

    output = hbase_rdd.collect()
    for (k, v) in output:
        print (k, v)

Am I doing anything wrong?

Thanks in advance.
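In case it saves someone the search: the converter used by that era of the example (HBaseResultToStringConverter) appears to return only the first cell of each HBase Result, which is why row1 shows up once. Later revisions of hbase_inputformat.py switched to a converter that emits all cells of a row as a single newline-separated string and split it on the Python side. A sketch of that pattern, assuming such a converter is in place:

    # Assumes the converter joins all cells of a row with "\n";
    # flatMapValues turns each row back into one record per cell.
    hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
    for (k, v) in hbase_rdd.collect():
        print (k, v)

With the sample table above, this prints two records for row1 (one per qualifier) and one each for row2 and row3.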