Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/760#issuecomment-43433310
  
    A simple solution:
    ```scala
    object ParallelCollectionRDD {
      /**
       * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat
       * Range collections specially, encoding the slices as other Ranges to minimize memory cost.
       * This makes it efficient to run Spark over RDDs representing large sets of numbers.
       *
       * @param seq       the collection to slice
       * @param numSlices the number of sub-collections to produce (must be >= 1)
       * @return numSlices sub-collections covering seq in order
       * @throws IllegalArgumentException if numSlices < 1
       */
      def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
        if (numSlices < 1) {
          throw new IllegalArgumentException("Positive number of slices required")
        }
        // Compute the (start, end) index pairs (end exclusive) of each slice. Long
        // arithmetic keeps `i * length` from overflowing Int for large collections,
        // and the floor-division scheme distributes the remainder evenly.
        def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
          (0 until numSlices).iterator.map { i =>
            val start = ((i * length) / numSlices).toInt
            val end = (((i + 1) * length) / numSlices).toInt
            (start, end)
          }
        }
        seq match {
          case r: Range =>
            // Encode each slice as a Range. The inclusive case is handled here directly
            // instead of first converting to an exclusive Range via `r.end + sign`,
            // which would overflow when r.end == Int.MaxValue (or Int.MinValue for
            // descending ranges).
            positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
              // If the original range is inclusive, keep the last slice inclusive so it
              // ends exactly at r.end without computing an out-of-range exclusive bound.
              if (r.isInclusive && index == numSlices - 1) {
                Range.inclusive(r.start + start * r.step, r.end, r.step)
              } else {
                Range.inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
              }
            }.toSeq.asInstanceOf[Seq[Seq[T]]]
          case nr: NumericRange[_] =>
            // For ranges of Long, Double, BigInteger, etc. Note that a runtime check such
            // as nr.isInstanceOf[NumericRange[Double]] is meaningless: the element type is
            // erased, so it matches every NumericRange and mis-casts e.g. Long ranges.
            // Slicing the range itself with take/drop keeps the element type intact and
            // avoids floating-point drift from recomputing elements as start + i * step.
            val slices = new ArrayBuffer[Seq[T]](numSlices)
            var r = nr
            for ((start, end) <- positions(nr.length, numSlices)) {
              // Per-slice size from positions() keeps the slices balanced, unlike a single
              // rounded-up size that leaves the trailing slices short or empty.
              val sliceSize = end - start
              slices += r.take(sliceSize).asInstanceOf[Seq[T]]
              r = r.drop(sliceSize)
            }
            slices.toSeq
          case _ =>
            val array = seq.toArray // To prevent O(n^2) operations for List etc
            positions(array.length, numSlices).map { case (start, end) =>
              array.slice(start, end).toSeq
            }.toSeq
        }
      }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to