Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1381#discussion_r16913280
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---
    @@ -67,4 +67,41 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
           }
         }, preservesPartitioning = true)
       }
    +
    +  /**
    +   * Returns an RDD with any elements not in the inclusive range `lower` to `upper` filtered out.
    +   * If the RDD has been partitioned using a `RangePartitioner`, this is an operation that can be
    +   * done efficiently. If not, a standard `filter` is used.
    +   */
    +  def filterByRange(lower: K, upper: K): RDD[P] = {
    +
    +    def rangeFilter(iter: Iterator[P]) = {
    +      for {
    +        (k, v) <- iter
    +        if ordering.gteq(k, lower) && ordering.lteq(k, upper)
    +      } yield (k, v).asInstanceOf[P]
    +    }
    +
    +    val partitionIndicies =
    +      self.partitioner match {
    +        case Some(p) => {
    +          p match {
    +            case rp: RangePartitioner[K, V] => {
    +              (rp.getPartition(lower), rp.getPartition(upper)) match {
    +                case (l, u) if l <= u => l to u
    +                case (l, u) if l > u => u to l
    +              }
    +            }
    +            case _ => {
    +              0 until p.numPartitions
    +            }
    +          }
    +        }
    +        case None => {
    +          return self.mapPartitions(rangeFilter)
    --- End diff ---
    
    This is better, but I think it could be further simplified.  It's
    confusing to have a `match` statement where we `return` from one branch
    and keep going from another.  Here's what I have in mind (I think this
    works, but I haven't checked; this is pseudocode):
    
    ```scala
    self.partitioner match {
      case Some(rp: RangePartitioner[K, V]) =>
        val partitionIndicies = {
          // compute indicies here using `rp`
        }
        val prunedRdd = PartitionPruningRDD.create(self, partitionIndicies.contains)
        prunedRdd.mapPartitions(rangeFilter)
      case _ =>
        self.mapPartitions(rangeFilter)
    }
    ```
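
    For the placeholder that computes the indices from `rp`, a rough sketch
    (untested; it just reuses the `getPartition` min/max logic already in the
    diff above) could look like:

    ```scala
    // Sketch only: map both endpoints to partition ids and keep the span
    // between them; taking min/max also covers the case where the partitioner
    // was built with a descending ordering and the lower bound maps to a
    // higher partition id than the upper bound.
    val partitionIndicies = {
      val (l, u) = (rp.getPartition(lower), rp.getPartition(upper))
      math.min(l, u) to math.max(l, u)
    }
    ```

    `PartitionPruningRDD.create(self, partitionIndicies.contains)` should then
    only launch tasks for those partitions, so `rangeFilter` never scans
    partitions that cannot contain keys in `[lower, upper]`.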

