JoshRosen commented on code in PR #36683:
URL: https://github.com/apache/spark/pull/36683#discussion_r883134877
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########

```diff
@@ -190,32 +191,30 @@ private[sql] object ArrowConverters {
   }

   /**
-   * Create a DataFrame from an RDD of serialized ArrowRecordBatches.
+   * Create a DataFrame from an iterator of serialized ArrowRecordBatches.
    */
-  private[sql] def toDataFrame(
-      arrowBatchRDD: JavaRDD[Array[Byte]],
+  def toDataFrame(
+      arrowBatches: Iterator[Array[Byte]],
       schemaString: String,
       session: SparkSession): DataFrame = {
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
-    val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
-    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
-      val context = TaskContext.get()
-      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
-    }
-    session.internalCreateDataFrame(rdd.setName("arrow"), schema)
+    val attrs = DataType.fromJson(schemaString).asInstanceOf[StructType].toAttributes
+    val data = ArrowConverters.fromBatchIterator(
+      arrowBatches,
+      DataType.fromJson(schemaString).asInstanceOf[StructType],
+      session.sessionState.conf.sessionLocalTimeZone,
+      TaskContext.get())
+    // Project it. Otherwise, the Arrow column vectors will be closed and released out.
+    val proj = UnsafeProjection.create(attrs, attrs)
+    Dataset.ofRows(session, LocalRelation(attrs, data.map(r => proj(r).copy()).toArray))
```

Review Comment:

I agree that the `ConvertToLocalRelation` optimization can greatly improve performance for small tables, especially because local relations have an accurate implementation of `computeStats()`, which allows us to benefit from other optimizations like auto broadcast joins. I suppose it's possible that Arrow batches could be _much_ more compressible than UnsafeRows, such that we experience a huge inflation in memory usage.
For example, let's say I have an Arrow record batch containing a huge number of rows with all boolean fields: these booleans will be encoded space-efficiently in Arrow but will experience huge bloat when converted into UnsafeRows. I don't have any sense of whether users are doing this today, though. _Probably_ most cases are small enough that this wouldn't be an issue, but that's just speculation (especially because I'm not super familiar with Pandas-on-Spark use cases).

If we _are_ concerned about this, though, here's some brainstorming on potential options:

- We could add a configuration flag so that users can opt in to revert back to the old behavior. Users would have to somehow know/discover this flag after they hit performance problems, though.
- We might be able to come up with reasonably accurate heuristic estimates of the dataset's UnsafeRow size based on the schema and then choose which path to use based on a size threshold. Our new flag could be a size-threshold flag (like auto broadcast join) rather than a boolean flag.
  - As a strawman, the threshold could be 10 MB (the default auto broadcast join threshold).
  - For the estimation, we can use `EstimationUtils.getSizePerRow(attributes) * numRows`.
  - As an aside, I think that some of these estimates are potentially inaccurate because they're based on `DataType.defaultSize`, which I think tends to underestimate UnsafeRow overheads for small primitive fields (e.g. it estimates the cost of a boolean as 1 byte, when in reality it's 8 bytes in an UnsafeRow). That said, I think we should probably use the same estimation logic everywhere and separately investigate whether we want to re-tune that method for accuracy. Changes to that estimation would have wide-rippling effects on whether we auto broadcast join, so we'd need careful investigation to make sure we don't significantly regress existing queries if we change the estimation.

Do you think it's worth it to worry about this edge case and add thresholding?
Or is the risk/impact low enough that the extra complexity isn't justified?
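To make the thresholding idea concrete, here's a rough, self-contained sketch of the arithmetic (the object and method names are hypothetical, and the per-field costs are hand-rolled here; a real implementation would derive the row size from `EstimationUtils.getSizePerRow` on the relation's attributes rather than the simplified fixed-width model below):

```scala
object LocalRelationThresholdSketch {
  // Hypothetical per-field UnsafeRow costs, in bytes. UnsafeRow stores every
  // fixed-width field in an 8-byte word, plus a null bitset of one 8-byte
  // word per 64 fields -- so even a boolean costs 8 bytes, far more than
  // DataType.defaultSize's 1-byte estimate.
  private val unsafeWordBytes = 8L
  private val nullBitsetWordBytes = 8L

  /** Estimate total UnsafeRow bytes for numRows rows of numFields
    * fixed-width fields (simplified: ignores variable-length types). */
  def estimatedUnsafeSize(numRows: Long, numFields: Int): Long = {
    val bitsetWords = (numFields + 63) / 64
    numRows * (bitsetWords * nullBitsetWordBytes + numFields * unsafeWordBytes)
  }

  // Strawman threshold from the discussion above: 10 MB, the default
  // auto broadcast join threshold.
  val sizeThreshold: Long = 10L * 1024 * 1024

  /** The proposed heuristic: only materialize a LocalRelation when the
    * estimated UnsafeRow footprint stays under the threshold. */
  def shouldUseLocalRelation(numRows: Long, numFields: Int): Boolean =
    estimatedUnsafeSize(numRows, numFields) <= sizeThreshold

  def main(args: Array[String]): Unit = {
    // The bloat scenario above: 10M single-boolean rows are a couple of MB
    // of bit-packed Arrow data, but ~160 MB once expanded into UnsafeRows.
    val rows = 10L * 1000 * 1000
    println(estimatedUnsafeSize(rows, 1))     // 160000000
    println(shouldUseLocalRelation(rows, 1))  // false: fall back to RDD path
    println(shouldUseLocalRelation(10000L, 1)) // true: small enough to localize
  }
}
```

This also illustrates why a size threshold (rather than a boolean flag) mirrors the auto broadcast join design: the decision adapts to the actual schema and row count instead of requiring users to discover an opt-out after hitting a regression.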