revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r458215059
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -85,77 +232,81 @@ case class CachedRDDBuilder(
 }

   private def buildBuffers(): RDD[CachedBatch] = {
-    val output = cachedPlan.output
-    val cached = cachedPlan.execute().mapPartitionsInternal { rowIterator =>
-      new Iterator[CachedBatch] {
-        def next(): CachedBatch = {
-          val columnBuilders = output.map { attribute =>
-            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
-          }.toArray
-
-          var rowCount = 0
-          var totalSize = 0L
-          while (rowIterator.hasNext && rowCount < batchSize
-            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
-            val row = rowIterator.next()
-
-            // Added for SPARK-6082. This assertion can be useful for scenarios when something
-            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
-            // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
-            // hard to decipher.
-            assert(
-              row.numFields == columnBuilders.length,
-              s"Row column number mismatch, expected ${output.size} columns, " +
-                s"but got ${row.numFields}." +
-                s"\nRow content: $row")
-
-            var i = 0
-            totalSize = 0
-            while (i < row.numFields) {
-              columnBuilders(i).appendFrom(row, i)
-              totalSize += columnBuilders(i).columnStats.sizeInBytes
-              i += 1
-            }
-            rowCount += 1
-          }
-
-          sizeInBytesStats.add(totalSize)
-          rowCountStats.add(rowCount)
-
-          val stats = InternalRow.fromSeq(
-            columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq)
-          CachedBatch(rowCount, columnBuilders.map { builder =>
-            JavaUtils.bufferToArray(builder.build())
-          }, stats)
-        }
-
-        def hasNext: Boolean = rowIterator.hasNext
-      }
+    val cb = if (cachedPlan.supportsColumnar) {
+      serializer.convertForCacheColumnar(cachedPlan.executeColumnar(),
+        cachedPlan.output,
+        storageLevel,
+        cachedPlan.conf)
+    } else {
+      serializer.convertForCache(cachedPlan.execute(),
+        cachedPlan.output,
+        storageLevel,
+        cachedPlan.conf)
+    }
+    val cached = cb.map { batch =>
+      sizeInBytesStats.add(batch.sizeInBytes)
+      rowCountStats.add(batch.numRows)
+      batch
     }.persist(storageLevel)
-
     cached.setName(cachedName)
     cached
   }
 }

 object InMemoryRelation {
+  private[this] var ser: Option[CachedBatchSerializer] = None
+
+  private[this] def getSerializer(sqlConf: SQLConf): CachedBatchSerializer = synchronized {
+    if (ser.isEmpty) {
+      val serName = sqlConf.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER)
+      val serClass = Utils.classForName(serName)
+      val instance = serClass.getConstructor().newInstance().asInstanceOf[CachedBatchSerializer]
+      ser = Some(instance)
+    }
+    ser.get
+  }
+
   def apply(
-      useCompression: Boolean,
-      batchSize: Int,
+      storageLevel: StorageLevel,
+      qe: QueryExecution,
+      tableName: Option[String]): InMemoryRelation = {
+    val optimizedPlan = qe.optimizedPlan
+    val serializer = getSerializer(optimizedPlan.conf)
+    val child = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
##########
Review comment:
Sorry, I didn't explain. I wanted to avoid reflection if possible, because our
GPU plugin can replace `ColumnarToRow` with a version that also pulls data back
from the GPU, and with
[SPARK-32334](https://issues.apache.org/jira/browse/SPARK-32334) there is a
possibility that more things related to columnar transitions might change. So
instead of trying to pick apart the incoming `SparkPlan` to remove the
`ColumnarToRow`, I added an API to `QueryExecution` so it can generate a plan
without the transition in it.

If you would prefer that I pick apart the plan, I can do that instead; I just
thought this was a cleaner approach.
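
For illustration only, here is a minimal sketch of that idea. The trait and the
method name `executedPlanWithoutColumnarTransition` are hypothetical, not the
API this PR actually adds; the point is that `QueryExecution` builds its
executed plan by folding preparation rules over the optimized physical plan, so
it can expose a second plan built from the same rules minus the one that inserts
the row/columnar transitions, instead of callers stripping `ColumnarToRowExec`
off an already-prepared plan.

```scala
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical sketch only -- names and shape are illustrative, not the real API.
trait PreparesPlansForCache {
  // Rules normally applied to produce the executed plan, including the rule
  // that inserts the ColumnarToRow / RowToColumnar transitions.
  def preparations: Seq[SparkPlan => SparkPlan]

  // The same rules with the transition-inserting rule filtered out.
  def preparationsWithoutTransitions: Seq[SparkPlan => SparkPlan]

  def optimizedSparkPlan: SparkPlan

  def executedPlan: SparkPlan =
    preparations.foldLeft(optimizedSparkPlan)((plan, rule) => rule(plan))

  // Generate the plan without the transition ever being added, rather than
  // pattern matching the finished plan to remove ColumnarToRowExec.
  def executedPlanWithoutColumnarTransition: SparkPlan =
    preparationsWithoutTransitions.foldLeft(optimizedSparkPlan)((plan, rule) => rule(plan))
}
```

`InMemoryRelation.apply` could then choose between the two plans based on
`serializer.supportsColumnarInput(optimizedPlan.output)`, which keeps it
independent of whatever a plugin may have substituted for `ColumnarToRow`.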