zsxwing commented on a change in pull request #21913: [SPARK-24005][CORE]
Remove usage of Scala's parallel collection
URL: https://github.com/apache/spark/pull/21913#discussion_r258168424
##########
File path: core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
##########
@@ -254,4 +258,62 @@ private[spark] object ThreadUtils {
executor.shutdownNow()
}
}
+
+ /**
+ * Transforms input collection by applying the given function to each
element in parallel fashion.
+ * Comparing to the map() method of Scala parallel collections, this method
can be interrupted
+ * at any time. This is useful on canceling of task execution, for example.
+ *
+ * @param in - the input collection which should be transformed in parallel.
+ * @param prefix - the prefix assigned to the underlying thread pool.
+ * @param maxThreads - maximum number of thread can be created during
execution.
+ * @param f - the lambda function will be applied to each element of `in`.
+ * @tparam I - the type of elements in the input collection.
+ * @tparam O - the type of elements in resulted collection.
+ * @return new collection in which each element was given from the input
collection `in` by
+ * applying the lambda function `f`.
+ */
+ def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
+ (in: Col[I], prefix: String, maxThreads: Int)
+ (f: I => O)
+ (implicit
+ cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
+ cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
+ ): Col[O] = {
+ val pool = newForkJoinPool(prefix, maxThreads)
+ try {
+ implicit val ec = ExecutionContext.fromExecutor(pool)
+
+ parmap(in)(f)
+ } finally {
+ pool.shutdownNow()
Review comment:
@ConeyLiu this line interrupts the tasks in the thread pool. Scala `par`
doesn't do this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]