JoshRosen commented on code in PR #36683:
URL: https://github.com/apache/spark/pull/36683#discussion_r883134877
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -190,32 +191,30 @@ private[sql] object ArrowConverters {
}
/**
- * Create a DataFrame from an RDD of serialized ArrowRecordBatches.
+ * Create a DataFrame from an iterator of serialized ArrowRecordBatches.
*/
- private[sql] def toDataFrame(
- arrowBatchRDD: JavaRDD[Array[Byte]],
+ def toDataFrame(
+ arrowBatches: Iterator[Array[Byte]],
schemaString: String,
session: SparkSession): DataFrame = {
- val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
- val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
- val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
- val context = TaskContext.get()
- ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
- }
- session.internalCreateDataFrame(rdd.setName("arrow"), schema)
+    val attrs = DataType.fromJson(schemaString).asInstanceOf[StructType].toAttributes
+ val data = ArrowConverters.fromBatchIterator(
+ arrowBatches,
+ DataType.fromJson(schemaString).asInstanceOf[StructType],
+ session.sessionState.conf.sessionLocalTimeZone,
+ TaskContext.get())
+    // Project it. Otherwise, the Arrow column vectors will be closed and released out.
+ val proj = UnsafeProjection.create(attrs, attrs)
+    Dataset.ofRows(session, LocalRelation(attrs, data.map(r => proj(r).copy()).toArray))
Review Comment:
I agree that the `ConvertToLocalRelation` optimization can greatly improve
performance for small tables, especially because local relations have an
accurate implementation of `computeStats()`, which lets us benefit from
other optimizations such as auto broadcast joins.
I suppose it's possible that Arrow batches could be _much_ more compact
than UnsafeRows, such that we experience a huge inflation in memory usage. For
example, say I have an Arrow record batch containing a huge number of rows
with all boolean fields: these booleans are encoded space-efficiently in
Arrow (one bit per value) but will bloat massively when converted into
UnsafeRows (a full 8-byte word per field).
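A back-of-the-envelope sketch of that inflation (the UnsafeRow constants here, an 8-byte null-bitset word per 64 fields plus an 8-byte word per field, match my understanding of the layout; the Arrow side ignores batch metadata and buffer padding, so treat the exact ratio as illustrative):

```python
import math

def unsafe_row_size(num_bool_fields: int) -> int:
    # UnsafeRow layout: one 8-byte null-bitset word per 64 fields,
    # then a full 8-byte word per field, even for booleans.
    null_bitset = math.ceil(num_bool_fields / 64) * 8
    return null_bitset + 8 * num_bool_fields

def arrow_bool_column_bytes(num_rows: int) -> int:
    # Arrow encodes booleans as 1 bit per value, plus a 1-bit validity bitmap.
    return 2 * math.ceil(num_rows / 8)

# 1 million rows, 10 boolean columns:
rows, cols = 1_000_000, 10
arrow_total = cols * arrow_bool_column_bytes(rows)  # ~2.5 MB
unsafe_total = rows * unsafe_row_size(cols)         # ~88 MB, roughly 35x larger
print(arrow_total, unsafe_total, unsafe_total / arrow_total)
```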
I don't have any sense of whether users are doing this today, though.
_Probably_ most cases are small enough that this wouldn't be an issue, but
that's just speculation (especially because I'm not super familiar with
Pandas-on-Spark use cases).
If we _are_ concerned about this, though, then here's some brainstorming on
potential options:
- We could add a configuration flag so that users can opt in to the old
behavior. Users would have to somehow know/discover this flag after
they hit performance problems, though.
- We might be able to come up with reasonably accurate heuristic estimates
of the dataset's UnsafeRow size based on the schema and then choose which path
to use based on a size threshold. Our new flag could be a size threshold
(like the auto broadcast join threshold) rather than a boolean flag.
- As a strawman, the threshold could be 10 MB (the default autobroadcast
join threshold).
- For the estimation, we can use
`EstimationUtils.getSizePerRow(attributes) * numRows`.
  - As an aside, I think that some of these estimates are potentially
inaccurate because they're based on `DataType.defaultSize`, which I think tends
to underestimate UnsafeRow overheads for small primitive fields (e.g. it
estimates the cost of a boolean as 1 byte when in reality it's 8 bytes in an
UnsafeRow). That said, I think we should probably use the same estimation logic
everywhere and separately investigate whether we want to re-tune that method
for accuracy. Changes to that estimation would have wide-ranging effects on
whether we auto broadcast join, so we'd need careful investigation to make sure
we don't significantly regress existing queries if we change the estimation.
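To make the thresholding idea concrete, here's a rough Python sketch of the decision. The per-type sizes mirror the `DataType.defaultSize` values as I remember them, but the function names and the fixed 8-byte row overhead are my own illustrative assumptions, not Spark's actual `EstimationUtils` code:

```python
# Hypothetical thresholding: estimate the UnsafeRow footprint from the schema
# and row count, and only build a LocalRelation when it fits under a threshold.
# Per-type estimates modeled on DataType.defaultSize (note boolean is
# estimated at 1 byte even though an UnsafeRow spends 8 on it).
DEFAULT_SIZE = {"boolean": 1, "int": 4, "long": 8, "double": 8, "string": 20}

def estimated_row_size(field_types):
    # Analogous in spirit to EstimationUtils.getSizePerRow: sum of per-field
    # estimates plus a small fixed row overhead (8 bytes here, an assumption).
    return 8 + sum(DEFAULT_SIZE[t] for t in field_types)

def use_local_relation(field_types, num_rows, threshold=10 * 1024 * 1024):
    # 10 MB default mirrors the spark.sql.autoBroadcastJoinThreshold default.
    return estimated_row_size(field_types) * num_rows <= threshold

# A small table takes the new LocalRelation path; the all-boolean
# million-row case from above would fall back to the old behavior.
print(use_local_relation(["int", "string"], 1000))        # small: True
print(use_local_relation(["boolean"] * 10, 1_000_000))    # ~18 MB est.: False
```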
Do you think it's worth it to worry about this edge-case and add
thresholding? Or is the risk/impact low enough that the extra complexity isn't
justified?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]