Product similarity with TF/IDF and Cosine similarity (DIMSUM)

2016-01-30 Thread Alan Prando
Hi Folks!

I am trying to implement a Spark job to compute the similarity between the
products in my database, using only their names and descriptions.
I would like to use TF-IDF to represent the text data and cosine similarity
to compute all pairwise similarities.

My goal is, after the job completes, to get all similarities as a list.
For example:
Prod1 = ((Prod2, 0.98), (Prod3, 0.88))
Prod2 = ((Prod1, 0.98), (Prod4, 0.53))
Prod3 = ((Prod1, 0.88))
Prod4 = ((Prod2, 0.53))

However, I am new to Spark, and I am having trouble understanding what the
cosine similarity computation returns!

My code:
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD

// Each line of the file is one document: a product's name and description.
val documents: RDD[Seq[String]] = sc.textFile(filename).map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.cache()

val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

val mat = new RowMatrix(tfidf)

// Compute column similarities exactly, with brute force.
val exact = mat.columnSimilarities()

// Compute column similarities approximately, using DIMSUM.
val approx = mat.columnSimilarities(0.1)

val exactEntries = exact.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) }
val approxEntries = approx.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) }

The input file has one product per line: its name and description.

The result I got:
approxEntries.first()
res18: ((Long, Long), Double) = ((1638,966248),0.632455532033676)

How can I figure out what rows this result refers to?
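
A note on interpretation: columnSimilarities compares the matrix's columns, so (1638, 966248) indexes two dimensions of the hashed term space (2^20 columns by default), not two products. A minimal sketch of one way to get product-to-product similarities instead, by transposing so each product becomes a column; the transpose route and the variable names are assumptions, untested at scale:

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, MatrixEntry}

// Give every product a stable numeric id before building the matrix.
val indexedRows = tfidf.zipWithIndex().map { case (v, i) => IndexedRow(i, v) }
val rowMat = new IndexedRowMatrix(indexedRows)

// Transpose so products become columns, then run DIMSUM as before.
val productSims = rowMat.toCoordinateMatrix().transpose().toRowMatrix().columnSimilarities(0.1)

// Here i and j are product ids, joinable back to product names.
val pairs = productSims.entries.map { case MatrixEntry(i, j, s) => ((i, j), s) }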

Thanks in advance! =]






Spark saveAsText file size

2014-11-24 Thread Alan Prando
Hi Folks!

I'm running a Spark job on a cluster with 9 slaves and 1 master (250 GB RAM,
32 cores, and 1 TB of storage each).

This job generates 1.2 TB of data in an RDD with 1200 partitions.
When I call saveAsTextFile(hdfs://...), Spark creates 1200 files named
part-000* in the HDFS folder. However, only some of the files have content
(~450 files with 2.3 GB each); all the others are empty (0 bytes).

Is there any explanation for this file size (2.3 GB)?
Shouldn't Spark save 1200 files of ~1 GB each?
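
If the imbalance comes from upstream partitioning or filtering, one hedged workaround (a sketch, not a diagnosis; rdd stands for whatever RDD is being saved) is an explicit repartition just before writing:

// Redistribute records evenly across all 1200 partitions before writing,
// so each part-* file receives a roughly even share of the data.
rdd.repartition(1200).saveAsTextFile("hdfs://...")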

Thanks in advance.

---
Regards,
Alan Vidotti Prando.


MLlib KMeans Exception

2014-11-20 Thread Alan Prando
Hi Folks!

I'm running a Python Spark job on a cluster with 1 master and 10 slaves
(64 GB RAM and 32 cores per machine).
This job reads a 1.2 TB file with 1128201847 lines from HDFS and calls the
KMeans method as follows:

# SLAVE CODE - Reading features from HDFS
def get_features_from_images_hdfs(self, timestamp):
    # Flatten one level: yield every column of every row.
    def shallow(lista):
        for row in lista:
            for col in row:
                yield col

    features = self.sc.textFile("hdfs://999.999.99:/FOLDER/")
    return features.map(lambda row: eval(row)[1]).mapPartitions(shallow)
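
As an aside, calling eval on every line is slow and unsafe. If each row is a plain Python literal (an assumption about the file format), ast.literal_eval is a safer drop-in:

import ast

# Parses Python literals only; no arbitrary code execution, unlike eval.
features.map(lambda row: ast.literal_eval(row)[1]).mapPartitions(shallow)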


# SLAVE CODE - Extract centroids with KMeans
def extract_centroids_on_slaves(self, features, kmeans_clusters,
                                kmeans_max_iterations, kmeans_mode):
    # Error line
    clusters = KMeans.train(
        features,
        kmeans_clusters,
        maxIterations=kmeans_max_iterations,
        runs=1,
        initializationMode=kmeans_mode
    )
    return clusters.clusterCenters

# MASTER CODE - Main
features = get_features_from_images_hdfs(kwargs.get("timestamp"))
kmeans_clusters = 1
kmeans_max_iterations = 13
kmeans_mode = "random"

centroids = extract_centroids_on_slaves(
    features,
    kmeans_clusters,
    kmeans_max_iterations,
    kmeans_mode
)

centroids_rdd = sc.parallelize(centroids)


I'm getting the following exception when I call KMeans.train:

14/11/20 13:19:34 INFO TaskSetManager: Starting task 2539.0 in stage 0.0 (TID 2327, ip-172-31-7-120.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:34 WARN TaskSetManager: Lost task 2486.0 in stage 0.0 (TID 2257, ip-172-31-7-120.ec2.internal): java.io.IOException: Filesystem closed
org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
java.io.DataInputStream.read(DataInputStream.java:100)
org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:220)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:189)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1314)
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2174 in memory on ip-172-31-7-121.ec2.internal:57211 (size: 5.3 MB, free: 23.0 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2349 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.6 MB, free: 23.5 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2386 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.0 MB, free: 23.5 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2341 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 47.3 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2279 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.1 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2324 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 46.1 MB, free: 23.4 GB)
14/11/20 13:19:34 INFO TaskSetManager: Starting task 2525.0 in stage 0.0 (TID 2328, ip-172-31-7-122.ec2.internal, NODE_LOCAL, 1649 bytes)
14/11/20 13:19:34 INFO TaskSetManager: Finished task 2351.0 in stage 0.0 (TID 2103) in 77554 ms on ip-172-31-7-122.ec2.internal (1484/10220)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_2_2373 in memory on ip-172-31-7-122.ec2.internal:37861 (size: 48.1 MB, free: 23.2 GB)
14/11/20 13:19:34 INFO BlockManagerInfo: Added rdd_4_2319 in memory on ip-172-31-7-124.ec2.internal:56258 (size: 5.2 MB, free: 23.4 GB)
14/11/20 13:19:35 INFO BlockManagerInfo: Added rdd_4_2302 in memory on ip-172-31-7-122.ec2.internal:37861
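
One thing worth trying (a hedged guess, not a confirmed diagnosis): KMeans.train makes several passes over its input, and since features is never persisted, every pass re-reads and re-parses the 1.2 TB file from HDFS. Persisting it keeps the repeated passes off HDFS, which may also sidestep the "Filesystem closed" error if that error is triggered by the re-reads:

from pyspark import StorageLevel

# Keep the parsed features around between KMeans passes, spilling to disk
# when the 1.2 TB does not fit in memory.
features.persist(StorageLevel.MEMORY_AND_DISK)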

Re: Spark on YARN

2014-11-19 Thread Alan Prando
Hi all!

Thanks for answering!

@Sean, I tried running with 30 executor-cores, and 1 machine is still not
processing.
@Vanzin, I checked the RM's web UI, and all nodes were detected and RUNNING.
The interesting fact is that the available memory and available cores of 1
node were different from the other 2: it had just 1 available core and 1
available GB of RAM.

@All, I created a new cluster with 10 slaves and 1 master, and now 9 of my
slaves are working, and 1 is still not processing.

It's fine by me! I'm just wondering why YARN does this... Does anyone know
the answer?

2014-11-18 16:18 GMT-02:00 Sean Owen so...@cloudera.com:

 My guess is you're asking for all cores of all machines but the driver
 needs at least one core, so one executor is unable to find a machine to fit
 on.
 On Nov 18, 2014 7:04 PM, Alan Prando a...@scanboo.com.br wrote:

 Hi Folks!

 I'm running Spark on YARN cluster installed with Cloudera Manager Express.
 The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G
 RAM.

 My Spark job is working fine; however, it seems that just 2 of the 3 slaves
 are working (htop shows 2 slaves at 100% on all 32 cores, and 1 slave
 without any processing).

 I'm using this command:
 ./spark-submit --master yarn --num-executors 3 --executor-cores 32
  --executor-memory 32g feature_extractor.py -r 390

 Additionally, Spark's log shows communication with only 2 slaves:
 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1
 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default
 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2
 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default
 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM
 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM
 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms)

 Is there a configuration to make a Spark job on a YARN cluster use all
 slaves?

 Thanks in advance! =]

 ---
 Regards
 Alan Vidotti Prando.





Spark on YARN

2014-11-18 Thread Alan Prando
Hi Folks!

I'm running Spark on YARN cluster installed with Cloudera Manager Express.
The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G
RAM.

My Spark job is working fine; however, it seems that just 2 of the 3 slaves
are working (htop shows 2 slaves at 100% on all 32 cores, and 1 slave
without any processing).

I'm using this command:
./spark-submit --master yarn --num-executors 3 --executor-cores 32
 --executor-memory 32g feature_extractor.py -r 390

Additionally, Spark's log shows communication with only 2 slaves:
14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1
14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default
14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2
14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default
14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM
14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM
14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms)

Is there a configuration to make a Spark job on a YARN cluster use all
slaves?
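
Following Sean Owen's reply above (the driver needs at least one core, so a node hosting it cannot also fit a full 32-core executor), a variant of the command that leaves headroom on each machine; the exact numbers are illustrative assumptions, not tuned values:

./spark-submit --master yarn --num-executors 3 --executor-cores 30 \
 --executor-memory 28g feature_extractor.py -r 390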

Thanks in advance! =]

---
Regards
Alan Vidotti Prando.


Reading from Hbase using python

2014-11-12 Thread Alan Prando
Hi all,

I'm trying to read an HBase table using this example from GitHub (
https://github.com/apache/spark/blob/master/examples/src/main/python/hbase_inputformat.py);
however, I have two qualifiers in one column family.

Ex.:

ROW   COLUMN+CELL
row1  column=f1:1, timestamp=1401883411986, value=value1
row1  column=f1:2, timestamp=1401883415212, value=value2
row2  column=f1:1, timestamp=1401883417858, value=value3
row3  column=f1:1, timestamp=1401883420805, value=value4

When I run hbase_inputformat.py, the following loop prints row1 just once:

output = hbase_rdd.collect()
for (k, v) in output:
    print (k, v)
Am I doing anything wrong?
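
For what it's worth, the converter used by that example returns a single value per row key, which would explain row1 printing only once. Later revisions of hbase_inputformat.py have the converter emit every cell as newline-separated JSON and split it afterwards; a sketch along those lines, assuming that newer converter is in place:

import json

# Each value holds all cells for a row key, newline-separated; split them
# into separate (key, cell) records, then parse each cell's JSON.
cells = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
for (k, v) in cells.collect():
    print (k, v)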

Thanks in advance.