HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R] Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r257095975
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
 ##########
 @@ -193,6 +194,87 @@ case class MapPartitionsExec(
   }
 }
 
+/**
+ * Similar with [[MapPartitionsExec]] and
+ * [[org.apache.spark.sql.execution.r.MapPartitionsRWrapper]] but serializes and deserializes
+ * input/output in Arrow format.
+ *
+ * This is somewhat similar with [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]]
+ */
+case class MapPartitionsInRWithArrowExec(
+    func: Array[Byte],
+    packageNames: Array[Byte],
+    broadcastVars: Array[Broadcast[Object]],
+    inputSchema: StructType,
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitionsInternal { iter =>
+      // Hyukjin: seems to convert from Arrow batches to Arrow batches directly here
+      // causes segmentation fault intermittently. For instance, when child is directly
+      // originated from Arrow batches. Input projection was added to avoid it based on
+      // my speculation (and it does not happen anymore). It is potentially an issue in Arrow
+      // optimization in my humble opinion.
+      val inputProject = UnsafeProjection.create(child.output, child.output)
+      val inputIter = iter.map(inputProject)
+
+      val outputTypes = schema.map(_.dataType)
+
+      // DO NOT use iter.grouped(). See BatchIterator.
+      val batchIter =
+        if (batchSize > 0) new BatchIterator(inputIter, batchSize) else Iterator(inputIter)
+
+      val runner = new ArrowRRunner(func, packageNames, broadcastVars, inputSchema,
+        SQLConf.get.sessionLocalTimeZone, RRunnerModes.DATAFRAME_DAPPLY)
+
+      // The communication mechanism is as follows:
+      //
+      //    JVM side                           R side
+      //
+      // 1. Internal rows            --------> Arrow record batches
 
 Review comment:
   One difference compared to Scalar Pandas UDFs here is that it buffers the Arrow batches (namely R data frames on the R side) and applies the function to all of them at once. This difference exists because dapply explicitly mentions that the function is applied to each partition:
   
   
https://github.com/apache/spark/blob/2f6e88fecb455a02c4c08c41290e2f338e979543/R/pkg/R/DataFrame.R#L1461
   
   FYI, @mengxr per SPARK-26412
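
   To make the difference concrete, here is a minimal sketch in plain Scala collections, not the actual Spark or SparkR code. The names `BufferingSketch`, `Batch`, `applyPerBatch` and `applyPerPartition` are hypothetical and exist only for illustration; a "batch" is modelled as a `Vector[Int]` rather than a real Arrow record batch.

```scala
// Illustrative only: hypothetical names, no Spark or Arrow dependencies.
object BufferingSketch {
  // Stand-in for one Arrow record batch (assumption: a plain vector of ints).
  type Batch = Vector[Int]

  // Scalar-Pandas-UDF-like behaviour: the function is invoked once per batch,
  // so it only ever sees one batch at a time.
  def applyPerBatch[A](batches: Iterator[Batch])(f: Batch => A): Iterator[A] =
    batches.map(f)

  // dapply-like behaviour: buffer every batch of the partition first,
  // then invoke the function exactly once on the combined data.
  def applyPerPartition[A](batches: Iterator[Batch])(f: Batch => A): A =
    f(batches.foldLeft(Vector.empty[Int])(_ ++ _))
}
```

   With a partition split into batches of sizes 2, 2 and 1, a function such as `_.size` is called three times in the first variant and once (on all 5 rows) in the second, which is why any function that depends on seeing the whole partition needs the buffered form.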
