JoshRosen commented on code in PR #36683:
URL: https://github.com/apache/spark/pull/36683#discussion_r883134877


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -190,32 +191,30 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Create a DataFrame from an RDD of serialized ArrowRecordBatches.
+   * Create a DataFrame from an iterator of serialized ArrowRecordBatches.
    */
-  private[sql] def toDataFrame(
-      arrowBatchRDD: JavaRDD[Array[Byte]],
+  def toDataFrame(
+      arrowBatches: Iterator[Array[Byte]],
       schemaString: String,
       session: SparkSession): DataFrame = {
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
-    val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
-    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
-      val context = TaskContext.get()
-      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
-    }
-    session.internalCreateDataFrame(rdd.setName("arrow"), schema)
+    val attrs = DataType.fromJson(schemaString).asInstanceOf[StructType].toAttributes
+    val data = ArrowConverters.fromBatchIterator(
+      arrowBatches,
+      DataType.fromJson(schemaString).asInstanceOf[StructType],
+      session.sessionState.conf.sessionLocalTimeZone,
+      TaskContext.get())
+    // Project it. Otherwise, the Arrow column vectors will be closed and released out.
+    val proj = UnsafeProjection.create(attrs, attrs)
+    Dataset.ofRows(session, LocalRelation(attrs, data.map(r => proj(r).copy()).toArray))

Review Comment:
   I agree that the `ConvertToLocalRelation` optimization can greatly improve performance for small tables, especially because local relations have an accurate implementation of `computeStats()`, which lets us benefit from other optimizations such as auto broadcast joins.
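   
   As a minimal illustration of that interaction (my own sketch, not code from this PR): a DataFrame backed by a `LocalRelation` gets a realistic `sizeInBytes` from `computeStats()` (based on the actual row count) rather than the conservative `spark.sql.defaultSizeInBytes`, so a join against a small one can be planned as a broadcast hash join without any hint:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   import spark.implicits._
   
   // toDF on a local Seq produces a LocalRelation, whose stats reflect the real row count.
   val small = Seq((1, "a"), (2, "b")).toDF("id", "v")
   val large = spark.range(0, 1000000).toDF("id")
   
   // Because the estimated size of `small` is far below spark.sql.autoBroadcastJoinThreshold
   // (10 MB by default), this should plan as a BroadcastHashJoin without an explicit
   // broadcast() hint.
   large.join(small, "id").explain()
   ```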
   
   I suppose it's possible that Arrow batches could be a _much_ more compact encoding than UnsafeRows, such that we'd see a huge inflation in memory usage. For example, say I have an Arrow record batch containing a huge number of rows whose fields are all booleans: those booleans are encoded space-efficiently (bit-packed) in Arrow but bloat considerably when converted into UnsafeRows.
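   
   To put rough, assumed (not measured) numbers on that worst case: Arrow bit-packs booleans (roughly 2 bits per value once the validity buffer is counted), while an UnsafeRow spends a full 8-byte word per fixed-length field plus an 8-byte null-tracking word per row. A back-of-the-envelope sketch:
   
   ```scala
   // Hypothetical shape: 10M rows, 8 boolean columns (illustrative numbers only).
   val numRows = 10L * 1000 * 1000
   val numBoolCols = 8
   
   // Arrow: ~1 bit for the value + ~1 bit for validity, per field.
   val arrowBytes = numRows * numBoolCols * 2 / 8
   
   // UnsafeRow: 8-byte null bitset word (covers up to 64 fields) + 8 bytes per field.
   val unsafeBytes = numRows * (8 + 8L * numBoolCols)
   
   println(s"Arrow ~ ${arrowBytes / 1e6} MB, UnsafeRow ~ ${unsafeBytes / 1e6} MB")
   // ~20 MB vs ~720 MB for this shape, i.e. roughly 36x inflation.
   ```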
   
   I don't have any sense of whether users are doing this today, though. 
_Probably_ most cases are small enough that this wouldn't be an issue, but 
that's just speculation (especially because I'm not super familiar with 
Pandas-on-Spark use cases).
   
   If we _are_ concerned about this, though, then here's some brainstorming on 
potential options:
   
   - We could add a configuration flag so that users can opt in to revert to the old behavior. Users would have to somehow discover this flag after they hit performance problems, though.
   - We might be able to come up with a reasonably accurate heuristic estimate of the dataset's UnsafeRow size based on the schema and then choose which path to use based on a size threshold. Our new flag could be a size threshold (like the auto broadcast join threshold) rather than a boolean flag.
     - As a strawman, the threshold could be 10 MB (the default auto broadcast join threshold).
     - For the estimation, we can use `EstimationUtils.getSizePerRow(attributes) * numRows` (see the sketch after this list).
       - As an aside, some of these estimates are potentially inaccurate because they're based on `DataType.defaultSize`, which tends to underestimate UnsafeRow overheads for small primitive fields (e.g. it estimates the cost of a boolean as 1 byte when in reality it occupies 8 bytes in an UnsafeRow). That said, we should probably use the same estimation logic everywhere and separately investigate whether to re-tune that method for accuracy. Changes to that estimation would ripple widely into auto broadcast join decisions, so we'd need careful investigation to make sure we don't significantly regress existing queries.
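   
   Here's a rough sketch of what that threshold check could look like. The `arrowLocalRelationThreshold` value and the wiring around it are made up for illustration; only `EstimationUtils.getSizePerRow` is an existing helper:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.Attribute
   import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
   
   // Hypothetical threshold, defaulting to the same 10 MB as the auto broadcast
   // join threshold; this is not an existing Spark conf.
   val arrowLocalRelationThreshold: Long = 10L * 1024 * 1024
   
   /** Estimate the UnsafeRow footprint from the schema and compare it to the threshold. */
   def fitsInLocalRelation(attrs: Seq[Attribute], numRows: Long): Boolean = {
     val estimatedBytes = EstimationUtils.getSizePerRow(attrs) * numRows
     estimatedBytes <= arrowLocalRelationThreshold
   }
   
   // toDataFrame could then branch: build the LocalRelation when
   // fitsInLocalRelation(attrs, rows.length) holds, and otherwise fall back to
   // parallelizing the converted rows as in the old RDD-based path.
   ```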
   
   Do you think it's worth worrying about this edge case and adding thresholding? Or is the risk/impact low enough that the extra complexity isn't justified?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

