cloud-fan commented on code in PR #55844:
URL: https://github.com/apache/spark/pull/55844#discussion_r3231360671
##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -93,6 +95,31 @@ public static void populate(ConstantColumnVector col, InternalRow row, int field
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalVariantType) {
col.setVariant((VariantVal)row.get(fieldIdx, t));
+ } else if (pdt instanceof PhysicalStructType) {
+ StructType st = (StructType) t;
+ InternalRow inner = row.getStruct(fieldIdx, st.fields().length);
+ InternalRow tmpRow = new GenericInternalRow(1);
+ for (int i = 0; i < st.fields().length; i++) {
+ StructField field = st.fields()[i];
+ tmpRow.update(0, inner.isNullAt(i) ? null : inner.get(i, field.dataType()));
+ // ConstantColumnVector's constructor pre-allocates struct children with the parent's
+ // numRows, so writeToOffHeapColumnVector recurses with the right capacity.
Review Comment:
This comment has two errors. First, `writeToOffHeapColumnVector` doesn't exist;
the recursive call on the next line is `populate`. Second, the constructor
pre-allocates struct children with `numRows = 1`, not the parent's `numRows`.
```suggestion
// ConstantColumnVector's constructor pre-allocates struct children, so the recursive
// populate call below has a target vector to write into.
```
##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -93,6 +95,31 @@ public static void populate(ConstantColumnVector col, InternalRow row, int field
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalVariantType) {
col.setVariant((VariantVal)row.get(fieldIdx, t));
+ } else if (pdt instanceof PhysicalStructType) {
+ StructType st = (StructType) t;
+ InternalRow inner = row.getStruct(fieldIdx, st.fields().length);
+ InternalRow tmpRow = new GenericInternalRow(1);
+ for (int i = 0; i < st.fields().length; i++) {
+ StructField field = st.fields()[i];
+ tmpRow.update(0, inner.isNullAt(i) ? null : inner.get(i, field.dataType()));
+ // ConstantColumnVector's constructor pre-allocates struct children with the parent's
+ // numRows, so writeToOffHeapColumnVector recurses with the right capacity.
+ populate((ConstantColumnVector) col.getChild(i), tmpRow, 0);
+ }
+ } else if (pdt instanceof PhysicalArrayType || pdt instanceof PhysicalMapType) {
+ // Allocate a 1-row off-heap backing vector to hold the constant complex value.
+ OffHeapColumnVector backing = new OffHeapColumnVector(1, t);
Review Comment:
Hardcoding `OffHeapColumnVector` here deviates from the codebase convention.
Everywhere else that allocates a `WritableColumnVector` branches on a
`MemoryMode` flag derived from `spark.sql.columnVector.offheap.enabled` — see
`OrcFileFormat.scala:160-162`, `OrcScan.scala:70-72`,
`ColumnarEvaluatorFactory.scala:77-79`, `AggregatePushDownUtils.scala:155-157`,
`InMemoryRelation.scala:207-209`, and `VectorizedParquetRecordReader.java:165`.
`ColumnVectorUtils.toBatch` in this same file already threads `MemoryMode`
explicitly.
Would you mind threading `MemoryMode` through `populate` so this matches? A
back-compat overload keeps the test/benchmark call sites untouched:
```java
public static void populate(ConstantColumnVector col, InternalRow row, int fieldIdx) {
  populate(col, row, fieldIdx, MemoryMode.ON_HEAP);
}

public static void populate(
    ConstantColumnVector col, InternalRow row, int fieldIdx, MemoryMode memMode) { ... }
```
Then the array/map branch picks `OffHeapColumnVector` or
`OnHeapColumnVector` based on `memMode`. The three production callers all have
the mode in scope (or one conf lookup away): `VectorizedParquetRecordReader` has
`MEMORY_MODE`, `OrcColumnarBatchReader` has `memoryMode`, and `FileScanRDD` can
derive it from `sparkSession.sessionState.conf`. Total impact looks like ~15
LOC across 4 files, with no test changes.
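To make the requested shape concrete, here is a minimal, self-contained sketch of the overload pair and the mode-based allocation. It deliberately uses hypothetical stand-in types (`Mode`, `Backing`, `OnHeapBacking`, `OffHeapBacking`, `allocateBacking`) that only mirror the roles of `MemoryMode`, `WritableColumnVector`, `OnHeapColumnVector`, and `OffHeapColumnVector`; it is not the code in this PR, and the real change would live in `populate` itself.

```java
// Illustration only: every type here is a hypothetical stand-in, not a Spark class.
final class MemoryModeSketch {
  // Plays the role of Spark's MemoryMode.
  enum Mode { ON_HEAP, OFF_HEAP }

  // Plays the role of WritableColumnVector.
  static abstract class Backing {
    final int capacity;
    Backing(int capacity) { this.capacity = capacity; }
    abstract String kind();
  }

  static final class OnHeapBacking extends Backing {
    OnHeapBacking(int capacity) { super(capacity); }
    @Override String kind() { return "on-heap"; }
  }

  static final class OffHeapBacking extends Backing {
    OffHeapBacking(int capacity) { super(capacity); }
    @Override String kind() { return "off-heap"; }
  }

  // Back-compat overload: callers that do not pass a mode get ON_HEAP.
  static Backing allocateBacking() {
    return allocateBacking(Mode.ON_HEAP);
  }

  // Mode-aware overload: a 1-row backing vector chosen by the caller's mode.
  static Backing allocateBacking(Mode memMode) {
    return memMode == Mode.OFF_HEAP ? new OffHeapBacking(1) : new OnHeapBacking(1);
  }

  public static void main(String[] args) {
    System.out.println(allocateBacking().kind());
    System.out.println(allocateBacking(Mode.OFF_HEAP).kind());
  }
}
```

The point of the two-method shape is that existing test and benchmark call sites compile unchanged, while production readers pass the mode they already hold.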
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala:
##########
@@ -134,30 +135,78 @@ class ColumnVectorUtilsSuite extends SparkFunSuite {
}
}
- testConstantColumnVector("not supported: fill map", 10,
+ testConstantColumnVector("fill array of ints", 10, ArrayType(IntegerType)) { vector =>
+ val arr = new GenericArrayData(Array[Any](1, 2, 3, 4, 5))
+ ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getArray(i).toIntArray === Array(1, 2, 3, 4, 5))
+ }
+ }
+
+ testConstantColumnVector("fill array of strings", 10, ArrayType(StringType)) { vector =>
+ val arr = new GenericArrayData(Array[Any](
+ UTF8String.fromString("a"),
+ UTF8String.fromString("bb"),
+ UTF8String.fromString("ccc")))
+ ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+ (0 until 10).foreach { i =>
+ val a = vector.getArray(i)
+ assert(a.numElements() == 3)
+ assert(a.getUTF8String(0) == UTF8String.fromString("a"))
+ assert(a.getUTF8String(1) == UTF8String.fromString("bb"))
+ assert(a.getUTF8String(2) == UTF8String.fromString("ccc"))
+ }
+ }
+
+ testConstantColumnVector("fill map of int -> boolean", 10,
MapType(IntegerType, BooleanType)) { vector =>
- val message = intercept[RuntimeException] {
- ColumnVectorUtils.populate(vector, InternalRow("fakeMap"), 0)
- }.getMessage
- assert(message == "DataType MAP<INT, BOOLEAN> is not supported in column vectorized reader.")
+ val keys = new GenericArrayData(Array[Any](1, 2, 3))
+ val values = new GenericArrayData(Array[Any](true, false, true))
+ val map = new ArrayBasedMapData(keys, values)
+ ColumnVectorUtils.populate(vector, InternalRow(map), 0)
+ (0 until 10).foreach { i =>
+ val m = vector.getMap(i)
+ assert(m.numElements() == 3)
+ assert(m.keyArray().toIntArray === Array(1, 2, 3))
+ assert(m.valueArray().toBooleanArray === Array(true, false, true))
+ }
}
- testConstantColumnVector("not supported: fill struct", 10,
+ testConstantColumnVector("fill struct", 10,
new StructType()
.add(StructField("name", StringType))
.add(StructField("age", IntegerType))) { vector =>
- val message = intercept[RuntimeException] {
- ColumnVectorUtils.populate(vector, InternalRow("fakeStruct"), 0)
- }.getMessage
- assert(message ==
- "DataType STRUCT<name: STRING, age: INT> is not supported in column vectorized reader.")
- }
-
- testConstantColumnVector("not supported: fill array", 10,
- ArrayType(IntegerType)) { vector =>
- val message = intercept[RuntimeException] {
- ColumnVectorUtils.populate(vector, InternalRow("fakeArray"), 0)
- }.getMessage
- assert(message == "DataType ARRAY<INT> is not supported in column vectorized reader.")
+ val row = InternalRow(UTF8String.fromString("jack"), 27)
+ ColumnVectorUtils.populate(vector, InternalRow(row), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getChild(0).getUTF8String(i) == UTF8String.fromString("jack"))
+ assert(vector.getChild(1).getInt(i) == 27)
+ }
+ }
+
+ testConstantColumnVector("fill nested array<struct>", 10,
+ ArrayType(new StructType()
+ .add(StructField("k", StringType))
+ .add(StructField("v", IntegerType)))) { vector =>
+ val structs = new GenericArrayData(Array[Any](
+ InternalRow(UTF8String.fromString("a"), 1),
+ InternalRow(UTF8String.fromString("bb"), 2),
+ InternalRow(null, 3)))
+ ColumnVectorUtils.populate(vector, InternalRow(structs), 0)
+ (0 until 10).foreach { _ =>
+ val a = vector.getArray(0)
Review Comment:
Other tests in this file iterate with `i` and call `vector.getArray(i)`.
This loop uses `_` and a hard-coded `0`, so it reads row 0 ten times.
That is functionally equivalent (`ConstantColumnVector.getArray` ignores `rowId`),
but it is inconsistent with the surrounding tests.
```suggestion
(0 until 10).foreach { i =>
val a = vector.getArray(i)
```
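For context on why both forms pass, here is a small sketch of a constant vector whose accessor ignores the row index. `ConstantVectorSketch` is a hypothetical stand-in written for this comment, not Spark's `ConstantColumnVector`; it only illustrates the behavior described above.

```java
// Illustration only: a hypothetical stand-in, not Spark's ConstantColumnVector.
final class ConstantVectorSketch {
  private final int numRows;
  private int[] value;

  ConstantVectorSketch(int numRows) { this.numRows = numRows; }

  int numRows() { return numRows; }

  // One stored value stands in for every row.
  void setArray(int[] v) { this.value = v.clone(); }

  // rowId is accepted for API symmetry but never consulted.
  int[] getArray(int rowId) { return value.clone(); }

  public static void main(String[] args) {
    ConstantVectorSketch vector = new ConstantVectorSketch(10);
    vector.setArray(new int[] {1, 2, 3});
    // Iterating with i, as the other tests do, reads the same value at every row.
    for (int i = 0; i < vector.numRows(); i++) {
      if (!java.util.Arrays.equals(vector.getArray(i), new int[] {1, 2, 3})) {
        throw new AssertionError("row " + i + " differs");
      }
    }
    System.out.println("all rows match");
  }
}
```

Iterating with `i` costs nothing here and keeps the test meaningful if the accessor ever starts honoring `rowId`.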
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]