Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/21913#discussion_r206704548
--- Diff: core/src/main/scala/org/apache/spark/util/ThreadUtils.scala ---
@@ -254,4 +254,49 @@ private[spark] object ThreadUtils {
executor.shutdownNow()
}
}
+
+  /**
+   * Transforms the input collection by applying the given function to each element in parallel.
+   *
+   * @param in - the input collection which should be transformed in parallel.
+   * @param prefix - the prefix assigned to the underlying thread pool.
+   * @param maxThreads - the maximum number of threads that can be created during execution.
+   * @param f - the lambda function applied to each element of `in`.
+   * @tparam I - the type of elements in the input collection.
+   * @tparam O - the type of elements in the resulting collection.
+   * @return a new collection obtained from the input collection `in` by applying the lambda
+   *         function `f` to each of its elements.
+   */
+  def parmap[I, O](
+      in: TraversableOnce[I],
--- End diff --
You can use Scala generics to make this method return the same collection type as `in`. For example:
```scala
import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.higherKinds

// Intended to live inside ThreadUtils, so newForkJoinPool and awaitResult resolve.
// Col is the concrete collection type of `in`; the two CanBuildFrom instances let
// `map` and `Future.sequence` rebuild that same collection type.
def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
    (in: Col[I], prefix: String, maxThreads: Int)
    (f: I => O)
    (implicit
      cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // for in.map
      cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
    ): Col[O] = {
  val pool = newForkJoinPool(prefix, maxThreads)
  try {
    implicit val ec = ExecutionContext.fromExecutor(pool)
    val futures: Col[Future[O]] = in.map(x => Future(f(x)))
    val futureSeq: Future[Col[O]] = Future.sequence(futures)
    awaitResult(futureSeq, Duration.Inf)
  } finally {
    pool.shutdownNow()
  }
}
```
Then the caller side doesn't need to call `toSeq`, since the result comes back as the same collection type as the input.
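For illustration, a minimal sketch of possible call sites under that generic signature (the collection contents, pool prefix, and `maxThreads` value below are made up for the example, and type inference is assumed to pick up `Col` from the argument):
```scala
// Hypothetical call sites: the result collection type follows the input type,
// so no toSeq conversion is needed afterwards.
val lengths: Seq[Int] =
  ThreadUtils.parmap(Seq("a", "bb", "ccc"), "parmap-example", maxThreads = 4)(_.length)

val doubled: Vector[Int] =
  ThreadUtils.parmap(Vector(1, 2, 3), "parmap-example", maxThreads = 4)(_ * 2)
```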