[ https://issues.apache.org/jira/browse/SPARK-29852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean R. Owen resolved SPARK-29852.
----------------------------------
    Resolution: Duplicate

> Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-29852
>                 URL: https://issues.apache.org/jira/browse/SPARK-29852
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: Peng Cheng
>            Priority: Major
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> Both the RDD and Dataset APIs have two methods of collecting data from executors to the driver:
>
> # .collect() sets up multiple threads in a single job and dumps all data from the executors into the driver's memory. This is great if the data needs to be accessible on the driver as soon as possible, but less efficient if partitions can only be accessed sequentially, and outright risky if the driver does not have enough memory to hold all of the data.
> - The fix for SPARK-25224 partially alleviates this by delaying deserialisation of the data from its InternalRow format, so that only the much smaller serialised form has to be held in driver memory in its entirety. That fix still does not bound memory consumption at O(1), so it does not scale to arbitrarily large datasets.
> # .toLocalIterator() fetches one partition per job at a time, and fetching of the next partition does not start until sequential access to the previous partition has finished. This keeps memory consumption at O(1) and works well when the data is consumed sequentially and significantly more slowly than a single executor, with one thread, can ship partitions. It becomes inefficient when the sequential consumer has to wait a relatively long time for the next partition to be shipped.
>
> The proposed solution is a crossover between the two existing implementations: a concurrent subroutine that is bounded in both CPU and memory. It allocates a fixed-size resource pool (by default equal to the number of available CPU cores) that ships partitions concurrently, and blocks sequential access to a partition's data only until its shipping has finished (for partition IDs >= 2 this usually happens without blocking, because shipping starts much earlier, preemptively). Entries in the resource pool can be evicted and garbage-collected once sequential access to their data has finished, which allows later partitions to be fetched well before they are accessed. The maximum memory consumption is O(m * n), where m is the predefined concurrency and n is the size of the largest partition.
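>
> For illustration, a minimal usage sketch of the proposed API (assuming the implicit class RDDFunctions from the snippet below is in scope; the RDD and the capacity value are arbitrary examples):
>
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.spike.ToLocalIteratorPreemptivelySpike._
>
> val sc = SparkSession.builder().master("local[*]").getOrCreate().sparkContext
>
> // 8 partitions, each slow to compute, as in the test below.
> val rdd = sc.parallelize(1 to 80, 8).map { v => Thread.sleep(100); v }
>
> // capacity = 4: roughly 4 partitions are fetched ahead of the consumer at any time,
> // so driver memory stays around O(m * n) rather than O(total dataset size).
> rdd.toLocalIteratorPreemptively(4).foreach(println)
> {code}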
> The following Scala code snippet demonstrates a simple implementation:
>
> (requires Scala 2.11+ and ScalaTest)
>
> {code:java}
> package org.apache.spark.spike
>
> import java.util.concurrent.ArrayBlockingQueue
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.{FutureAction, SparkContext}
> import org.scalatest.FunSpec
>
> import scala.concurrent.Future
> import scala.language.implicitConversions
> import scala.reflect.ClassTag
> import scala.util.{Failure, Success, Try}
>
> class ToLocalIteratorPreemptivelySpike extends FunSpec {
>
>   import ToLocalIteratorPreemptivelySpike._
>
>   lazy val sc: SparkContext =
>     SparkSession.builder().master("local[*]").getOrCreate().sparkContext
>
>   it("can be much faster than toLocalIterator") {
>     val max = 80
>     val delay = 100
>
>     // Each element takes `delay` ms to compute, spread over 8 partitions.
>     val slowRDD = sc.parallelize(1 to max, 8).map { v =>
>       Thread.sleep(delay)
>       v
>     }
>
>     val (r1, t1) = timed {
>       slowRDD.toLocalIterator.toList
>     }
>
>     val capacity = 4
>     val (r2, t2) = timed {
>       slowRDD.toLocalIteratorPreemptively(capacity).toList
>     }
>
>     assert(r1 == r2)
>     println(s"linear: $t1, preemptive: $t2")
>     assert(t1 > t2 * 2)
>     assert(t2 > max * delay / capacity)
>   }
> }
>
> object ToLocalIteratorPreemptivelySpike {
>
>   // Wraps a single-partition job whose result is cached once it completes.
>   case class PartitionExecution[T: ClassTag](
>       @transient self: RDD[T],
>       id: Int
>   ) {
>     // Launch the job without waiting for its result.
>     def eager: this.type = {
>       AsArray.future
>       this
>     }
>
>     case object AsArray {
>       @transient lazy val future: FutureAction[Array[T]] = {
>         var result: Array[T] = null
>         val future = self.context.submitJob[T, Array[T], Array[T]](
>           self,
>           _.toArray,
>           Seq(id),
>           { (_, data) => result = data },
>           result
>         )
>         future
>       }
>
>       @transient lazy val now: Array[T] = future.get()
>     }
>   }
>
>   implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {
>
>     import scala.concurrent.ExecutionContext.Implicits.global
>
>     def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
>       val executions = self.partitions.indices.map { ii =>
>         PartitionExecution(self, ii)
>       }
>
>       // Bounded queue: the producer can run at most `capacity` partitions
>       // ahead of the consumer, which caps driver memory at O(m * n).
>       val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)
>
>       Future {
>         executions.foreach { exe =>
>           buffer.put(Success(exe)) // may be blocking due to capacity
>           exe.eager // non-blocking
>         }
>       }.onFailure {
>         case e: Throwable =>
>           buffer.put(Failure(e))
>       }
>
>       self.partitions.indices.toIterator.map { _ =>
>         val exe = buffer.take().get
>         exe.AsArray.now // blocks only until this partition has been shipped
>       }
>     }
>
>     def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {
>       _toLocalIteratorPreemptively(capacity).flatten
>     }
>   }
>
>   def timed[T](fn: => T): (T, Long) = {
>     val startTime = System.currentTimeMillis()
>     val result = fn
>     val endTime = System.currentTimeMillis()
>     (result, endTime - startTime)
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org