sunchao commented on code in PR #36616:
URL: https://github.com/apache/spark/pull/36616#discussion_r908025111


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala:
##########
@@ -607,6 +607,38 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  def vectorizedScanPartitionColumnsBenchmark(values: Int, pColumns: Int): 
Unit = {

Review Comment:
   Hmm, I'm not really sure this benchmark is necessary. What does it compare?



##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -235,4 +303,37 @@ public static ColumnarBatch toBatch(
     batch.setNumRows(n);
     return batch;
   }
+
+  /**
+   * <b>This method assumes that all constant column are at the end of schema
+   * and `constantColumnLength` represents the number of constant column.<b/>
+   *
+   * This method allocates columns to store elements of each field of the 
schema,
+   * the data columns use `OffHeapColumnVector` when `useOffHeap` is true and
+   * use `OnHeapColumnVector` when `useOffHeap` is false, the constant columns
+   * always use `ConstantColumnVector`.
+   *
+   * Capacity is the initial capacity of the vector, and it will grow as 
necessary.
+   * Capacity is in number of elements, not number of bytes.
+   */
+  public static ColumnVector[] allocateColumns(

Review Comment:
   I see, it seems fine to leave it in this class.



##########
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java:
##########
@@ -243,18 +243,17 @@ private void initBatch(
     for (StructField f: sparkSchema.fields()) {
       batchSchema = batchSchema.add(f);
     }
+    int constantColumnLength = 0;
     if (partitionColumns != null) {
       for (StructField f : partitionColumns.fields()) {
         batchSchema = batchSchema.add(f);
       }
+      constantColumnLength = partitionColumns.fields().length;
     }
 
-    WritableColumnVector[] vectors;
-    if (memMode == MemoryMode.OFF_HEAP) {
-      vectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
-    } else {
-      vectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
-    }
+    ColumnVector[] vectors = ColumnVectorUtils.allocateColumns(
+      capacity, batchSchema, memMode == MemoryMode.OFF_HEAP /* useOffHeap */, 
constantColumnLength);

Review Comment:
   nit: why do we need `/* useOffHeap */`?



##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -105,6 +106,61 @@ public static void populate(WritableColumnVector col, 
InternalRow row, int field
     }
   }
 
+  /**
+   * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`.
+   */
+  public static void populate(ConstantColumnVector col, InternalRow row, int 
fieldIdx) {
+    DataType t = col.dataType();
+
+    if (row.isNullAt(fieldIdx)) {
+      col.setNull();
+    } else {
+      if (t == DataTypes.BooleanType) {
+        col.setBoolean(row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.setBinary(row.getBinary(fieldIdx));
+      } else if (t == DataTypes.ByteType) {
+        col.setByte(row.getByte(fieldIdx));
+      } else if (t == DataTypes.ShortType) {
+        col.setShort(row.getShort(fieldIdx));
+      } else if (t == DataTypes.IntegerType) {
+        col.setInt(row.getInt(fieldIdx));
+      } else if (t == DataTypes.LongType) {
+        col.setLong(row.getLong(fieldIdx));
+      } else if (t == DataTypes.FloatType) {
+        col.setFloat(row.getFloat(fieldIdx));
+      } else if (t == DataTypes.DoubleType) {
+        col.setDouble(row.getDouble(fieldIdx));
+      } else if (t == DataTypes.StringType) {
+        UTF8String v = row.getUTF8String(fieldIdx);
+        col.setUtf8String(v);
+      } else if (t instanceof DecimalType) {
+        DecimalType dt = (DecimalType) t;
+        Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale());
+        if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+          col.setInt((int)d.toUnscaledLong());
+        } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+          col.setLong(d.toUnscaledLong());
+        } else {
+          final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+          byte[] bytes = integer.toByteArray();
+          col.setBinary(bytes);
+        }
+      } else if (t instanceof CalendarIntervalType) {
+        // The value of `numRows` is irrelevant.
+        col.setCalendarInterval((CalendarInterval)row.get(fieldIdx, t));

Review Comment:
   nit: add a space after `(CalendarInterval)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to