Carter,
Here is a quick and simple starting point for Spark (caveat: it needs lots of
improvements for scaling and for graceful, efficient handling of RDDs, etc.):
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import scala.collection.immutable.ListMap
import scala.collection.immutable.SortedMap
object TopK {

  /** Canonical path of the JVM's working directory; relative paths such as `data01.csv` resolve against it. */
  def getCurrentDirectory = new java.io.File(".").getCanonicalPath

  /**
   * Euclidean distance between two records laid out as `List(id, x, y)`.
   *
   * Element 0 (the id) is ignored; only the coordinates at indices 1 and 2 contribute.
   *
   * @param x1 first record (`id, x, y`)
   * @param x2 second record (`id, x, y`)
   * @return sqrt((x1.x - x2.x)^2 + (x1.y - x2.y)^2)
   */
  def distance(x1: List[Int], x2: List[Int]): Double = {
    // Subtract as Double so differences of extreme Int coordinates cannot overflow.
    val dx = x1(1).toDouble - x2(1)
    val dy = x1(2).toDouble - x2(2)
    math.sqrt(dx * dx + dy * dy)
  }

  /**
   * Distance for every ordered pair of records (each record is also paired with itself),
   * sorted nearest-first, so the top-k closest pairs are simply the first k elements.
   *
   * `sortBy` is stable, so equal distances keep enumeration order. Unlike keying a Map by
   * distance, every tied pair is retained.
   *
   * @param points records laid out as `List(id, x, y)`
   * @return `(distance, List(idA, idB))` tuples in ascending distance order
   */
  def pairDistances(points: Seq[List[Int]]): Seq[(Double, List[Int])] = {
    val pairs = for (a <- points; b <- points) yield (distance(a, b), List(a(0), b(0)))
    pairs.sortBy(_._1)
  }

  /** Parses one `id,x,y` CSV line into `List(id, x, y)`; only the first three fields are read. */
  private def parseRecord(line: String): List[Int] = {
    val fields = line.split(",")
    List(fields(0).trim.toInt, fields(1).trim.toInt, fields(2).trim.toInt)
  }

  /**
   * Loads `data01.csv` from the working directory, prints the records and every id pair,
   * then prints all pairwise distances sorted nearest-first.
   */
  def main(args: Array[String]): Unit = {
    println(getCurrentDirectory)
    // The 3-argument constructor is (master, appName, sparkHome): a cluster URL passed third
    // is taken as sparkHome, not as the master. To run on a cluster, pass its
    // "spark://host:7077" URL as the first (master) argument instead of "local".
    val sc = new SparkContext("local", "TopK")
    try {
      println(s"Running Spark Version ${sc.version}")
      val data = sc.textFile("data01.csv")
        .filter(_.trim.nonEmpty) // a blank/trailing line would otherwise fail in toInt
        .map(parseRecord)

      // Collect exactly once. RDD.foreach would print on the executors (worker logs on a
      // cluster), and every step below needs the full record set on the driver anyway.
      // NOTE: this is driver-side O(n^2); for large inputs, use data.cartesian(data)
      // together with takeOrdered(k) instead.
      val points = data.collect().toSeq

      println("data")
      points.foreach { d =>
        println(d)
        println(d(0))
      }

      val ids = points.map(_(0))
      val idPairs = for (a <- ids; b <- ids) yield List(a, b)
      idPairs.foreach(println(_))

      // Sorted nearest-first, so the first k entries are the top-k closest pairs.
      pairDistances(points).foreach(println(_))
    } finally {
      sc.stop() // release the local Spark context even if reading/parsing fails
    }
  }
}
Contents of data01.csv (one `id,x,y` record per line):
1,68,93
2,12,90
3,45,76
4,86,54
HTH.
Cheers
<k/>
On Tue, May 27, 2014 at 4:10 AM, Carter <[email protected]> wrote:
> Any suggestion is very much appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>