revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r463737044



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -85,77 +241,96 @@ case class CachedRDDBuilder(
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
-    val output = cachedPlan.output
-    val cached = cachedPlan.execute().mapPartitionsInternal { rowIterator =>
-      new Iterator[CachedBatch] {
-        def next(): CachedBatch = {
-          val columnBuilders = output.map { attribute =>
-            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
-          }.toArray
-
-          var rowCount = 0
-          var totalSize = 0L
-          while (rowIterator.hasNext && rowCount < batchSize
-            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
-            val row = rowIterator.next()
-
-            // Added for SPARK-6082. This assertion can be useful for scenarios when something
-            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
-            // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
-            // hard to decipher.
-            assert(
-              row.numFields == columnBuilders.length,
-              s"Row column number mismatch, expected ${output.size} columns, " 
+
-                s"but got ${row.numFields}." +
-                s"\nRow content: $row")
-
-            var i = 0
-            totalSize = 0
-            while (i < row.numFields) {
-              columnBuilders(i).appendFrom(row, i)
-              totalSize += columnBuilders(i).columnStats.sizeInBytes
-              i += 1
-            }
-            rowCount += 1
-          }
-
-          sizeInBytesStats.add(totalSize)
-          rowCountStats.add(rowCount)
-
-          val stats = InternalRow.fromSeq(
-            columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq)
-          CachedBatch(rowCount, columnBuilders.map { builder =>
-            JavaUtils.bufferToArray(builder.build())
-          }, stats)
-        }
-
-        def hasNext: Boolean = rowIterator.hasNext
-      }
+    val cb = if (cachedPlan.supportsColumnar) {
+      serializer.convertColumnarBatchToCachedBatch(
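
(Note: the hunk above is truncated at the commented line. A minimal sketch of how the two branches plausibly continue, based on the `CachedBatchSerializer` API this PR introduces; the exact method signatures and the `storageLevel`/`conf` arguments are assumptions, not the PR's literal code:)

```scala
// Hypothetical sketch of the truncated branch above, not the PR's literal
// code. The CachedBatchSerializer method signatures are assumed here;
// check the PR for the real argument lists.
val cb = if (cachedPlan.supportsColumnar) {
  // Columnar path: hand the ColumnarBatch RDD straight to the serializer.
  serializer.convertColumnarBatchToCachedBatch(
    cachedPlan.executeColumnar(), cachedPlan.output, storageLevel, conf)
} else {
  // Row path: the serializer is responsible for batching the InternalRows.
  serializer.convertInternalRowToCachedBatch(
    cachedPlan.execute(), cachedPlan.output, storageLevel, conf)
}
```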

Review comment:
       Yes and no. If you go through the front door and create an `InMemoryRelation` with `InMemoryRelation.apply` (rather than the overload made for testing), then it is guaranteed that the only time you get a `SparkPlan` that `supportsColumnar` is when the serializer also `supportsColumnarInput`. You could deliberately construct an `InMemoryRelation` that violates this, but you would have to go out of your way to do it. I can add an assertion to the `InMemoryRelation` constructor to enforce this invariant if you want me to.
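
       For reference, a minimal sketch of what that assertion could look like, placed in the `CachedRDDBuilder` body where both `cachedPlan` and `serializer` are in scope (the `supportsColumnarInput(schema)` signature is an assumption):

```scala
// Sketch only, not the PR's code: fail fast if someone bypasses
// InMemoryRelation.apply and pairs a columnar plan with a serializer
// that cannot accept columnar input.
assert(
  !cachedPlan.supportsColumnar ||
    serializer.supportsColumnarInput(cachedPlan.output),
  "A cachedPlan that supportsColumnar requires a CachedBatchSerializer " +
    "that supportsColumnarInput for its schema")
```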



