HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R]
Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r257095975
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
##########
@@ -193,6 +194,87 @@ case class MapPartitionsExec(
}
}
+/**
+ * Similar with [[MapPartitionsExec]] and
+ * [[org.apache.spark.sql.execution.r.MapPartitionsRWrapper]] but serializes and deserializes
+ * input/output in Arrow format.
+ *
+ * This is somewhat similar with [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]]
+ */
+case class MapPartitionsInRWithArrowExec(
+ func: Array[Byte],
+ packageNames: Array[Byte],
+ broadcastVars: Array[Broadcast[Object]],
+ inputSchema: StructType,
+ output: Seq[Attribute],
+ child: SparkPlan) extends UnaryExecNode {
+
+ private val batchSize = conf.arrowMaxRecordsPerBatch
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ child.execute().mapPartitionsInternal { iter =>
+ // Hyukjin: seems to convert from Arrow batches to Arrow batches directly here
+ // causes segmentation fault intermittently. For instance, when child is directly
+ // originated from Arrow batches. Input projection was added to avoid it based on
+ // my speculation (and it does not happen anymore). It is potentially an issue in Arrow
+ // optimization in my humble opinion.
+ val inputProject = UnsafeProjection.create(child.output, child.output)
+ val inputIter = iter.map(inputProject)
+
+ val outputTypes = schema.map(_.dataType)
+
+ // DO NOT use iter.grouped(). See BatchIterator.
+ val batchIter =
+ if (batchSize > 0) new BatchIterator(inputIter, batchSize) else Iterator(inputIter)
+
+ val runner = new ArrowRRunner(func, packageNames, broadcastVars, inputSchema,
+ SQLConf.get.sessionLocalTimeZone, RRunnerModes.DATAFRAME_DAPPLY)
+
+ // The communication mechanism is as follows:
+ //
+ // JVM side R side
+ //
+ // 1. Internal rows --------> Arrow record batches
Review comment:
One difference from Scalar Pandas UDFs here is that this buffers the Arrow
batches (namely R data frames on the R side) and applies the function to them
all at once. The reason for this difference is that dapply explicitly mentions
that the function is applied to each partition:
https://github.com/apache/spark/blob/2f6e88fecb455a02c4c08c41290e2f338e979543/R/pkg/R/DataFrame.R#L1461
FYI, @mengxr per SPARK-26412
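
To illustrate the difference, here is a minimal sketch. It is not the code in
this PR: plain Scala collections stand in for Arrow record batches, and the
names BatchVsPartition, applyPerBatch, applyPerPartition and the sample
function f are hypothetical. It only shows why a function whose result depends
on all of its input rows behaves differently when applied per batch versus once
per partition.

```scala
// Illustrative sketch only; none of these names exist in Spark.
object BatchVsPartition {
  // Stand-in for one Arrow record batch (an R data frame on the R side).
  type Batch = Seq[Int]

  // Scalar Pandas UDF style: the function runs once for every batch.
  def applyPerBatch(batches: Iterator[Batch])(f: Batch => Batch): Iterator[Batch] =
    batches.map(f)

  // dapply style: buffer every batch of the partition, then run the function once.
  def applyPerPartition(batches: Iterator[Batch])(f: Batch => Batch): Iterator[Batch] =
    Iterator.single(f(batches.toVector.flatten))

  def main(args: Array[String]): Unit = {
    // One partition that arrives as three batches.
    def partition: Iterator[Batch] = Iterator(Seq(1, 2), Seq(3, 4), Seq(5))

    // Hypothetical user function: shift rows by the minimum of the input it sees.
    val f: Batch => Batch = rows => rows.map(_ - rows.min)

    println(applyPerBatch(partition)(f).toList)
    println(applyPerPartition(partition)(f).toList)
  }
}
```

With per-batch application the minimum is taken within each batch, so the
result differs from what a user expecting per-partition semantics would get.
Buffering keeps the documented dapply behaviour at the cost of holding a whole
partition in memory on the R side.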
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services