GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21913#discussion_r206704548
  
    --- Diff: core/src/main/scala/org/apache/spark/util/ThreadUtils.scala ---
    @@ -254,4 +254,49 @@ private[spark] object ThreadUtils {
           executor.shutdownNow()
         }
       }
    +
    +  /**
    +   * Transforms input collection by applying the given function to each element in parallel fashion.
    +   *
    +   * @param in - the input collection which should be transformed in parallel.
    +   * @param prefix - the prefix assigned to the underlying thread pool.
    +   * @param maxThreads - maximum number of thread can be created during execution.
    +   * @param f - the lambda function will be applied to each element of `in`.
    +   * @tparam I - the type of elements in the input collection.
    +   * @tparam O - the type of elements in resulted collection.
    +   * @return new collection in which each element was given from the input collection `in` by
    +   *         applying the lambda function `f`.
    +   */
    +  def parmap[I, O](
    +      in: TraversableOnce[I],
    --- End diff --
    
    You can use Scala generics to make this method return the same collection type as `in`, for example:
    ```scala
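    import scala.collection.TraversableLike
    import scala.collection.generic.CanBuildFrom
    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration.Duration
    import scala.language.higherKinds
    // newForkJoinPool and awaitResult refer to the helpers already defined in ThreadUtils.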
    
      def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
          (in: Col[I], prefix: String, maxThreads: Int)
          (f: I => O)
          (implicit
            cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
            cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
          ): Col[O] = {
        val pool = newForkJoinPool(prefix, maxThreads)
        try {
          implicit val ec = ExecutionContext.fromExecutor(pool)
          val futures: Col[Future[O]] = in.map(x => Future(f(x)))
          val futureSeq: Future[Col[O]] = Future.sequence(futures)
          awaitResult(futureSeq, Duration.Inf)
        } finally {
          pool.shutdownNow()
        }
      }
    ```
    Then callers no longer need to call `toSeq`.
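A standalone sketch of the same idea, assuming Scala 2.11/2.12 (where `CanBuildFrom` still exists; the 2.13 collections removed it) and running outside Spark. The hypothetical `ParmapSketch` object swaps `ThreadUtils.newForkJoinPool`/`awaitResult` for a plain `java.util.concurrent.ForkJoinPool` and `Await.result`, and drops the `prefix` parameter because it does not name its threads. Passing a `List` yields a `List` and passing a `Vector` yields a `Vector`, with no `toSeq` at the call site.

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.higherKinds

object ParmapSketch {
  // Hypothetical stand-in for the suggested ThreadUtils.parmap (Scala 2.11/2.12 only).
  // Uses a plain ForkJoinPool and Await.result instead of Spark's helpers.
  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
      (in: Col[I], maxThreads: Int)
      (f: I => O)
      (implicit
        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // rebuilds Col for in.map
        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]]         // rebuilds Col for Future.sequence
      ): Col[O] = {
    val pool = new ForkJoinPool(maxThreads)
    try {
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
      // Start one Future per element; the result keeps the input's collection type.
      val futures: Col[Future[O]] = in.map(x => Future(f(x)))
      // Future.sequence preserves element order, so results line up with the input.
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally {
      pool.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    val squares: List[Int] = parmap(List(1, 2, 3, 4), maxThreads = 2)(x => x * x)
    val lengths: Vector[Int] = parmap(Vector("a", "bb", "ccc"), maxThreads = 2)(_.length)
    println(squares) // prints List(1, 4, 9, 16)
    println(lengths) // prints Vector(1, 2, 3)
  }
}
```

The first parameter list fixes `Col` and `I`, so `O` can be inferred from the lambda in the second list; the two implicit `CanBuildFrom` instances then rebuild `Col` first for the futures and then for the results.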

