LSH-based nearest-neighbour (NN) search and similarity join should be released in Spark 2.1; a little work is still being done to finalize the APIs and some of the functionality.
Check out https://issues.apache.org/jira/browse/SPARK-5992 On Mon, 14 Nov 2016 at 16:12, Kevin Mellott <[email protected]> wrote: > You may be able to benefit from Soundcloud's open source implementation, > either as a solution or as a reference implementation. > > https://github.com/soundcloud/cosine-lsh-join-spark > > Thanks, > Kevin > > On Sun, Nov 13, 2016 at 2:07 PM, Meeraj Kunnumpurath < > [email protected]> wrote: > > That was a bit of a brute force search, so I changed the code to use a UDF > to create the dot product between the two IDF vectors, and do a sort on the > new column. > > package com.ss.ml.clustering > > import org.apache.spark.sql.{DataFrame, SparkSession} > import org.apache.spark.sql.functions._ > import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF} > import org.apache.spark.ml.linalg.Vector > > object ClusteringBasics extends App { > > val spark = SparkSession.builder().appName("Clustering > Basics").master("local").getOrCreate() > import spark.implicits._ > > val df = spark.read.option("header", "false").csv("data") > > val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") > val tf = new HashingTF().setInputCol("words").setOutputCol("tf") > val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") > > val df1 = tf.transform(tk.transform(df)) > val idfs = idf.fit(df1).transform(df1) > > val nn = nearestNeighbour("<http://dbpedia.org/resource/Barack_Obama>", > idfs) > println(nn) > > def nearestNeighbour(uri: String, ds: DataFrame) : String = { > val tfIdfSrc = ds.filter(s"_c0 == > '$uri'").take(1)(0).getAs[Vector]("tf-idf") > def dorProduct(vectorA: Vector) = { > var dp = 0.0 > var index = vectorA.size - 1 > for (i <- 0 to index) { > dp += vectorA(i) * tfIdfSrc(i) > } > dp > } > val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1)) > ds.filter(s"_c0 != '$uri'").withColumn("dp", > dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1) > } > > } > > > However, that is generating the exception below, > > Exception in 
thread "main" java.lang.RuntimeException: Unsupported literal > type class org.apache.spark.ml.feature.IDF idf_e49381a285dd > at > org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57) > at org.apache.spark.sql.functions$.lit(functions.scala:101) > at org.apache.spark.sql.Column.$minus(Column.scala:672) > at > com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(ClusteringBasics.scala:36) > at > com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$clustering$ClusteringBasics$1(ClusteringBasics.scala:22) > at > com.ss.ml.clustering.ClusteringBasics$delayedInit$body.apply(ClusteringBasics.scala:8) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8) > at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:483) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > > On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath < > [email protected]> wrote: > > This is what I have done, is there a better way of doing this? 
> > val df = spark.read.option("header", "false").csv("data") > > > val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") > > val tf = new HashingTF().setInputCol("words").setOutputCol("tf") > > val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") > > > val df1 = tf.transform(tk.transform(df)) > > val idfs = idf.fit(df1).transform(df1) > > > println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama", > idfs)) > > > def nearestNeighbour(uri: String, ds: DataFrame) : String = { > > var res : Row = null > > var metric : Double = 0 > > val tfIdfSrc = ds.filter(s"_c0 == > '$uri'").take(1)(0).getAs[Vector]("tf-idf") > > ds.filter("_c0 != '" + uri + "'").foreach { r => > > val tfIdfDst = r.getAs[Vector]("tf-idf") > > val dp = dorProduct(tfIdfSrc, tfIdfDst) > > if (dp > metric) { > > res = r > > metric = dp > > } > > } > > return res.getAs[String]("_c1") > > } > > > def cosineSimilarity(vectorA: Vector, vectorB: Vector) = { > > var dotProduct = 0.0 > > var normA = 0.0 > > var normB = 0.0 > > var index = vectorA.size - 1 > > for (i <- 0 to index) { > > dotProduct += vectorA(i) * vectorB(i) > > normA += Math.pow(vectorA(i), 2) > > normB += Math.pow(vectorB(i), 2) > > } > > (dotProduct / (Math.sqrt(normA) * Math.sqrt(normB))) > > } > > > def dorProduct(vectorA: Vector, vectorB: Vector) = { > > var dp = 0.0 > > var index = vectorA.size - 1 > > for (i <- 0 to index) { > > dp += vectorA(i) * vectorB(i) > > } > > dp > > } > > On Sun, Nov 13, 2016 at 7:04 PM, Meeraj Kunnumpurath < > [email protected]> wrote: > > Hello, > > I have a dataset containing TF-IDF vectors for a corpus of documents. How > do I perform a nearest neighbour search on the dataset, using cosine > similarity? 
> > val df = spark.read.option("header", "false").csv("data") > > val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") > > val tf = new HashingTF().setInputCol("words").setOutputCol("tf") > > val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") > > val df1 = tf.transform(tk.transform(df)) > > idf.fit(df1).transform(df1).select("tf-idf").show(10) > Thank you > > -- > *Meeraj Kunnumpurath* > > > *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* > > *00 971 50 409 [email protected] <[email protected]>* > > > > > -- > *Meeraj Kunnumpurath* > > > *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* > > *00 971 50 409 [email protected] <[email protected]>* > > > > > -- > *Meeraj Kunnumpurath* > > > *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* > > *00 971 50 409 [email protected] <[email protected]>* > > >
