sadikovi commented on code in PR #36616:
URL: https://github.com/apache/spark/pull/36616#discussion_r881200270


##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java:
##########
@@ -42,6 +43,27 @@ public static OnHeapColumnVector[] allocateColumns(int 
capacity, StructType sche
     return allocateColumns(capacity, schema.fields());
   }
 
+  /**
+   * Allocates columns to store elements of each field of the schema,
+   * the constant column use `ConstantColumnVector` and others use 
`OnHeapColumnVector`,
+   * this method assumes that all constant column are at the end of schema.
+   * Capacity is the initial capacity of the vector and it will grow as 
necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static ColumnVector[] allocateColumns(

Review Comment:
   Would it be possible to factor it out into a common method?



##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java:
##########
@@ -42,6 +43,27 @@ public static OffHeapColumnVector[] allocateColumns(int 
capacity, StructType sch
     return allocateColumns(capacity, schema.fields());
   }
 
+  /**
+   * Allocates columns to store elements of each field of the schema,
+   * the constant column use `ConstantColumnVector` and others use 
`OffHeapColumnVector`,
+   * this method assumes that all constant column are at the end of schema.

Review Comment:
   Can we make this clearer? I think it is a very important assumption. For 
example, you can put this on a new line to emphasise significance. 



##########
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##########
@@ -105,6 +105,73 @@ public static void populate(WritableColumnVector col, 
InternalRow row, int field
     }
   }
 
+  /**
+   * Fill value of `row[fieldIdx]` into `ConstantColumnVector`.
+   */
+  public static void fill(ConstantColumnVector col, InternalRow row, int 
fieldIdx) {
+    DataType t = col.dataType();
+
+    if (row.isNullAt(fieldIdx)) {
+      col.setNull();
+    } else {
+      if (t == DataTypes.BooleanType) {
+        col.setBoolean(row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.BinaryType) {
+        col.setBinary(row.getBinary(fieldIdx));
+      } else if (t == DataTypes.ByteType) {
+        col.setByte(row.getByte(fieldIdx));
+      } else if (t == DataTypes.ShortType) {
+        col.setShort(row.getShort(fieldIdx));
+      } else if (t == DataTypes.IntegerType) {
+        col.setInt(row.getInt(fieldIdx));
+      } else if (t == DataTypes.LongType) {
+        col.setLong(row.getLong(fieldIdx));
+      } else if (t == DataTypes.FloatType) {
+        col.setFloat(row.getFloat(fieldIdx));
+      } else if (t == DataTypes.DoubleType) {
+        col.setDouble(row.getDouble(fieldIdx));
+      } else if (t == DataTypes.StringType) {
+        UTF8String v = row.getUTF8String(fieldIdx);
+        col.setUtf8String(v);
+      } else if (t instanceof DecimalType) {
+        DecimalType dt = (DecimalType)t;
+        Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale());
+        if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+          col.setInt((int)d.toUnscaledLong());
+        } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+          col.setLong(d.toUnscaledLong());
+        } else {
+          final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+          byte[] bytes = integer.toByteArray();
+          col.setBinary(bytes);
+        }
+      } else if (t instanceof CalendarIntervalType) {
+        CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
+        // The value of `numRows` is irrelevant.
+        ConstantColumnVector monthsVector =
+          new ConstantColumnVector(1, IntegerType$.MODULE$);
+        ConstantColumnVector daysVector =
+          new ConstantColumnVector(1, IntegerType$.MODULE$);
+        ConstantColumnVector microsecondsVector =
+          new ConstantColumnVector(1, LongType$.MODULE$);
+        monthsVector.setInt(c.months);
+        daysVector.setInt(c.days);
+        microsecondsVector.setLong(c.microseconds);
+        col.setChild(0, monthsVector);
+        col.setChild(1, daysVector);
+        col.setChild(2, microsecondsVector);
+      } else if (t instanceof DateType || t instanceof YearMonthIntervalType) {
+        col.setInt(row.getInt(fieldIdx));
+      } else if (t instanceof TimestampType || t instanceof TimestampNTZType ||
+        t instanceof DayTimeIntervalType) {
+        col.setLong(row.getLong(fieldIdx));
+      } else {
+        throw new RuntimeException(String.format("DataType %s is not 
supported" +

Review Comment:
   I would suggest to fall back to the original vector type here in case it 
supports the data type to avoid regressions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to