HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R]
Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r257095448
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
##########
@@ -193,6 +194,87 @@ case class MapPartitionsExec(
}
}
+/**
+ * Similar to [[MapPartitionsExec]] and
+ * [[org.apache.spark.sql.execution.r.MapPartitionsRWrapper]], but serializes and deserializes
+ * input/output in Arrow format.
+ *
+ * This is somewhat similar to [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]].
+ */
+case class MapPartitionsInRWithArrowExec(
+ func: Array[Byte],
+ packageNames: Array[Byte],
+ broadcastVars: Array[Broadcast[Object]],
+ inputSchema: StructType,
+ output: Seq[Attribute],
+ child: SparkPlan) extends UnaryExecNode {
+
+ private val batchSize = conf.arrowMaxRecordsPerBatch
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitionsInternal { iter =>
+ // Hyukjin: converting from Arrow batches to Arrow batches directly here seems to
+ // cause a segmentation fault intermittently, for instance, when the child directly
+ // originates from Arrow batches. An input projection was added to avoid it based on
+ // my speculation (and it does not happen anymore). It is potentially an issue in
+ // Arrow optimization, in my humble opinion.
+ val inputProject = UnsafeProjection.create(child.output, child.output)
+ val inputIter = iter.map(inputProject)
+
+ val outputTypes = schema.map(_.dataType)
+
+ // DO NOT use iter.grouped(). See BatchIterator.
+ val batchIter =
+ if (batchSize > 0) new BatchIterator(inputIter, batchSize) else
Iterator(inputIter)
+
+ val runner = new ArrowRRunner(func, packageNames, broadcastVars,
inputSchema,
+ SQLConf.get.sessionLocalTimeZone, RRunnerModes.DATAFRAME_DAPPLY)
+
+ // The communication mechanism is as follows:
+ //
+ // JVM side R side
+ //
+ // 1. Internal rows --------> Arrow record batches
+ // 2. Converts each Arrow record
batch to each R data frame
+ // 3. Combine R data frames into one
R data frame
+ // 4. Computes R native function on
the data frame
+ // 5. Converts the R data frame to
Arrow record batches
+ // 6. Columnar batches <-------- Arrow record batches
+ // 7. Each row from each batch
+ //
+ // Note that, unlike Python vectorization implementation, R side sends
Arrow formatted
+ // binary in a batch due to the limitation of R API. See also ARROW-4512.
+ val columnarBatchIter = runner.compute(batchIter, -1)
+ val outputProject = UnsafeProjection.create(output, output)
+ new Iterator[InternalRow] {
Review comment:
This is taken almost as-is from Scalar Pandas's iterator.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]