You may benefit from SoundCloud's open-source implementation of approximate cosine-similarity joins on Spark (using locality-sensitive hashing), either as a drop-in solution or as a reference implementation.
https://github.com/soundcloud/cosine-lsh-join-spark Thanks, Kevin On Sun, Nov 13, 2016 at 2:07 PM, Meeraj Kunnumpurath < [email protected]> wrote: > That was a bit of a brute force search, so I changed the code to use a UDF > to create the dot product between the two IDF vectors, and do a sort on the > new column. > > package com.ss.ml.clustering > > import org.apache.spark.sql.{DataFrame, SparkSession} > import org.apache.spark.sql.functions._ > import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF} > import org.apache.spark.ml.linalg.Vector > > object ClusteringBasics extends App { > > val spark = SparkSession.builder().appName("Clustering > Basics").master("local").getOrCreate() > import spark.implicits._ > > val df = spark.read.option("header", "false").csv("data") > > val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") > val tf = new HashingTF().setInputCol("words").setOutputCol("tf") > val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") > > val df1 = tf.transform(tk.transform(df)) > val idfs = idf.fit(df1).transform(df1) > > val nn = nearestNeighbour("<http://dbpedia.org/resource/Barack_Obama>", > idfs) > println(nn) > > def nearestNeighbour(uri: String, ds: DataFrame) : String = { > val tfIdfSrc = ds.filter(s"_c0 == > '$uri'").take(1)(0).getAs[Vector]("tf-idf") > def dorProduct(vectorA: Vector) = { > var dp = 0.0 > var index = vectorA.size - 1 > for (i <- 0 to index) { > dp += vectorA(i) * tfIdfSrc(i) > } > dp > } > val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1)) > ds.filter(s"_c0 != '$uri'").withColumn("dp", > dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1) > } > > } > > > However, that is generating the exception below, > > Exception in thread "main" java.lang.RuntimeException: Unsupported literal > type class org.apache.spark.ml.feature.IDF idf_e49381a285dd > at org.apache.spark.sql.catalyst.expressions.Literal$.apply( > literals.scala:57) > at org.apache.spark.sql.functions$.lit(functions.scala:101) > at 
org.apache.spark.sql.Column.$minus(Column.scala:672) > at com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour( > ClusteringBasics.scala:36) > at com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$ > clustering$ClusteringBasics$1(ClusteringBasics.scala:22) > at com.ss.ml.clustering.ClusteringBasics$delayedInit$ > body.apply(ClusteringBasics.scala:8) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at scala.runtime.AbstractFunction0.apply$mcV$ > sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.generic.TraversableForwarder$class. > foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8) > at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:483) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > > On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath < > [email protected]> wrote: > >> This is what I have done, is there a better way of doing this? 
>> >> val df = spark.read.option("header", "false").csv("data") >> >> >> val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") >> >> val tf = new HashingTF().setInputCol("words").setOutputCol("tf") >> >> val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") >> >> >> val df1 = tf.transform(tk.transform(df)) >> >> val idfs = idf.fit(df1).transform(df1) >> >> >> println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama", >> idfs)) >> >> >> def nearestNeighbour(uri: String, ds: DataFrame) : String = { >> >> var res : Row = null >> >> var metric : Double = 0 >> >> val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vect >> or]("tf-idf") >> >> ds.filter("_c0 != '" + uri + "'").foreach { r => >> >> val tfIdfDst = r.getAs[Vector]("tf-idf") >> >> val dp = dorProduct(tfIdfSrc, tfIdfDst) >> >> if (dp > metric) { >> >> res = r >> >> metric = dp >> >> } >> >> } >> >> return res.getAs[String]("_c1") >> >> } >> >> >> def cosineSimilarity(vectorA: Vector, vectorB: Vector) = { >> >> var dotProduct = 0.0 >> >> var normA = 0.0 >> >> var normB = 0.0 >> >> var index = vectorA.size - 1 >> >> for (i <- 0 to index) { >> >> dotProduct += vectorA(i) * vectorB(i) >> >> normA += Math.pow(vectorA(i), 2) >> >> normB += Math.pow(vectorB(i), 2) >> >> } >> >> (dotProduct / (Math.sqrt(normA) * Math.sqrt(normB))) >> >> } >> >> >> def dorProduct(vectorA: Vector, vectorB: Vector) = { >> >> var dp = 0.0 >> >> var index = vectorA.size - 1 >> >> for (i <- 0 to index) { >> >> dp += vectorA(i) * vectorB(i) >> >> } >> >> dp >> >> } >> >> On Sun, Nov 13, 2016 at 7:04 PM, Meeraj Kunnumpurath < >> [email protected]> wrote: >> >>> Hello, >>> >>> I have a dataset containing TF-IDF vectors for a corpus of documents. >>> How do I perform a nearest neighbour search on the dataset, using cosine >>> similarity? 
>>> >>> val df = spark.read.option("header", "false").csv("data") >>> >>> val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") >>> >>> val tf = new HashingTF().setInputCol("words").setOutputCol("tf") >>> >>> val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") >>> >>> val df1 = tf.transform(tk.transform(df)) >>> >>> idf.fit(df1).transform(df1).select("tf-idf").show(10) >>> Thank you >>> >>> -- >>> *Meeraj Kunnumpurath* >>> >>> >>> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* >>> >>> *00 971 50 409 [email protected] >>> <[email protected]>* >>> >> >> >> >> -- >> *Meeraj Kunnumpurath* >> >> >> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* >> >> *00 971 50 409 [email protected] >> <[email protected]>* >> > > > > -- > *Meeraj Kunnumpurath* > > > *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* > > *00 971 50 409 [email protected] <[email protected]>* >
