HyukjinKwon commented on a change in pull request #23760: [SPARK-26762][SQL][R] 
Arrow optimization for conversion from Spark DataFrame to R DataFrame
URL: https://github.com/apache/spark/pull/23760#discussion_r256218676
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##########
 @@ -3198,9 +3199,66 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as Arrow batches and serve stream to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to SparkR. It sends
+   * arrow batches in an ordered manner with buffering. This is inevitable
+   * due to missing R API that reads batches from socket directly. See ARROW-4512.
+   * Eventually, this code should be deduplicated by `collectAsArrowToPython`.
    */
-  private[sql] def collectAsArrowToPython(): Array[Any] = {
+  private[sql] def collectAsArrowToR(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
+    withAction("collectAsArrowToR", queryExecution) { plan =>
+      RRDD.serveToStream("serve-Arrow") { outputStream =>
 
 Review comment:
   Yes, kind of. To be more explicit, I am buffering the batches on the JVM 
side, in order, for the reasons below:
   
   1. The R API does not appear to support reading Arrow data from a socket 
(ARROW-4512).
   For instance, we cannot call `read_record_batch(socketConn)`.
   
   ```
   JVM side                                              R side    
   [Arrow Batch][Arrow Batch][Arrow Batch] ---socket---> [Arrow Batch]...
   ```
   
   Here, the R side cannot read batch by batch. I tried to hack around this 
manually by reading only the message first, but it is not possible to read just 
a batch's message by partially reading the batch (IMHO this is an issue in the 
R API, but I haven't filed a JIRA yet). There appears to be no way to know a 
batch's length on the R side.
   
   
   ```
   JVM side                                              R side    
   [Arrow Stream Format (Arrow Batches)]   ---socket---> [Arrow Stream Format (Arrow Batches)]
   ```
   
   This is the current approach: I read the whole binary first, and then parse 
it via the R API (`read_arrow(readRaw(socketConn))`).
   
   
   2. Yes, I also failed to find a way to construct an Arrow table from Arrow 
batches in stream format via the R APIs. However, since this is already blocked 
by the reason above, I didn't investigate further or file an issue on the Arrow 
side.
   
   3. Since I need to buffer the batches in the JVM anyway, I wanted to get rid 
of batch order handling on the R side.
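
   To illustrate the ordered buffering described above, here is a minimal 
sketch using plain byte arrays in place of Arrow batches. `OrderedBatchBuffer`, 
`add`, `isComplete` and `result` are hypothetical names chosen for this example 
and are not taken from the PR; the real code would go through Spark's job 
callbacks and an Arrow stream writer instead.

```scala
import java.io.ByteArrayOutputStream

// Hypothetical sketch, not the PR's implementation: payloads stand in for
// serialized Arrow batches, one payload per partition.
final class OrderedBatchBuffer(numPartitions: Int) {
  // Slot i holds partition i's payload until it can be appended (null = not arrived).
  private val pending = new Array[Array[Byte]](numPartitions)
  private val out = new ByteArrayOutputStream()
  private var nextIndex = 0 // next partition index allowed to be appended

  // Called once per partition, in whatever order partitions complete.
  def add(index: Int, batch: Array[Byte]): Unit = {
    pending(index) = batch
    // Append every consecutive partition that is now available.
    while (nextIndex < numPartitions && pending(nextIndex) != null) {
      out.write(pending(nextIndex))
      pending(nextIndex) = null // release the reference once written
      nextIndex += 1
    }
  }

  // True once every partition has been appended.
  def isComplete: Boolean = nextIndex == numPartitions

  // Bytes appended so far, in partition order.
  def result(): Array[Byte] = out.toByteArray
}
```

   With this shape, the receiving side only ever sees one contiguous payload 
in partition order, so it needs no reordering logic of its own.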
