I ran itemsimilarity on the epinions data on a small 3 machine cluster and 
found that it ran in 20 some minutes using the old hadoop version, while the 
spark-itemsimilarity ran in 2+ minutes so it looks like 10x the performance. 
This is using the same “cheats” in both cases. 

100x is not likely to be seen except in special cases that fit hadoop’s 
mapreduce poorly and Spark’s much better. For example if a pipeline is long, 
requiring lots of serialization and IO on hadoop and none using Spark’s 
in-memory RDDs. I doubt we’ll see that with RSJ, which is fairly simple.

Other comments inline

> On Aug 5, 2014, at 9:38 PM, Reinis Vicups <[email protected]> wrote:
> 
> Yes, that would make useage of threshold and the expected(?) quality of the 
> result better.

It may also have some of the same side effects as the current threshold, like 
having no similar items for some. But it does produce an average “quality” 
better than other sub-sampling techniques.

> 
> The configuration tho was not the main point for us to try to implement 
> RowSimilarityJob with spark, but rather
> - promise of spark to work 100x times faster than hadoops map-reduce,
> - existence of "cheats" in current RowSimilarityJob (observations, threshold, 
> max similarities per row), we are willing to have a super performant "loss 
> less" solution.
> 
> With our initial implementation we observe that spark implementation is 
> performing worse than original RSJ and I was wondering if community could 
> hint us on why so.
> 
> I am making couple of guesses myself:
> - we're not using any checkpointing, caching, sharing, broadcasting of 
> intermediate results and I am kinda unsure if that is applicable to RSJ;

It does apply, Mahout uses caching to keep some data in memory and there is an 
optimizer (BLAS type) build in to the DRM math calculations. Scala supports 
lazy evaluation, which allows the optimizer to work invisibly in the 
background. This means a “slim” A’A to seed the item similarity calc.

> - we're not using kryoserializer, possibly that could have some reasonable 
> impact on performance;

We are using kryo. Can’t speak to the speed comparison but maybe others can. 

> - we're using only standard scala collections. I did use breeze.linalg 
> SparseVector at some point but I couldn't observe any performance increase. 
> Do you guys have any experience on using specialized linalg collections 
> versus LinearSeq of scala versus IndexedSeq of scala?

Mahout uses Scala extensions and conversions to make Scala style use of the 
Mahout Java optimized SparseVector types and iterators. The DRM has been 
completely implemented for Spark and has some optimizations. One recent 
optimization was in the way non-zero elements of a sparse vector are iterated.

If you’d like to use any of this I suggest reading the docs on the Mahout Scala 
DSL here: http://mahout.apache.org/users/sparkbindings/home.html and the full 
PDF here: http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf

You can even fire up the Spark/Scala shell and play with your code 
interactively.

