c21 commented on a change in pull request #35068:
URL: https://github.com/apache/spark/pull/35068#discussion_r778602285
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
##########
@@ -0,0 +1,212 @@
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * This class adds the constant support to ColumnVector.
+ * It supports all the types and contains put APIs,
+ * which will put the exact same value to all rows.
+ *
+ * Capacity: The vector only stores one copy of the data, and acts as an
unbounded vector
+ * (get from any row will return the same value)
+ */
+public class ConstantColumnVector extends ColumnVector {
Review comment:
I am wondering whether we should extend `WritableColumnVector` instead,
so we can easily leverage the constant column vector to represent partition columns.
It seems that for partition columns, we are copying the same value into every row
([Parquet](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L258)
and
[ORC](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java#L171)).
A future improvement is to use the constant column vector we are introducing
here to avoid unnecessary operations.
@cloud-fan WDYT?
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
##########
@@ -0,0 +1,212 @@
+package org.apache.spark.sql.execution.vectorized;
Review comment:
Let's add the Apache license header, as in the other files.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
##########
@@ -0,0 +1,212 @@
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * This class adds the constant support to ColumnVector.
+ * It supports all the types and contains put APIs,
+ * which will put the exact same value to all rows.
+ *
+ * Capacity: The vector only stores one copy of the data, and acts as an
unbounded vector
+ * (get from any row will return the same value)
+ */
+public class ConstantColumnVector extends ColumnVector {
+
+ private byte nullData;
+ private byte byteData;
+ private short shortData;
+ private int intData;
+ private long longData;
+ private float floatData;
+ private double doubleData;
+ private byte[] byteArrayData;
+ private ConstantColumnVector childData;
+ private ColumnarArray arrayData;
+ private ColumnarMap mapData;
+
+ /**
+ * Sets up the data type of this constant column vector.
+ * @param type
+ */
+ public ConstantColumnVector(DataType type) {
+ super(type);
+ }
+
+ @Override
+ public void close() {
+ byteArrayData = null;
+ childData = null;
+ arrayData = null;
+ mapData = null;
+ }
+
+ @Override
+ public boolean hasNull() {
+ return nullData == 1;
+ }
+
+ @Override
+ public int numNulls() {
+ return -1;
Review comment:
why `-1` here?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
##########
@@ -130,67 +130,37 @@ class FileScanRDD(
UnsafeProjection.create(joinedExpressions)
}
+ // metadata constant column vectors, will only be updated when the
current file is changed
+ val metadataVectors: Seq[ConstantColumnVector] =
+ metadataColumns.map(m => new ConstantColumnVector(m.dataType))
+
/**
* For each partitioned file, metadata columns for each record in the
file are exactly same.
- * Only update metadata row when `currentFile` is changed.
+ * Only update metadata row and vectors when `currentFile` is changed.
*/
- private def updateMetadataRow(): Unit = {
- if (metadataColumns.nonEmpty && currentFile != null) {
- val path = new Path(currentFile.filePath)
- metadataColumns.zipWithIndex.foreach { case (attr, i) =>
- attr.name match {
- case FILE_PATH => metadataRow.update(i,
UTF8String.fromString(path.toString))
- case FILE_NAME => metadataRow.update(i,
UTF8String.fromString(path.getName))
- case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
- case FILE_MODIFICATION_TIME =>
- // the modificationTime from the file is in millisecond,
- // while internally, the TimestampType is stored in microsecond
- metadataRow.update(i, currentFile.modificationTime * 1000L)
- }
+ private def updateMetadataData(): Unit = {
Review comment:
nit: the name `updateMetadataData` is kind of hard to read. btw, why do we
delete `createMetadataColumnVector` and mix vectors and rows together here? IMO
it's better to separate the code paths for non-vectorized and vectorized scans if
possible.
##########
File path:
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
##########
@@ -0,0 +1,212 @@
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * This class adds the constant support to ColumnVector.
+ * It supports all the types and contains put APIs,
+ * which will put the exact same value to all rows.
+ *
+ * Capacity: The vector only stores one copy of the data, and acts as an
unbounded vector
+ * (get from any row will return the same value)
+ */
+public class ConstantColumnVector extends ColumnVector {
+
+ private byte nullData;
+ private byte byteData;
+ private short shortData;
+ private int intData;
+ private long longData;
+ private float floatData;
+ private double doubleData;
+ private byte[] byteArrayData;
+ private ConstantColumnVector childData;
+ private ColumnarArray arrayData;
+ private ColumnarMap mapData;
+
+ /**
+ * Sets up the data type of this constant column vector.
+ * @param type
+ */
+ public ConstantColumnVector(DataType type) {
+ super(type);
+ }
+
+ @Override
+ public void close() {
+ byteArrayData = null;
+ childData = null;
+ arrayData = null;
+ mapData = null;
+ }
+
+ @Override
+ public boolean hasNull() {
+ return nullData == 1;
+ }
+
+ @Override
+ public int numNulls() {
+ return -1;
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ return nullData == 1;
+ }
+
+ public void putNull() {
+ nullData = (byte) 1;
+ }
+
+ public void putNotNull() {
+ nullData = (byte) 0;
+ }
+
+ @Override
+ public boolean getBoolean(int rowId) {
+ return byteData == 1;
+ }
+
+ public void putBoolean(boolean value) {
+ byteData = (byte) ((value) ? 1 : 0);
+ }
+
+ @Override
+ public byte getByte(int rowId) {
+ return byteData;
+ }
+
+ public void putByte(byte value) {
+ byteData = value;
+ }
+
+ @Override
+ public short getShort(int rowId) {
+ return shortData;
+ }
+
+ public void putShort(short value) {
+ shortData = value;
+ }
+
+ @Override
+ public int getInt(int rowId) {
+ return intData;
+ }
+
+ public void putInt(int value) {
+ intData = value;
+ }
+
+ @Override
+ public long getLong(int rowId) {
+ return longData;
+ }
+
+ public void putLong(long value) {
+ longData = value;
+ }
+
+ @Override
+ public float getFloat(int rowId) {
+ return floatData;
+ }
+
+ public void putFloat(float value) {
+ floatData = value;
+ }
+
+ @Override
+ public double getDouble(int rowId) {
+ return doubleData;
+ }
+
+ public void putDouble(double value) {
+ doubleData = value;
+ }
+
+ @Override
+ public ColumnarArray getArray(int rowId) {
+ return arrayData;
+ }
+
+ public void putArray(ColumnarArray value) {
+ arrayData = value;
+ }
+
+ @Override
+ public ColumnarMap getMap(int ordinal) {
+ return mapData;
+ }
+
+ public void putMap(ColumnarMap value) {
+ mapData = value;
+ }
+
+ @Override
+ public Decimal getDecimal(int rowId, int precision, int scale) {
+ // copy and modify from WritableColumnVector
+ if (precision <= Decimal.MAX_INT_DIGITS()) {
+ return Decimal.createUnsafe(getInt(rowId), precision, scale);
+ } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ return Decimal.createUnsafe(getLong(rowId), precision, scale);
+ } else {
+ byte[] bytes = getBinary(rowId);
+ BigInteger bigInteger = new BigInteger(bytes);
+ BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
+ return Decimal.apply(javaDecimal, precision, scale);
+ }
+ }
+
+ public void putDecimal(Decimal value, int precision) {
+ // copy and modify from WritableColumnVector
+ if (precision <= Decimal.MAX_INT_DIGITS()) {
+ putInt((int) value.toUnscaledLong());
+ } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ putLong(value.toUnscaledLong());
+ } else {
+ BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
+ putByteArray(bigInteger.toByteArray());
+ }
+ }
+
+ @Override
+ public UTF8String getUTF8String(int rowId) {
+ return UTF8String.fromBytes(byteArrayData);
+ }
+
+ public void putUtf8String(UTF8String value) {
+ putByteArray(value.getBytes());
+ }
+
+ private void putByteArray(byte[] value) {
+ byteArrayData = value;
+ }
+
+ @Override
+ public byte[] getBinary(int rowId) {
+ return byteArrayData;
+ }
+
+ public void putBinary(byte[] value) {
+ putByteArray(value);
+ }
+
+ @Override
+ public ColumnVector getChild(int ordinal) {
+ return childData;
+ }
+
+ public void putChild(ConstantColumnVector value) {
+ childData = value;
+ }
Review comment:
Why do we need this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]