revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r458988318



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -85,77 +232,81 @@ case class CachedRDDBuilder(
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
-    val output = cachedPlan.output
-    val cached = cachedPlan.execute().mapPartitionsInternal { rowIterator =>
-      new Iterator[CachedBatch] {
-        def next(): CachedBatch = {
-          val columnBuilders = output.map { attribute =>
-            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
-          }.toArray
-
-          var rowCount = 0
-          var totalSize = 0L
-          while (rowIterator.hasNext && rowCount < batchSize
-            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
-            val row = rowIterator.next()
-
-            // Added for SPARK-6082. This assertion can be useful for scenarios when something
-            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
-            // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
-            // hard to decipher.
-            assert(
-              row.numFields == columnBuilders.length,
-              s"Row column number mismatch, expected ${output.size} columns, " 
+
-                s"but got ${row.numFields}." +
-                s"\nRow content: $row")
-
-            var i = 0
-            totalSize = 0
-            while (i < row.numFields) {
-              columnBuilders(i).appendFrom(row, i)
-              totalSize += columnBuilders(i).columnStats.sizeInBytes
-              i += 1
-            }
-            rowCount += 1
-          }
-
-          sizeInBytesStats.add(totalSize)
-          rowCountStats.add(rowCount)
-
-          val stats = InternalRow.fromSeq(
-            columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq)
-          CachedBatch(rowCount, columnBuilders.map { builder =>
-            JavaUtils.bufferToArray(builder.build())
-          }, stats)
-        }
-
-        def hasNext: Boolean = rowIterator.hasNext
-      }
+    val cb = if (cachedPlan.supportsColumnar) {
+      serializer.convertForCacheColumnar(cachedPlan.executeColumnar(),
+        cachedPlan.output,
+        storageLevel,
+        cachedPlan.conf)
+    } else {
+      serializer.convertForCache(cachedPlan.execute(),
+        cachedPlan.output,
+        storageLevel,
+        cachedPlan.conf)
+    }
+    val cached = cb.map { batch =>
+      sizeInBytesStats.add(batch.sizeInBytes)
+      rowCountStats.add(batch.numRows)
+      batch
     }.persist(storageLevel)
-
     cached.setName(cachedName)
     cached
   }
 }
 
 object InMemoryRelation {
 
+  private[this] var ser: Option[CachedBatchSerializer] = None
+  private[this] def getSerializer(sqlConf: SQLConf): CachedBatchSerializer = synchronized {
+    if (ser.isEmpty) {
+      val serName = sqlConf.getConf(StaticSQLConf.SPARK_CACHE_SERIALIZER)
+      val serClass = Utils.classForName(serName)
+      val instance = serClass.getConstructor().newInstance().asInstanceOf[CachedBatchSerializer]
+      ser = Some(instance)
+    }
+    ser.get
+  }
+
   def apply(
-      useCompression: Boolean,
-      batchSize: Int,
+      storageLevel: StorageLevel,
+      qe: QueryExecution,
+      tableName: Option[String]): InMemoryRelation = {
+    val optimizedPlan = qe.optimizedPlan
+    val serializer = getSerializer(optimizedPlan.conf)
+    val child = if (serializer.supportsColumnarInput(optimizedPlan.output)) {

Review comment:
       I'll switch over to doing `isInstanceOf[ColumnarToRow]` just to get this unblocked.
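   
   Roughly what I have in mind (a sketch only, not final code; `planForCaching` is a made-up helper name here, and I'm assuming we match on the concrete `ColumnarToRowExec` node from `org.apache.spark.sql.execution`):
   
```scala
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}

// Sketch: when the serializer can ingest columnar batches directly, peel off
// the ColumnarToRowExec transition and cache its columnar child instead.
// `columnarOk` would come from serializer.supportsColumnarInput(schema).
def planForCaching(plan: SparkPlan, columnarOk: Boolean): SparkPlan = plan match {
  case ColumnarToRowExec(child) if columnarOk => child
  case other => other
}
```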
   
   I don't consider the change I made a hack. It is a relatively small and focused change, and it will also result in less implicit coupling between the cache code and the columnar planning code.
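   
   For context, the interface the new `buildBuffers()` calls into looks roughly like this at this point in the PR (method names are from this revision and may still change in review; the `CachedBatch` placeholder below just mirrors what `buildBuffers()` reads off each batch):
   
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.StorageLevel

// Placeholder: the real trait is introduced elsewhere in this PR; these two
// members are the ones buildBuffers() reads for its accumulator stats.
trait CachedBatch { def numRows: Int; def sizeInBytes: Long }

// The cache code talks only to this trait, never to the columnar planning
// rules, which is where the reduced coupling comes from.
trait CachedBatchSerializer extends Serializable {
  // Can this serializer consume ColumnarBatch input for the given schema?
  def supportsColumnarInput(schema: Seq[Attribute]): Boolean

  // Row-based path, taken when the cached plan does not support columnar output.
  def convertForCache(
      input: RDD[InternalRow],
      schema: Seq[Attribute],
      storageLevel: StorageLevel,
      conf: SQLConf): RDD[CachedBatch]

  // Columnar path, taken when cachedPlan.supportsColumnar is true.
  def convertForCacheColumnar(
      input: RDD[ColumnarBatch],
      schema: Seq[Attribute],
      storageLevel: StorageLevel,
      conf: SQLConf): RDD[CachedBatch]
}
```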
   
   I like SPARK-32334 too, but what I have proposed there so far really only covers transformations to Arrow. If you have ideas about how you want it to impact these changes, please add some comments there so we can discuss them and get a better idea of how it should all fit together.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