> 
> thanks
> reinis
> 
> On 06.08.2014 03:04, Pat Ferrel wrote:
>> Cooccurrence is implemented on Spark and a ticket for doing the 
>> RowSimilarityJob was entered today. Should be fairly easy since 
>> ItemSimilarity is implemented. Will use only LLR for now. The driver reads 
>> text files (a text version of DRM).
>> 
>> If you want to wrap Spark cooccurrence yourself you can read in a DRM from 
>> the Mahout hadoop tf-idf code’s sequence file (it’s a one liner) If you want 
>> to try any of this let me know and I’ll give some pointers.
>> 
>> I’ve been wondering about the threshold too. Seems like an absolute 
>> threshold is very hard to use. Was thinking that some % of values might be a 
>> better measure. Imagine you want 100 similar items per item, so 100 per row 
>> in the similarity matrix—set 100 per row as your threshold in this future 
>> Spark-RSJ. But you’d get 100 on average, it wouldn’t be guaranteed for any 
>> particular item. Over all it would be 100*number of rows that you’d get and 
>> they’d be the items with highest similarity scores of the total created. Do 
>> you think this would help?
>> 
>> On Aug 5, 2014, at 4:50 PM, Reinis Vicups <[email protected]> wrote:
>> 
>> Hi,
>> 
>> we have had good results with RowSimilarityJob in our Use Case with some 
>> quality loss due to pruning and decline in performance if setting thresholds 
>> too high/low.
>> 
>> The current work on mahout integration with spark done by dlyubimov, pferrel 
>> and others is just amazing (although I would love to see more inline 
>> comments especially for classes like SparkEngine.scala or what the heck are 
>> those in org.apache.mahout.sparkbindings.blas :D )!
>> 
>> Either I haven't looked hard enough, or the RowSimilarityJob for spark is 
>> not implemented just yet (will it be at some point)? So my two colleagues 
>> (hi Nadine, hi Wolfgang) and me attempted to replicate RowSimilarityJob (to 
>> some extent) in spark.
>> 
>> I am providing the very first and overly simplified version of 
>> implementation below and would greatly appreciate any feedback on whatever 
>> aspect of out implementation you would like to comment on!
>> 
>> Some fun facts:
>> We have roughly 10k support tickets containing textual information (title, 
>> description, comments of participating agents, solutions, attachment texts 
>> that we are also extracting). The raw data we import from relational DB into 
>> HDFS is roughly 1.5 GB. The resulting TFIDF data is roughly 80 MB and 
>> contains 10k vectors with the dimensionality of roughly 300k.
>> 
>> The job is executed on our test cluster consisting of dual core machines 
>> with 32GB ram for 1 master node and 16GB for 4 worker nodes. As you will see 
>> in implementation, currently we do no pruning, no limitations of 
>> observations, no thresholds - so it runs on whole corpus. And completes in 
>> 18 to 22 minutes. We have observed some non-linear horizontal scalability 
>> (started with one node, then three, then four and the execution time reduced 
>> slightly). The mahout's RowSimilarityJob with maxed out observations 
>> completes in 12 minutes for the same data set.
>> 
>> "--similarityClassname", "SIMILARITY_COSINE",
>> "-m", "100000",
>> "--maxObservationsPerRow", "900000",
>> "--maxObservationsPerColumn", "900000",
>> "-ess", "true"
>> 
>> Thanks, guys, for your time reading this and, again, any feedback 
>> (especially on how to improve the implementation ;) ) is greatly appreciated
>> reinis
>> 
>> ---
>> The code:
>> 
>> import com.google.common.io.ByteStreams
>> import our.hbase.HBaseConversions._
>> import org.apache.hadoop.hbase.HBaseConfiguration
>> import org.apache.hadoop.hbase.client.Result
>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.mahout.math.VectorWritable
>> import org.apache.spark.SparkContext.rddToPairRDDFunctions
>> import org.apache.spark.{SparkConf, SparkContext}
>> 
>> import scala.annotation.tailrec
>> import scala.collection.LinearSeq
>> 
>> object RowSimilaritySparkJob {
>> def main(args: Array[String]) {
>>    val sConf = new SparkConf().setAppName("RowSimilaritySparkJob ")
>>    val sc = new SparkContext(sConf)
>> 
>>    @transient val hConf = HBaseConfiguration.create()
>>    val fullTableName = "sparsevectortfidfvectors"
>>    hConf.set(TableInputFormat.INPUT_TABLE, fullTableName)
>> 
>>    val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], 
>> classOf[ImmutableBytesWritable], classOf[Result])
>> 
>>    val documentRows = rdd.map { get =>
>>      // read-in tfidf from HBase
>>      val vectorWritable = new VectorWritable()
>>      val vectorByteArray = get._2.getValue("d", "vector")
>> vectorWritable.readFields(ByteStreams.newDataInput(vectorByteArray))
>>      val vector = vectorWritable.get()
>> 
>>      // output tuple (documentId, tfidfVector)
>>      (bytesToInt(get._1.get()), vector)
>>    }
>> 
>>    /* Stage 1 : normalize & transpose */
>>    val termRows = documentRows
>> 
>>       // we do no normalization since we use cosine similarity and 
>> seq2sparse already generates normalized tfidf values
>>      // write in what documents the given termid appears in (term-document 
>> co-occurrence) as a tuple (termid, (documentid, tfidf of term in document))
>>      .flatMap { case(documentId, tfidfVector) =>
>>        import scala.collection.JavaConversions._
>>        for( element <- tfidfVector.nonZeroes() ) yield {
>>          // output multiple tuples (termid, (documentid, tfidf of term in 
>> document))
>>          element.index() -> (documentId -> element.get())
>>        }
>>      }
>> 
>>      // combine term-document co-occurrence fragment by merging the vectors 
>> and thus creating
>>      // full vector (or in our case linear sequence) of documents the termid 
>> appears in
>>      // (termid -> Seq((documentId -> term-tfidf in document), 
>> (anotherDocumentId -> term-tfidf in that document), ...))
>>      .combineByKey[LinearSeq[(Int, Double)]](
>>        (x: (Int, Double)) => { LinearSeq[(Int, Double)](x._1 -> x._2) },
>>        (v: LinearSeq[(Int, Double)], t: (Int, Double)) => { v :+ t },
>>        (combiner: LinearSeq[(Int, Double)], combinee: LinearSeq[(Int, 
>> Double)]) => { combiner ++ combinee // concatenating Seq's from different 
>> workers?},
>>        40 // this is just number of partitions depending on available 
>> hardware (cpus))
>> 
>>      /* Stage 2 : co-occurrence triangular matrix */
>>      // write upper half of the document co-occurrence matrix (triangular 
>> matrix) one matrix per termid.
>>      // we take vectors (sequences) generated in the previous step and 
>> traverse vector dimensions (containing tuple documentid -> tfidf)
>>      // building triangular matrix elements (rowid -> (colid -> row-col 
>> value)). Here the rowid and colid are documentids from the vector
>>      // of the term and row-col value is term-tfidf of both documents 
>> multiplied with each other (in our case cosine similarity).
>>      // result is sequence of tuples (leftDocumentId -> 
>> LinearSeq((rightDocumentId -> left-tfidf * right-tfidf), 
>> (anotherRightDocumentId -> left-tfidf * right-tfidf))
>>      // representing rows of upper triangular matrix, one per termid
>>      .flatMap { x: (Int, LinearSeq[(Int, Double)]) => 
>> triangularCoOccurrenceMatrix(LinearSeq[((Int, Int), Double)](), x._2) }
>> 
>>      // tuples from the previous step are reduced by key (leftDocumentId, 
>> rightDocumentId) over all termid matrixes.
>>      // since most of the documents contain multiple terms, there will be 
>> rows for given leftdocumentid, rightdocumentid in multiple matrixes
>>      // reduce all leftDocumentId, rightDocuemtnId co-occurrences over all 
>> common terms (it is ensured through reducing by key) summing the 
>> similarities up
>>      // this results into sequence representing triangular matrix aggregated 
>> by (leftDocumentId, rightDocumentId):
>>      // ((leftDocumentId, rightDocumentId), sum of all term-tfidfs), 
>> ((leftDocumentId, anotherRightDocId), sum of all term-tfidfs), ...)
>>      .reduceByKey(_ + _, 40)
>> 
>>      /* Stage 3 : create similarities */
>>      // symetrify document co-occurrence matrix by generating lower 
>> triangular matrix from the upper triangular matrix through
>>      // swapping left document id and right document id (original 
>> (leftdocid, rightdocid), tfidfsum) is retained aswell ofcourse
>>      .flatMap { x: ((Int, Int), Double) =>
>>          LinearSeq(x, ((x._1._2, x._1._1), x._2))
>>      }
>> 
>>      // save in hadoop
>>      .saveAsTextFile("row-similarity-test")
>> }
>> 
>> /**
>> * Produces LinearSeq representing triangular matrix of document 
>> co-occurrences
>> * @param accumulator is a temporary variable used by recursion (enables 
>> tail-recursion)
>> * @param corpus shall contain at least two elements (2 document 
>> occurrences), otherwise an empty Seq will be returned
>> * @return Seq of document co-occurrences formated as ((leftDocumentId -> 
>> rightDocumentId) -> leftTFIDF * rightTFIDF)
>> */
>> @tailrec private def triangularCoOccurrenceMatrix(accumulator: 
>> LinearSeq[((Int, Int), Double)], corpus: LinearSeq[(Int, Double)]): 
>> LinearSeq[((Int, Int), Double)] = corpus match {
>>  case h +: Seq() => accumulator
>>  case (h +: t) => triangularCoOccurrenceMatrix(accumulator ++ (for (e <- t) 
>> yield ((h._1, e._1), e._2 * h._2)), t)
>> }
>> }
> 
> 

Reply via email to