HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R] 
Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r257095448
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
 ##########
 @@ -193,6 +194,87 @@ case class MapPartitionsExec(
   }
 }
 
+/**
+ * Similar with [[MapPartitionsExec]] and
+ * [[org.apache.spark.sql.execution.r.MapPartitionsRWrapper]] but serializes 
and deserializes
+ * input/output in Arrow format.
+ *
+ * This is somewhat similar with 
[[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]]
+ */
+case class MapPartitionsInRWithArrowExec(
+    func: Array[Byte],
+    packageNames: Array[Byte],
+    broadcastVars: Array[Broadcast[Object]],
+    inputSchema: StructType,
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode {
+
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitionsInternal { iter =>
+      // Hyukjin: seems to convert from Arrow batches to Arrow batches 
directly here
+      // causes segmentation fault intermittently. For instance, when child is 
directly
+      // originated from Arrow batches. Input projection was added to avoid it 
based on
+      // my speculation (and it does not happen anymore). It is potentially an 
issue in Arrow
+      // optimization in my humble opinion.
+      val inputProject = UnsafeProjection.create(child.output, child.output)
+      val inputIter = iter.map(inputProject)
+
+      val outputTypes = schema.map(_.dataType)
+
+      // DO NOT use iter.grouped(). See BatchIterator.
+      val batchIter =
+        if (batchSize > 0) new BatchIterator(inputIter, batchSize) else 
Iterator(inputIter)
+
+      val runner = new ArrowRRunner(func, packageNames, broadcastVars, 
inputSchema,
+        SQLConf.get.sessionLocalTimeZone, RRunnerModes.DATAFRAME_DAPPLY)
+
+      // The communication mechanism is as follows:
+      //
+      //    JVM side                           R side
+      //
+      // 1. Internal rows            --------> Arrow record batches
+      // 2.                                    Converts each Arrow record 
batch to each R data frame
+      // 3.                                    Combine R data frames into one 
R data frame
+      // 4.                                    Computes R native function on 
the data frame
+      // 5.                                    Converts the R data frame to 
Arrow record batches
+      // 6. Columnar batches         <-------- Arrow record batches
+      // 7. Each row from each batch
+      //
+      // Note that, unlike Python vectorization implementation, R side sends 
Arrow formatted
+      // binary in a batch due to the limitation of R API. See also ARROW-4512.
+      val columnarBatchIter = runner.compute(batchIter, -1)
+      val outputProject = UnsafeProjection.create(output, output)
+      new Iterator[InternalRow] {
 
 Review comment:
   This is almost as is from Scalar Pandas's iterator.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to