I've been trying to achieve the same objective and came up with approaches similar to your methods 1 and 2. Method 2 is the slowest for me because of the massive amount of data shuffled around at each matrix-operation stage. Method 3 is new to me, so I can't comment on it much.
I ended up using an approach similar to your method 1, which gives reasonable performance in my use case.

*#4 Normalizer then UDF (PySpark code)*
```
from pyspark.ml.feature import Normalizer
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# L2-normalise the vectors so a plain dot product equals cosine similarity
normaliser = Normalizer(inputCol="vec", outputCol="norm_vec")
df_word_norm = normaliser.transform(df_word)
dot_udf = F.udf(lambda x, y: float(x.dot(y)), DoubleType())
# norm_vec1/norm_vec2 come from a Cartesian join of df_word_norm with
# itself; see the sketch in the P.S. at the end of this message.
df_score = df_pairs.withColumn("score", dot_udf("norm_vec1", "norm_vec2"))
```

I'd be curious to learn how other people solve this problem.

Best wishes,
Chee Yee

On Tue, 24 Sep 2019 at 04:20, Stevens, Clay <clay.stev...@wolterskluwer.com> wrote:

> There are several ways I can compute the cosine similarities between a
> Spark ML vector and each ML vector in a Spark DataFrame column, then sort
> for the highest results. However, I can't come up with a method that is
> faster than replacing the `/data/` in a Spark ML Word2Vec model and then
> using `.findSynonyms()`. The problem is that the Word2Vec model is held
> entirely in the driver, which can cause memory issues if the data set I
> want to compare against gets too big.
>
> *1.* Is there a more efficient method than the ones I have shown below?
> *2.* Could the data for the Word2Vec model be distributed across the
> cluster?
> *3.* Could the `.findSynonyms()` [Scala code](
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala#L571toL619)
> be modified to make a Spark SQL function that can operate efficiently over
> a whole Spark DataFrame?
>
>
> *Methods I have tried:*
>
> *#1 rdd function:*
> ```
> import numpy as np
>
> # vecIn = vector with the same dimensions as the 'vectors' column
> def cosSim(row, vecIn):
>     # cosine similarity = dot(a, b) / (||a|| * ||b||)
>     sim = row.vectors.dot(vecIn) / (
>         np.sqrt(row.vectors.dot(row.vectors)) * np.sqrt(vecIn.dot(vecIn)))
>     return (float(sim),)
>
> df.rdd.map(lambda row: cosSim(row, vecIn)).toDF(['CosSim']).show(truncate=False)
> ```
>
> *#2 `.toIndexedRowMatrix().columnSimilarities()` then filter the results
> (not shown):*
> ```
> from pyspark.mllib.linalg.distributed import IndexedRowMatrix
>
> # columnSimilarities() compares columns, so transpose first to get
> # vector-vs-vector similarities; zipWithIndex supplies the row indices
> spark.createDataFrame(
>     IndexedRowMatrix(
>         df.rdd.map(lambda row: row.vectors.toArray())
>               .zipWithIndex()
>               .map(lambda x: (x[1], x[0])))
>     .toBlockMatrix()
>     .transpose()
>     .toIndexedRowMatrix()
>     .columnSimilarities()
>     .entries)
> ```
>
> *#3 replace the Word2Vec model `/data/` with my own, then load the
> 'revised' model and use `.findSynonyms()`:*
> ```
> from pyspark.ml.feature import Word2VecModel
>
> df_words_vectors.schema
> ## StructType(List(StructField(word,StringType,true),
> ##            StructField(vector,ArrayType(FloatType,true),true)))
>
> df_words_vectors.write.parquet("existing_Word2Vec_model/data/", mode='overwrite')
>
> new_Word2Vec_model = Word2VecModel.load("existing_Word2Vec_model")
>
> ## vecIn = vector of the same dimensions as the 'vector' column in the
> ## DataFrame saved over the Word2Vec model /data/
> new_Word2Vec_model.findSynonyms(vecIn, 20).show()
> ```
>
> Clay Stevens
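P.S. For completeness, here is a minimal sketch of the Cartesian-join step I left out of #4 above. The `word` identifier column and the `df_left`/`df_right`/`df_pairs` names are illustrative assumptions rather than the exact code from my job:

```
# Pair every normalised vector with every other one via an explicit cross
# join; rename the columns so the two sides of each pair stay distinct.
df_left = df_word_norm.select(F.col("word").alias("word1"),
                              F.col("norm_vec").alias("norm_vec1"))
df_right = df_word_norm.select(F.col("word").alias("word2"),
                               F.col("norm_vec").alias("norm_vec2"))
df_pairs = df_left.crossJoin(df_right)
```

Bear in mind the cross join produces n² pairs, which is exactly the kind of shuffle cost that made method 2 slow for me, so filter one side down first where possible.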
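Regarding Clay's questions 1 and 2: for the single-query-vector case, a rough DataFrame-only equivalent of `findSynonyms()` might look like the sketch below. It is untested at scale and assumes a DataFrame `df` with a `vectors` column plus a query vector `vecIn` of the same dimension; the scoring happens on the executors rather than in the driver:

```
import numpy as np
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Normalise the query vector once on the driver.
query = Vectors.dense(vecIn.toArray() / np.linalg.norm(vecIn.toArray()))

# Normalise the column once; a dot product is then the cosine similarity.
df_norm = Normalizer(inputCol="vectors", outputCol="norm_vec").transform(df)

# The closure over `query` is shipped to the executors with the UDF.
cos_udf = F.udf(lambda v: float(v.dot(query)), DoubleType())

# Rough DataFrame equivalent of findSynonyms(vecIn, 20).
df_norm.withColumn("CosSim", cos_udf("norm_vec")) \
       .orderBy(F.desc("CosSim")) \
       .show(20)
```

The `orderBy` still forces a full sort, so for very large inputs something like `rdd.takeOrdered` on the scored rows could be cheaper.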