cloud-fan commented on a change in pull request #32717:
URL: https://github.com/apache/spark/pull/32717#discussion_r646311510
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
##########
@@ -143,3 +143,109 @@ class CachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+object DummyAllocator {
+  private var allocated: Long = 0
+  def alloc(size: Long): Unit = synchronized {
+    allocated += size
+  }
+  def release(size: Long): Unit = synchronized {
+    allocated -= size
+  }
+  def getAllocatedMemory: Long = synchronized {
+    allocated
+  }
+}
+
+case class RefCountedCachedBatch(
+    numRows: Int,
+    stats: InternalRow,
+    size: Long,
+    cachedBatch: CachedBatch) extends SimpleMetricsCachedBatch with AutoCloseable {
+  DummyAllocator.alloc(size)
+  var allocated_size: Long = size
+  override def close(): Unit = synchronized {
+    DummyAllocator.release(allocated_size)
+    allocated_size = 0
+  }
+  override def sizeInBytes: Long = allocated_size
+}
+
+class RefCountedTestCachedBatchSerializer extends DefaultCachedBatchSerializer {
+
+  override def convertInternalRowToCachedBatch(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch] = {
+    val batchSize = conf.columnBatchSize
+    val useCompression = conf.useCompression
+    val cachedBatchRdd = convertForCacheInternal(input, schema, batchSize, useCompression)
+    cachedBatchRdd.mapPartitionsInternal { cachedBatchIter =>
+      cachedBatchIter.map(cachedBatch => {
+        val actualCachedBatch = cachedBatch.asInstanceOf[DefaultCachedBatch]
+        new RefCountedCachedBatch(
+          actualCachedBatch.numRows,
+          actualCachedBatch.stats,
+          100,
+          cachedBatch)
+      })
+    }
+  }
+
+  override def convertCachedBatchToInternalRow(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow] = {
+    val actualCachedBatchIter = input.mapPartitionsInternal { cachedBatchIter =>
+      cachedBatchIter.map(_.asInstanceOf[RefCountedCachedBatch].cachedBatch)
+    }
+    super.convertCachedBatchToInternalRow(
+      actualCachedBatchIter,
+      cacheAttributes,
+      selectedAttributes,
+      conf)
+  }
+
+  override def supportsColumnarOutput(schema: StructType): Boolean = false
+  override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = false
+}
+
+class RefCountedTestCachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
Review comment:
shall we put it in a new file?
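
For reference, a rough, untested sketch (not taken from the PR) of how the moved suite could look in its own file, assuming the usual SharedSparkSession pattern of registering the serializer through StaticSQLConf.SPARK_CACHE_SERIALIZER; the test name and assertions are illustrative only:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.internal.StaticSQLConf
    import org.apache.spark.sql.test.SharedSparkSession

    class RefCountedTestCachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
      // spark.sql.cache.serializer is a static conf, so it has to be set before
      // the shared session is created.
      override protected def sparkConf: SparkConf = super.sparkConf
        .set(StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
          classOf[RefCountedTestCachedBatchSerializer].getName)

      // Illustrative only; the exact assertions depend on when close() is
      // invoked on the cached batches.
      test("allocator memory is released after unpersist") {
        val df = spark.range(100).toDF("id")
        df.persist()
        df.collect() // materialize the cache
        assert(DummyAllocator.getAllocatedMemory > 0)
        df.unpersist(blocking = true)
        assert(DummyAllocator.getAllocatedMemory == 0)
      }
    }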