Re: Pairwise Processing of a List
AFAIK ordering is not strictly guaranteed unless the RDD is the product of a sort. In practice, though, I don't think you'll ever find elements of a file read back in some random order (although see the recent issue about partition ordering potentially depending on how the local file system lists the files). Likewise I can't imagine you'd encounter elements from one Kafka partition out of order: one receiver hears one partition and creates one block per block interval. What I'm not 100% clear on is whether you get undefined ordering when you have multiple threads listening in one receiver.

You can always sort RDDs by a timestamp of some sort to be sure, although that has overhead. I'm also curious about what, if anything, is guaranteed here without a sort.

On Mon, Jan 26, 2015 at 1:33 AM, Tobias Pfeiffer t...@preferred.jp wrote:

> Sean,
>
> On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:
>> Note that RDDs don't really guarantee anything about ordering though, so
>> this only makes sense if you've already sorted some upstream RDD by a
>> timestamp or sequence number.
>
> Speaking of order, is there some reading on guarantees and non-guarantees
> about order in RDDs? For example, when reading a file and doing
> zipWithIndex, can I assume that the lines are numbered in order? Does this
> hold for receiving data from Kafka, too?
>
> Tobias

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
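The "sort by a timestamp to be sure" workaround can be sketched locally in plain Scala (no Spark; the event data and field layout here are made up for illustration). Sorting by the record's own timestamp, rather than trusting arrival order, makes a subsequent zipWithIndex or sliding pass well-defined; on an RDD the analogue would be a sortBy on the timestamp field before further processing.

```scala
// Hypothetical timestamped events, arriving out of order.
val events = Seq((3L, "c"), (1L, "a"), (2L, "b"))

// Impose an explicit order by timestamp, then number the records.
val ordered = events.sortBy(_._1)
val indexed = ordered.zipWithIndex
```

After the sort, `indexed` numbers records in timestamp order regardless of how they arrived.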
Re: Pairwise Processing of a List
So you’ve got a point A and you want the sum of distances between it and all other points? Or am I misunderstanding you?

    // target point; could be sent to all workers as a Broadcast variable
    val tarPt = (10, 20)
    val pts = Seq((2, 2), (3, 3), (2, 3), (10, 2))
    val rdd = sc.parallelize(pts)
    rdd.map( pt => Math.sqrt(Math.pow(tarPt._1 - pt._1, 2) + Math.pow(tarPt._2 - pt._2, 2)) )
       .reduce( (d1, d2) => d1 + d2 )

-Joe

From: Steve Nunez snu...@hortonworks.com
Date: Sunday, January 25, 2015 at 7:32 PM
To: user@spark.apache.org
Subject: Pairwise Processing of a List

> Spark Experts,
>
> I’ve got a list of points: List[(Float, Float)] that represent (x, y)
> coordinate pairs and need to sum the distance. It’s easy enough to compute
> the distance:
>
>     case class Point(x: Float, y: Float) {
>       def distance(other: Point): Float =
>         sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
>     }
>
> (in this case I create a ‘Point’ class, but the maths are the same). What I
> can’t figure out is the ‘right’ way to sum distances between all the
> points. I can make this work by traversing the list with a for loop and
> using indices, but this doesn’t seem right. Anyone know a clever way to
> process List[(Float, Float)] in a pairwise fashion?
>
> Regards,
> - Steve
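Joe's snippet can be sketched locally in plain Scala, without a SparkContext, using his example data; this is a non-Spark equivalent of the map/reduce above, not the distributed version.

```scala
// Sum of distances from each point to a fixed target point,
// using Joe's example coordinates (local Seq instead of an RDD).
val tarPt = (10.0, 20.0)
val pts = Seq((2.0, 2.0), (3.0, 3.0), (2.0, 3.0), (10.0, 2.0))

val total = pts.map { pt =>
  math.sqrt(math.pow(tarPt._1 - pt._1, 2) + math.pow(tarPt._2 - pt._2, 2))
}.sum
```

The `map` computes each point's Euclidean distance to `tarPt`; the `sum` plays the role of the `reduce((d1, d2) => d1 + d2)` in the RDD version.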
Re: Pairwise Processing of a List
Hi,

On Mon, Jan 26, 2015 at 9:32 AM, Steve Nunez snu...@hortonworks.com wrote:
> I’ve got a list of points: List[(Float, Float)] that represent (x, y)
> coordinate pairs and need to sum the distance. It’s easy enough to compute
> the distance:

Are you saying you want all combinations (N^2) of distances? That should be possible with rdd.cartesian():

    val points = sc.parallelize(List((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)))
    points.cartesian(points).collect
    // Array[((Double, Double), (Double, Double))] = Array(
    //   ((1.0,2.0),(1.0,2.0)), ((1.0,2.0),(3.0,4.0)), ((1.0,2.0),(5.0,6.0)),
    //   ((3.0,4.0),(1.0,2.0)), ((3.0,4.0),(3.0,4.0)), ((3.0,4.0),(5.0,6.0)),
    //   ((5.0,6.0),(1.0,2.0)), ((5.0,6.0),(3.0,4.0)), ((5.0,6.0),(5.0,6.0)))

I guess this is a very expensive operation, though.

Tobias
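Why cartesian() gets expensive can be seen in a local sketch: it yields all N^2 ordered pairs, including each point paired with itself (a for-comprehension over a plain List mimics this; the data is Tobias's three points).

```scala
// All ordered pairs of a 3-element list, as cartesian() would produce.
val points = List((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))

val pairs = for (p1 <- points; p2 <- points) yield (p1, p2)
// pairs has points.size * points.size elements, 3 of them self-pairs
```

For N points this is N^2 pairs, which is why an all-pairs approach should be a deliberate choice rather than a default.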
Re: Pairwise Processing of a List
If this is really about just Scala Lists, then a simple answer (using tuples of doubles) is:

    val points: List[(Double, Double)] = ...
    val distances = for (p1 <- points; p2 <- points) yield {
      val dx = p1._1 - p2._1
      val dy = p1._2 - p2._2
      math.sqrt(dx*dx + dy*dy)
    }
    distances.sum / 2

It's / 2 since this counts every pair twice. You could double the speed of that with a slightly more complex formulation using indices, which avoids comparing points to themselves and makes each comparison just once. If you really need the sum of all pairwise distances, I don't think you can do better than that (modulo dealing with duplicates intelligently).

If we're talking RDDs, then the simple answer is similar:

    val pointsRDD: RDD[(Double, Double)] = ...
    val distancesRDD = pointsRDD.cartesian(pointsRDD).map { case (p1, p2) => ... }
    distancesRDD.sum / 2

It takes more work to make the same optimization, and involves zipWithIndex, but is possible.

If the reason we're talking about Lists is that the set of points is still fairly small, but big enough that all-pairs deserves distributed computation, then I'd parallelize the List into an RDD, and also broadcast it, and then implement a hybrid of these two approaches: the outer loop over points happening in parallel via the RDD, and the inner loop happening locally over the broadcasted copy in memory.

... and if the use case isn't really to find all-pairs distances and their sum, maybe there are faster ways still to do what you need.

On Mon, Jan 26, 2015 at 12:32 AM, Steve Nunez snu...@hortonworks.com wrote:
> Spark Experts,
>
> I’ve got a list of points: List[(Float, Float)] that represent (x, y)
> coordinate pairs and need to sum the distance. It’s easy enough to compute
> the distance:
>
>     case class Point(x: Float, y: Float) {
>       def distance(other: Point): Float =
>         sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
>     }
>
> (in this case I create a ‘Point’ class, but the maths are the same). What I
> can’t figure out is the ‘right’ way to sum distances between all the
> points. I can make this work by traversing the list with a for loop and
> using indices, but this doesn’t seem right. Anyone know a clever way to
> process List[(Float, Float)] in a pairwise fashion?
>
> Regards,
> - Steve
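The "formulation using indices" that Sean mentions can be sketched like this (plain Scala; the three sample points are made up so the result is easy to check by hand): iterate over index pairs i < j, so each unordered pair is visited exactly once and no point is compared with itself, removing the need for the / 2.

```scala
// Euclidean distance between two (x, y) tuples.
def dist(a: (Double, Double), b: (Double, Double)): Double =
  math.sqrt(math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2))

// Three collinear points: distances are 5, 10, and 5.
val points = Vector((0.0, 0.0), (3.0, 4.0), (6.0, 8.0))

// Visit only index pairs with i < j: N*(N-1)/2 comparisons
// instead of the N^2 of the full cross product.
val pairwiseSum = (for {
  i <- points.indices
  j <- (i + 1) until points.length
} yield dist(points(i), points(j))).sum
```

The same result as the cross-product-divided-by-two approach, with half the comparisons.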
Re: Pairwise Processing of a List
Not combinations; linear distances. E.g., given:

    List[ (x1,y1), (x2,y2), (x3,y3) ]

compute the sum of: the distance between (x1,y1) and (x2,y2), and the distance between (x2,y2) and (x3,y3). Imagine that the list of coordinate points comes from a GPS and describes a trip.

- Steve

From: Joseph Lust jl...@mc10inc.com
Date: Sunday, January 25, 2015 at 17:17
To: Steve Nunez snu...@hortonworks.com, user@spark.apache.org
Subject: Re: Pairwise Processing of a List

> So you've got a point A and you want the sum of distances between it and
> all other points? Or am I misunderstanding you?
>
>     // target point; could be sent to all workers as a Broadcast variable
>     val tarPt = (10, 20)
>     val pts = Seq((2, 2), (3, 3), (2, 3), (10, 2))
>     val rdd = sc.parallelize(pts)
>     rdd.map( pt => Math.sqrt(Math.pow(tarPt._1 - pt._1, 2) + Math.pow(tarPt._2 - pt._2, 2)) )
>        .reduce( (d1, d2) => d1 + d2 )
>
> -Joe

From: Steve Nunez snu...@hortonworks.com
Date: Sunday, January 25, 2015 at 7:32 PM
To: user@spark.apache.org
Subject: Pairwise Processing of a List

> Spark Experts,
>
> I've got a list of points: List[(Float, Float)] that represent (x, y)
> coordinate pairs and need to sum the distance. It's easy enough to compute
> the distance:
>
>     case class Point(x: Float, y: Float) {
>       def distance(other: Point): Float =
>         sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
>     }
>
> (in this case I create a 'Point' class, but the maths are the same). What I
> can't figure out is the 'right' way to sum distances between all the
> points. I can make this work by traversing the list with a for loop and
> using indices, but this doesn't seem right. Anyone know a clever way to
> process List[(Float, Float)] in a pairwise fashion?
>
> Regards,
> - Steve

CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
Re: Pairwise Processing of a List
(PS: the Scala code I posted is a poor way to do it; it would materialize the entire cartesian product in memory. You can use .iterator or .view to fix that.)

Ah, so you want the sum of distances between successive points:

    val points: List[(Double, Double)] = ...
    points.sliding(2).map { case List(p1, p2) => distance(p1, p2) }.sum

If you import org.apache.spark.mllib.rdd.RDDFunctions._ you should have access to something similar in Spark over an RDD. It gives you a sliding() function that produces Arrays of sequential elements. Note that RDDs don't really guarantee anything about ordering though, so this only makes sense if you've already sorted some upstream RDD by a timestamp or sequence number.

On Mon, Jan 26, 2015 at 1:21 AM, Steve Nunez snu...@hortonworks.com wrote:
> Not combinations; linear distances. E.g., given:
>
>     List[ (x1,y1), (x2,y2), (x3,y3) ]
>
> compute the sum of: the distance between (x1,y1) and (x2,y2), and the
> distance between (x2,y2) and (x3,y3). Imagine that the list of coordinate
> points comes from a GPS and describes a trip.
>
> - Steve

[earlier quoted messages from Joseph Lust and Steve Nunez snipped]
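Sean's sliding(2) suggestion, sketched locally in plain Scala with a made-up GPS-style trip (the distance helper and sample coordinates are illustrative, not from the thread):

```scala
// Euclidean distance between two (x, y) coordinate pairs.
def distance(p1: (Double, Double), p2: (Double, Double)): Double =
  math.sqrt(math.pow(p1._1 - p2._1, 2) + math.pow(p1._2 - p2._2, 2))

// A short "trip": legs of length 5 and 4.
val trip = List((0.0, 0.0), (3.0, 4.0), (3.0, 8.0))

// sliding(2) yields each consecutive pair exactly once.
val tripLength = trip.sliding(2).map { case List(p1, p2) => distance(p1, p2) }.sum
```

This is the linear, order-dependent sum Steve asked for, which is why the upstream ordering guarantee (or an explicit sort) matters.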
Re: Pairwise Processing of a List
Sean,

On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:
> Note that RDDs don't really guarantee anything about ordering though, so
> this only makes sense if you've already sorted some upstream RDD by a
> timestamp or sequence number.

Speaking of order, is there some reading on guarantees and non-guarantees about order in RDDs? For example, when reading a file and doing zipWithIndex, can I assume that the lines are numbered in order? Does this hold for receiving data from Kafka, too?

Tobias