revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r457412184
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -85,69 +208,55 @@ case class CachedRDDBuilder(
}
private def buildBuffers(): RDD[CachedBatch] = {
- val output = cachedPlan.output
- val cached = cachedPlan.execute().mapPartitionsInternal { rowIterator =>
- new Iterator[CachedBatch] {
- def next(): CachedBatch = {
- val columnBuilders = output.map { attribute =>
- ColumnBuilder(attribute.dataType, batchSize, attribute.name,
useCompression)
- }.toArray
-
- var rowCount = 0
- var totalSize = 0L
- while (rowIterator.hasNext && rowCount < batchSize
- && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
- val row = rowIterator.next()
-
- // Added for SPARK-6082. This assertion can be useful for
scenarios when something
- // like Hive TRANSFORM is used. The external data generation
script used in TRANSFORM
- // may result malformed rows, causing
ArrayIndexOutOfBoundsException, which is somewhat
- // hard to decipher.
- assert(
- row.numFields == columnBuilders.length,
- s"Row column number mismatch, expected ${output.size} columns, "
+
- s"but got ${row.numFields}." +
- s"\nRow content: $row")
-
- var i = 0
- totalSize = 0
- while (i < row.numFields) {
- columnBuilders(i).appendFrom(row, i)
- totalSize += columnBuilders(i).columnStats.sizeInBytes
- i += 1
- }
- rowCount += 1
- }
-
- sizeInBytesStats.add(totalSize)
- rowCountStats.add(rowCount)
-
- val stats = InternalRow.fromSeq(
- columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq)
- CachedBatch(rowCount, columnBuilders.map { builder =>
- JavaUtils.bufferToArray(builder.build())
- }, stats)
- }
-
- def hasNext: Boolean = rowIterator.hasNext
- }
- }.persist(storageLevel)
-
+ val cached = serializer.convertForCache(cachedPlan)
+ .map { batch =>
+ sizeInBytesStats.add(batch.sizeInBytes)
+ rowCountStats.add(batch.numRows)
+ batch
+ }.persist(storageLevel)
cached.setName(cachedName)
cached
}
}
object InMemoryRelation {
+ private[this] var ser: Option[CachedBatchSerializer] = None
Review comment:
I'll try to get rid of it. I am aware of the issues with double-checked
locking. In this case it is a matter of efficiency instead of correctness. The
object being created is not going to change in any meaningful way. If we
created a new one each time it would still do the right thing. The goal here
was just to remove the overhead of reflection when creating subsequent objects,
but I should have at least documented it as such.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]