Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/17436#discussion_r151352379
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -60,9 +62,15 @@ case class InMemoryTableScanExec(
   private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
+  private lazy val memoryMode = SparkEnv.get.memoryManager.tungstenMemoryMode
+
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
     val rowCount = cachedColumnarBatch.numRows
-    val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    val columnVectors = if (memoryMode == MemoryMode.ON_HEAP) {
+      OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+    } else {
+      OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
--- End diff ---
For now, I added `context.addTaskCompletionListener`, but I did not add `batch.close` to the generated code.
This is because ParquetReader reuses a `ColumnarBatch`. Calling `batch.close` in generated code would therefore cause a runtime exception when the batch is reused.
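
As a minimal sketch of the listener-based cleanup described above (the helper `registerBatchForCleanup` and its placement are illustrative assumptions, not the PR's actual code): the batch is registered once and closed when the task finishes, instead of emitting a per-read `batch.close` in codegen, so reuse within the task stays safe.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.vectorized.ColumnarBatch

// Hypothetical helper illustrating the strategy: close the batch at task
// completion rather than in generated code, so a reader that reuses the
// same ColumnarBatch across reads never sees it closed mid-task.
def registerBatchForCleanup(batch: ColumnarBatch): ColumnarBatch = {
  val context = TaskContext.get()
  if (context != null) {
    // Runs once when the task ends; this also frees off-heap column
    // vectors, which are not reclaimed by GC.
    context.addTaskCompletionListener(_ => batch.close())
  }
  batch
}
```

Tying the close to task completion matters most for the `OffHeapColumnVector` path added in this diff, since off-heap memory must be released explicitly.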