That was a bit of a brute force search, so I changed the code to use a UDF
to create the dot product between the two IDF vectors, and do a sort on the
new column.
package com.ss.ml.clustering
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF}
import org.apache.spark.ml.linalg.Vector
object ClusteringBasics extends App {
val spark = SparkSession.builder().appName("Clustering
Basics").master("local").getOrCreate()
import spark.implicits._
val df = spark.read.option("header", "false").csv("data")
val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
val df1 = tf.transform(tk.transform(df))
val idfs = idf.fit(df1).transform(df1)
val nn = nearestNeighbour("<http://dbpedia.org/resource/Barack_Obama>", idfs)
println(nn)
def nearestNeighbour(uri: String, ds: DataFrame) : String = {
val tfIdfSrc = ds.filter(s"_c0 ==
'$uri'").take(1)(0).getAs[Vector]("tf-idf")
def dorProduct(vectorA: Vector) = {
var dp = 0.0
var index = vectorA.size - 1
for (i <- 0 to index) {
dp += vectorA(i) * tfIdfSrc(i)
}
dp
}
val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1))
ds.filter(s"_c0 != '$uri'").withColumn("dp",
dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1)
}
}
However, that is generating the exception below,
Exception in thread "main" java.lang.RuntimeException: Unsupported literal
type class org.apache.spark.ml.feature.IDF idf_e49381a285dd
at
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.Column.$minus(Column.scala:672)
at
com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(ClusteringBasics.scala:36)
at
com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$clustering$ClusteringBasics$1(ClusteringBasics.scala:22)
at
com.ss.ml.clustering.ClusteringBasics$delayedInit$body.apply(ClusteringBasics.scala:8)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8)
at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath <
[email protected]> wrote:
> This is what I have done, is there a better way of doing this?
>
> val df = spark.read.option("header", "false").csv("data")
>
>
> val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>
> val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>
> val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>
>
> val df1 = tf.transform(tk.transform(df))
>
> val idfs = idf.fit(df1).transform(df1)
>
>
> println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama",
> idfs))
>
>
> def nearestNeighbour(uri: String, ds: DataFrame) : String = {
>
> var res : Row = null
>
> var metric : Double = 0
>
> val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[
> Vector]("tf-idf")
>
> ds.filter("_c0 != '" + uri + "'").foreach { r =>
>
> val tfIdfDst = r.getAs[Vector]("tf-idf")
>
> val dp = dorProduct(tfIdfSrc, tfIdfDst)
>
> if (dp > metric) {
>
> res = r
>
> metric = dp
>
> }
>
> }
>
> return res.getAs[String]("_c1")
>
> }
>
>
> def cosineSimilarity(vectorA: Vector, vectorB: Vector) = {
>
> var dotProduct = 0.0
>
> var normA = 0.0
>
> var normB = 0.0
>
> var index = vectorA.size - 1
>
> for (i <- 0 to index) {
>
> dotProduct += vectorA(i) * vectorB(i)
>
> normA += Math.pow(vectorA(i), 2)
>
> normB += Math.pow(vectorB(i), 2)
>
> }
>
> (dotProduct / (Math.sqrt(normA) * Math.sqrt(normB)))
>
> }
>
>
> def dorProduct(vectorA: Vector, vectorB: Vector) = {
>
> var dp = 0.0
>
> var index = vectorA.size - 1
>
> for (i <- 0 to index) {
>
> dp += vectorA(i) * vectorB(i)
>
> }
>
> dp
>
> }
>
> On Sun, Nov 13, 2016 at 7:04 PM, Meeraj Kunnumpurath <
> [email protected]> wrote:
>
>> Hello,
>>
>> I have a dataset containing TF-IDF vectors for a corpus of documents. How
>> do I perform a nearest neighbour search on the dataset, using cosine
>> similarity?
>>
>> val df = spark.read.option("header", "false").csv("data")
>>
>> val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words")
>>
>> val tf = new HashingTF().setInputCol("words").setOutputCol("tf")
>>
>> val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf")
>>
>> val df1 = tf.transform(tk.transform(df))
>>
>> idf.fit(df1).transform(df1).select("tf-idf").show(10)
>> Thank you
>>
>> --
>> *Meeraj Kunnumpurath*
>>
>>
>> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*
>>
>> *00 971 50 409 [email protected]
>> <[email protected]>*
>>
>
>
>
> --
> *Meeraj Kunnumpurath*
>
>
> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*
>
> *00 971 50 409 [email protected] <[email protected]>*
>
--
*Meeraj Kunnumpurath*
*Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*
*00 971 50 409 [email protected] <[email protected]>*