Hi. What I would do in your case is something like this:
Let's call the two datasets qs and ds, where qs is an array of vectors and ds is an RDD[(dsID: Long, Vector)]. Do the following:

1) Create a k-NN class that keeps track of the k nearest neighbors seen so far. It holds a qsID, some structure for the k nearest neighbors, e.g. Seq[(dsID: Long, distance: Double)], and a function .add( nn: (Long, Vector) ) that does the distance calculation and updates the k-NN when appropriate.

2) Key the qs as well, so each query vector has an ID, i.e. qs = Array[(qsID: Long, Vector)].

Now, what you want is not to materialize all the pairwise distances, just the k-NNs. To do this we will actually create several k-NNs for each query vector, one per partition, and then merge them later.

3) Do a ds.mapPartitions(), and inside the function create one k-NN per query point, scan the ds points of the partition, and output an iterator over the set of k-NNs created:

    val k = 100
    // qs: the collected, keyed query points: Array[(Long, Vector)]
    // ds: the keyed dataset: RDD[(Long, Vector)]
    val knnResults = ds.mapPartitions( itr => {
      // one k-NN accumulator per query point, local to this partition
      val knns = qs.map( qp => (qp._1, new KNNClass(k, qp)) )
      // a single scan over the partition's dataset points
      itr.foreach( dp => knns.foreach { case (_, knn) => knn.add(dp) } )
      // emit (qsID, partial k-NN) pairs
      knns.iterator
    })

Now you have one k-NN per partition for each query point, but that is easily fixed with a reduceByKey that merges all the k-NNs of each qsID into a single k-NN:

    val knnResultFinal = knnResults.reduceByKey( (a, b) => KNNClass.merge(a, b) )

Here KNNClass.merge is a static function that merges two k-NNs: simply concatenate them, sort on distance, take the top k values, and return them as a new k-NN. If you want to control how many partial k-NNs are created, you can always repartition ds.

How does that sound? Does this make sense? :)

Regards,
    Gylfi.
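P.S. For concreteness, here is a rough sketch of what such a KNNClass could look like. This is only my assumption of an implementation (squared Euclidean distance on dense vectors of equal length, a sorted Seq as the neighbor buffer); the names KNNClass, add, and merge come from the recipe above, everything else is illustrative, so adapt the distance function and data structure to your needs:

    import org.apache.spark.mllib.linalg.Vector

    class KNNClass(val k: Int, query: (Long, Vector)) extends Serializable {
      val qsID: Long = query._1
      private val qVec: Vector = query._2
      // (dsID, distance) pairs, kept sorted ascending by distance,
      // never longer than k
      var neighbors: Seq[(Long, Double)] = Seq.empty

      // squared Euclidean distance; assumes v has the same length as qVec
      private def dist(v: Vector): Double = {
        var sum = 0.0
        var i = 0
        while (i < qVec.size) {
          val d = qVec(i) - v(i)
          sum += d * d
          i += 1
        }
        sum
      }

      def add(nn: (Long, Vector)): Unit = {
        val d = dist(nn._2)
        // only re-sort when the candidate can actually enter the top k
        if (neighbors.size < k || d < neighbors.last._2) {
          neighbors = (neighbors :+ ((nn._1, d))).sortBy(_._2).take(k)
        }
      }
    }

    object KNNClass {
      // merge two partial k-NNs of the same query point:
      // concatenate, sort on distance, keep the best k
      def merge(a: KNNClass, b: KNNClass): KNNClass = {
        a.neighbors = (a.neighbors ++ b.neighbors).sortBy(_._2).take(a.k)
        a
      }
    }

The class extends Serializable because the partial k-NNs get shuffled during the reduceByKey step. If your vectors are large or k is big, a bounded priority queue would be a better neighbor buffer than re-sorting a Seq, but the above keeps the idea plain.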