cloud-fan commented on code in PR #55844:
URL: https://github.com/apache/spark/pull/55844#discussion_r3231360671


##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -93,6 +95,31 @@ public static void populate(ConstantColumnVector col, InternalRow row, int field
         col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
       } else if (pdt instanceof PhysicalVariantType) {
         col.setVariant((VariantVal)row.get(fieldIdx, t));
+      } else if (pdt instanceof PhysicalStructType) {
+        StructType st = (StructType) t;
+        InternalRow inner = row.getStruct(fieldIdx, st.fields().length);
+        InternalRow tmpRow = new GenericInternalRow(1);
+        for (int i = 0; i < st.fields().length; i++) {
+          StructField field = st.fields()[i];
+          tmpRow.update(0, inner.isNullAt(i) ? null : inner.get(i, field.dataType()));
+          // ConstantColumnVector's constructor pre-allocates struct children with the parent's
+          // numRows, so writeToOffHeapColumnVector recurses with the right capacity.

Review Comment:
   The comment has two errors: `writeToOffHeapColumnVector` doesn't exist — the 
recursive call on the next line is `populate`. And the constructor 
pre-allocates struct children at `numRows = 1`, not the parent's `numRows`.
   
   ```suggestion
             // ConstantColumnVector's constructor pre-allocates struct children, so the recursive
             // populate call below has a target vector to write into.
   ```



##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -93,6 +95,31 @@ public static void populate(ConstantColumnVector col, InternalRow row, int field
         col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
       } else if (pdt instanceof PhysicalVariantType) {
         col.setVariant((VariantVal)row.get(fieldIdx, t));
+      } else if (pdt instanceof PhysicalStructType) {
+        StructType st = (StructType) t;
+        InternalRow inner = row.getStruct(fieldIdx, st.fields().length);
+        InternalRow tmpRow = new GenericInternalRow(1);
+        for (int i = 0; i < st.fields().length; i++) {
+          StructField field = st.fields()[i];
+          tmpRow.update(0, inner.isNullAt(i) ? null : inner.get(i, field.dataType()));
+          // ConstantColumnVector's constructor pre-allocates struct children with the parent's
+          // numRows, so writeToOffHeapColumnVector recurses with the right capacity.
+          populate((ConstantColumnVector) col.getChild(i), tmpRow, 0);
+        }
+      } else if (pdt instanceof PhysicalArrayType || pdt instanceof PhysicalMapType) {
+        // Allocate a 1-row off-heap backing vector to hold the constant complex value.
+        OffHeapColumnVector backing = new OffHeapColumnVector(1, t);

Review Comment:
   Hardcoding `OffHeapColumnVector` here deviates from the codebase convention. 
Everywhere else that allocates a `WritableColumnVector` branches on a 
`MemoryMode` flag derived from `spark.sql.columnVector.offheap.enabled` — see 
`OrcFileFormat.scala:160-162`, `OrcScan.scala:70-72`, 
`ColumnarEvaluatorFactory.scala:77-79`, `AggregatePushDownUtils.scala:155-157`, 
`InMemoryRelation.scala:207-209`, and `VectorizedParquetRecordReader.java:165`. 
`ColumnVectorUtils.toBatch` in this same file already threads `MemoryMode` 
explicitly.
   
   Would you mind threading `MemoryMode` through `populate` so this matches? A 
back-compat overload keeps the test/benchmark call sites untouched:
   
   ```java
   public static void populate(ConstantColumnVector col, InternalRow row, int fieldIdx) {
     populate(col, row, fieldIdx, MemoryMode.ON_HEAP);
   }
   public static void populate(
       ConstantColumnVector col, InternalRow row, int fieldIdx, MemoryMode memMode) { ... }
   ```
   
   Then the array/map branch picks `OffHeapColumnVector` or `OnHeapColumnVector`
based on `memMode`. The three production callers all have the mode in scope (or
one conf lookup away): `VectorizedParquetRecordReader` has `MEMORY_MODE`,
`OrcColumnarBatchReader` has `memoryMode`, and `FileScanRDD` can derive it from
`sparkSession.sessionState.conf`. Total impact looks like ~15 LOC across 4
files, with no test changes.
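   
   For illustration only, here is a minimal self-contained sketch of the
overload-plus-selection shape described above. It uses stand-in types rather
than the real Spark classes: `MemoryModeSketch`, `BackingVector`,
`OnHeapBacking`, `OffHeapBacking`, and `allocateBacking` are hypothetical names,
and the nested `MemoryMode` enum only mirrors the two constants of Spark's
`MemoryMode`. The actual change would allocate `OnHeapColumnVector` or
`OffHeapColumnVector` instead.
   
   ```java
   final class MemoryModeSketch {
     // Stand-in for Spark's MemoryMode enum (assumption: only these two modes matter here).
     enum MemoryMode { ON_HEAP, OFF_HEAP }

     // Hypothetical stand-ins for OnHeapColumnVector / OffHeapColumnVector.
     interface BackingVector { String kind(); }

     static final class OnHeapBacking implements BackingVector {
       @Override public String kind() { return "on-heap"; }
     }

     static final class OffHeapBacking implements BackingVector {
       @Override public String kind() { return "off-heap"; }
     }

     // Hypothetical helper: the single place that branches on the memory mode.
     static BackingVector allocateBacking(MemoryMode memMode) {
       return memMode == MemoryMode.OFF_HEAP ? new OffHeapBacking() : new OnHeapBacking();
     }

     // Back-compat overload: existing callers keep compiling and default to ON_HEAP.
     static BackingVector populate(Object value) {
       return populate(value, MemoryMode.ON_HEAP);
     }

     // New overload: the caller passes the mode it already has in scope.
     static BackingVector populate(Object value, MemoryMode memMode) {
       // A real implementation would copy `value` into the vector; omitted in this sketch.
       return allocateBacking(memMode);
     }

     public static void main(String[] args) {
       System.out.println(populate("constant").kind());
       System.out.println(populate("constant", MemoryMode.OFF_HEAP).kind());
     }
   }
   ```
   
   Keeping the branch in one helper means the array/map path never names a
concrete vector class directly, which is the property the other call sites
listed above already have.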



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala:
##########
@@ -134,30 +135,78 @@ class ColumnVectorUtilsSuite extends SparkFunSuite {
     }
   }
 
-  testConstantColumnVector("not supported: fill map", 10,
+  testConstantColumnVector("fill array of ints", 10, ArrayType(IntegerType)) { vector =>
+    val arr = new GenericArrayData(Array[Any](1, 2, 3, 4, 5))
+    ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+    (0 until 10).foreach { i =>
+      assert(vector.getArray(i).toIntArray === Array(1, 2, 3, 4, 5))
+    }
+  }
+
+  testConstantColumnVector("fill array of strings", 10, ArrayType(StringType)) { vector =>
+    val arr = new GenericArrayData(Array[Any](
+      UTF8String.fromString("a"),
+      UTF8String.fromString("bb"),
+      UTF8String.fromString("ccc")))
+    ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+    (0 until 10).foreach { i =>
+      val a = vector.getArray(i)
+      assert(a.numElements() == 3)
+      assert(a.getUTF8String(0) == UTF8String.fromString("a"))
+      assert(a.getUTF8String(1) == UTF8String.fromString("bb"))
+      assert(a.getUTF8String(2) == UTF8String.fromString("ccc"))
+    }
+  }
+
+  testConstantColumnVector("fill map of int -> boolean", 10,
     MapType(IntegerType, BooleanType)) { vector =>
-    val message = intercept[RuntimeException] {
-      ColumnVectorUtils.populate(vector, InternalRow("fakeMap"), 0)
-    }.getMessage
-    assert(message == "DataType MAP<INT, BOOLEAN> is not supported in column vectorized reader.")
+    val keys = new GenericArrayData(Array[Any](1, 2, 3))
+    val values = new GenericArrayData(Array[Any](true, false, true))
+    val map = new ArrayBasedMapData(keys, values)
+    ColumnVectorUtils.populate(vector, InternalRow(map), 0)
+    (0 until 10).foreach { i =>
+      val m = vector.getMap(i)
+      assert(m.numElements() == 3)
+      assert(m.keyArray().toIntArray === Array(1, 2, 3))
+      assert(m.valueArray().toBooleanArray === Array(true, false, true))
+    }
   }
 
-  testConstantColumnVector("not supported: fill struct", 10,
+  testConstantColumnVector("fill struct", 10,
     new StructType()
       .add(StructField("name", StringType))
       .add(StructField("age", IntegerType))) { vector =>
-    val message = intercept[RuntimeException] {
-      ColumnVectorUtils.populate(vector, InternalRow("fakeStruct"), 0)
-    }.getMessage
-    assert(message ==
-      "DataType STRUCT<name: STRING, age: INT> is not supported in column vectorized reader.")
-  }
-
-  testConstantColumnVector("not supported: fill array", 10,
-    ArrayType(IntegerType)) { vector =>
-    val message = intercept[RuntimeException] {
-      ColumnVectorUtils.populate(vector, InternalRow("fakeArray"), 0)
-    }.getMessage
-    assert(message == "DataType ARRAY<INT> is not supported in column vectorized reader.")
+    val row = InternalRow(UTF8String.fromString("jack"), 27)
+    ColumnVectorUtils.populate(vector, InternalRow(row), 0)
+    (0 until 10).foreach { i =>
+      assert(vector.getChild(0).getUTF8String(i) == UTF8String.fromString("jack"))
+      assert(vector.getChild(1).getInt(i) == 27)
+    }
+  }
+
+  testConstantColumnVector("fill nested array<struct>", 10,
+    ArrayType(new StructType()
+      .add(StructField("k", StringType))
+      .add(StructField("v", IntegerType)))) { vector =>
+    val structs = new GenericArrayData(Array[Any](
+      InternalRow(UTF8String.fromString("a"), 1),
+      InternalRow(UTF8String.fromString("bb"), 2),
+      InternalRow(null, 3)))
+    ColumnVectorUtils.populate(vector, InternalRow(structs), 0)
+    (0 until 10).foreach { _ =>
+      val a = vector.getArray(0)

Review Comment:
   Other tests in this file iterate with `i` and call `vector.getArray(i)`. 
This loop uses `_` and a hard-coded `0`, so it only ever reads row 0 ten times. 
Functionally equivalent (`ConstantColumnVector.getArray` ignores `rowId`), but 
inconsistent with the surrounding tests.
   
   ```suggestion
       (0 until 10).foreach { i =>
         val a = vector.getArray(i)
   ```
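   
   To show why both loop forms pass today, here is a small sketch of a constant
vector whose accessor ignores the row id. `ConstantVectorSketch` and
`ConstantIntArrayVector` are hypothetical stand-ins written for this comment,
not the real `ConstantColumnVector`.
   
   ```java
   import java.util.Arrays;

   final class ConstantVectorSketch {
     // Hypothetical stand-in: one stored value shared by every logical row.
     static final class ConstantIntArrayVector {
       private final int numRows;
       private final int[] value;

       ConstantIntArrayVector(int numRows, int[] value) {
         this.numRows = numRows;
         this.value = value.clone();
       }

       int numRows() { return numRows; }

       // rowId is deliberately unused: every row reads the same stored value.
       int[] getArray(int rowId) { return value.clone(); }
     }

     public static void main(String[] args) {
       ConstantIntArrayVector v = new ConstantIntArrayVector(10, new int[] {1, 2, 3});
       for (int i = 0; i < v.numRows(); i++) {
         // Reading row i and reading row 0 yield equal arrays.
         if (!Arrays.equals(v.getArray(i), v.getArray(0))) {
           throw new AssertionError("row " + i + " differs from row 0");
         }
       }
       System.out.println("all rows equal");
     }
   }
   ```
   
   Iterating with `i` still has value: the test would keep exercising every row
if the accessor ever started honoring `rowId`.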



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

