HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R] Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r259602502
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
 ##########
 @@ -55,13 +56,32 @@ class ArrowRRunner(
     numPartitions = -1,
     isDataFrame = true,
     schema.fieldNames,
-    RRunnerModes.DATAFRAME_GAPPLY) {
+    mode) {
+
+  // TODO: it needs to refactor to share the same code with RRunner, and have separate
+  // ArrowRRunners.
+  private val getNextBatch = {
+    if (mode == RRunnerModes.DATAFRAME_GAPPLY) {
+      // gapply
+      (inputIterator: Iterator[_], keys: collection.mutable.ArrayBuffer[Array[Byte]]) => {
+        val (key, nextBatch) = inputIterator
+          .asInstanceOf[Iterator[(Array[Byte], Iterator[InternalRow])]].next()
+        keys.append(key)
+        nextBatch
+      }
+    } else {
+      // dapply
+      (inputIterator: Iterator[_], keys: collection.mutable.ArrayBuffer[Array[Byte]]) => {
+        inputIterator
+          .asInstanceOf[Iterator[Iterator[InternalRow]]].next()
+      }
+    }
+  }
 
   protected override def writeData(
       dataOut: DataOutputStream,
       printOut: PrintStream,
-      iter: Iterator[_]): Unit = if (iter.hasNext) {
-    val inputIterator = iter.asInstanceOf[Iterator[(Array[Byte], Iterator[InternalRow])]]
+      inputIterator: Iterator[_]): Unit = if (inputIterator.hasNext) {
 
 Review comment:
   Also, I plan to add more type specification tests in SPARK-26920 (for
instance, map type and struct type are not supported). I think that should
alleviate the concern about corner cases.
   
   I tried to switch only the data transfer logic, so checking empty rows,
non-empty rows, columns, and type specifications might be enough. (BTW, the
existing dapply itself is a bit flaky with some holes IMHO, so it's a bit
tricky to compare the two.)
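
   For readers following the diff, here is a minimal standalone sketch of the mode-based batch selection it introduces. It is not the PR's code: `Mode`, `Gapply`, `Dapply`, `Row`, and `nextBatchReader` are hypothetical stand-ins for `RRunnerModes`, `InternalRow`, and `getNextBatch`, chosen so the snippet runs without Spark on the classpath.

```scala
import scala.collection.mutable.ArrayBuffer

object NextBatchSketch {
  // Hypothetical stand-ins; the real runner uses RRunnerModes and InternalRow.
  sealed trait Mode
  case object Gapply extends Mode
  case object Dapply extends Mode
  type Row = Seq[Any]

  type BatchReader = (Iterator[_], ArrayBuffer[Array[Byte]]) => Iterator[Row]

  // Choose the reader once per runner, so the per-batch path has no mode check.
  def nextBatchReader(mode: Mode): BatchReader = mode match {
    case Gapply =>
      // gapply: each element is (serialized key, rows of that group).
      (input: Iterator[_], keys: ArrayBuffer[Array[Byte]]) => {
        val (key, batch) =
          input.asInstanceOf[Iterator[(Array[Byte], Iterator[Row])]].next()
        keys.append(key)
        batch
      }
    case Dapply =>
      // dapply: each element is just the rows of one batch; keys stay untouched.
      (input: Iterator[_], _: ArrayBuffer[Array[Byte]]) =>
        input.asInstanceOf[Iterator[Iterator[Row]]].next()
  }
}
```

   Callers are assumed to check `hasNext` before invoking the reader, as `writeData` does in the diff. Only the input shape differs between the two modes, which is why tests over empty rows, non-empty rows, columns, and type specifications exercise the same transfer path in both.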
