HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R] Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r259602083
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
 ##########
 @@ -55,13 +56,32 @@ class ArrowRRunner(
     numPartitions = -1,
     isDataFrame = true,
     schema.fieldNames,
-    RRunnerModes.DATAFRAME_GAPPLY) {
+    mode) {
+
+  // TODO: it needs to refactor to share the same code with RRunner, and have separate
+  // ArrowRRunners.
+  private val getNextBatch = {
+    if (mode == RRunnerModes.DATAFRAME_GAPPLY) {
+      // gapply
+      (inputIterator: Iterator[_], keys: collection.mutable.ArrayBuffer[Array[Byte]]) => {
+        val (key, nextBatch) = inputIterator
+          .asInstanceOf[Iterator[(Array[Byte], Iterator[InternalRow])]].next()
+        keys.append(key)
+        nextBatch
+      }
+    } else {
+      // dapply
+      (inputIterator: Iterator[_], keys: collection.mutable.ArrayBuffer[Array[Byte]]) => {
+        inputIterator
+          .asInstanceOf[Iterator[Iterator[InternalRow]]].next()
+      }
+    }
+  }
 
   protected override def writeData(
       dataOut: DataOutputStream,
       printOut: PrintStream,
-      iter: Iterator[_]): Unit = if (iter.hasNext) {
-    val inputIterator = iter.asInstanceOf[Iterator[(Array[Byte], Iterator[InternalRow])]]
+      inputIterator: Iterator[_]): Unit = if (inputIterator.hasNext) {
 
 Review comment:
   Oh, yes. I tested them manually. I'll fix the tests to check it as well.
   
   Actually, I noticed that the R worker works a bit differently.
   
   If the iterator is empty, it sends `0` first, and the rest of the data is not used (see `isEmpty <- SparkR:::readInt(inputCon)` in `worker.R` and
   
   ```scala
             if (!iter.hasNext) {
               dataOut.writeInt(0)
             } else {
               dataOut.writeInt(1)
             }
   ```
   
   in `RRunner.scala`).
   
   So, in practice, it doesn't matter whether we send the data or not. I simply decided to ignore this case, since it's already ignored on the R worker's side.
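
To illustrate the handshake described above, here is a minimal, self-contained sketch. It is not the actual Spark code: `EmptyFlagSketch`, `write`, and `read` are hypothetical names, and the reader is only a Scala stand-in for what I understand the R worker to do with `isEmpty`. It assumes a plain `Int` payload instead of real batches.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical sketch; none of these names exist in Spark.
object EmptyFlagSketch {
  // Writer side: writes the same flag as RRunner.scala (0 = empty, 1 = non-empty),
  // then writes `payload` unconditionally, even when the partition is empty.
  def write(partitionHasRows: Boolean, payload: Seq[Int]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(bytes)
    dataOut.writeInt(if (partitionHasRows) 1 else 0)
    payload.foreach(dataOut.writeInt)
    dataOut.flush()
    bytes.toByteArray
  }

  // Reader side: reads the flag first and skips everything else when it is 0.
  def read(data: Array[Byte]): Seq[Int] = {
    val dataIn = new DataInputStream(new ByteArrayInputStream(data))
    val isEmpty = dataIn.readInt() // named as in worker.R; 0 means "empty"
    if (isEmpty == 0) {
      Seq.empty
    } else {
      val values = Seq.newBuilder[Int]
      while (dataIn.available() > 0) values += dataIn.readInt()
      values.result()
    }
  }
}
```

With this sketch, `EmptyFlagSketch.read(EmptyFlagSketch.write(false, Seq(7, 8)))` and `EmptyFlagSketch.read(EmptyFlagSketch.write(false, Seq.empty))` both yield an empty result, which is why sending or skipping the extra data makes no observable difference once the flag is `0`.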
   
   
