Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/18958#discussion_r134208734
--- Diff:
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
---
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
/**
 * This class adds write APIs to ColumnVector.
 * It supports all the types and contains put APIs as well as their batched versions.
 * The batched versions are preferable whenever possible.
 *
 * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the
 * responsibility of the caller to call reserve() to ensure there is enough room before adding
 * elements. The put() APIs do not check the capacity themselves, because in common cases
 * (i.e. flat schemas) the lengths are known up front.
 *
 * Once values have been read, a ColumnVector should be treated as immutable: it is not valid
 * to call put APIs after reads until reset() is called.
 */
+public abstract class WritableColumnVector extends ColumnVector {
+
+ /**
+ * Resets this column for writing. The currently stored values are no
longer accessible.
+ */
+ public void reset() {
+ if (isConstant) return;
+
+ if (childColumns != null) {
+ for (ColumnVector c: childColumns) {
+ ((WritableColumnVector) c).reset();
+ }
+ }
+ numNulls = 0;
+ elementsAppended = 0;
+ if (anyNullsSet) {
+ putNotNulls(0, capacity);
+ anyNullsSet = false;
+ }
+ }
+
+ public void reserve(int requiredCapacity) {
+ if (requiredCapacity > capacity) {
+ int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity *
2L);
+ if (requiredCapacity <= newCapacity) {
+ try {
+ reserveInternal(newCapacity);
+ } catch (OutOfMemoryError outOfMemoryError) {
+ throwUnsupportedException(requiredCapacity, outOfMemoryError);
+ }
+ } else {
+ throwUnsupportedException(requiredCapacity, null);
+ }
+ }
+ }
+
+ private void throwUnsupportedException(int requiredCapacity, Throwable
cause) {
+ String message = "Cannot reserve additional contiguous bytes in the
vectorized reader " +
+ "(requested = " + requiredCapacity + " bytes). As a workaround,
you can disable the " +
+ "vectorized reader by setting " +
SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+ " to false.";
+ throw new RuntimeException(message, cause);
+ }
+
+ /**
+ * Ensures that there is enough storage to store capacity elements. That
is, the put() APIs
+ * must work for all rowIds < capacity.
+ */
+ protected abstract void reserveInternal(int capacity);
+
+ /**
+ * Sets the value at rowId to null/not null.
+ */
+ public abstract void putNotNull(int rowId);
+ public abstract void putNull(int rowId);
+
+ /**
+ * Sets the values from [rowId, rowId + count) to null/not null.
+ */
+ public abstract void putNulls(int rowId, int count);
+ public abstract void putNotNulls(int rowId, int count);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putBoolean(int rowId, boolean value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putBooleans(int rowId, int count, boolean value);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putByte(int rowId, byte value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putBytes(int rowId, int count, byte value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src +
srcIndex + count)
+ */
+ public abstract void putBytes(int rowId, int count, byte[] src, int
srcIndex);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putShort(int rowId, short value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putShorts(int rowId, int count, short value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src +
srcIndex + count)
+ */
+ public abstract void putShorts(int rowId, int count, short[] src, int
srcIndex);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putInt(int rowId, int value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putInts(int rowId, int count, int value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src +
srcIndex + count)
+ */
+ public abstract void putInts(int rowId, int count, int[] src, int
srcIndex);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src[srcIndex],
src[srcIndex + count])
+ * The data in src must be 4-byte little endian ints.
+ */
+ public abstract void putIntsLittleEndian(int rowId, int count, byte[]
src, int srcIndex);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putLong(int rowId, long value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putLongs(int rowId, int count, long value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src +
srcIndex + count)
+ */
+ public abstract void putLongs(int rowId, int count, long[] src, int
srcIndex);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src[srcIndex],
src[srcIndex + count])
+ * The data in src must be 8-byte little endian longs.
+ */
+ public abstract void putLongsLittleEndian(int rowId, int count, byte[]
src, int srcIndex);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putFloat(int rowId, float value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putFloats(int rowId, int count, float value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src +
srcIndex + count)
+ */
+ public abstract void putFloats(int rowId, int count, float[] src, int
srcIndex);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src[srcIndex],
src[srcIndex + count])
+ * The data in src must be ieee formatted floats.
+ */
+ public abstract void putFloats(int rowId, int count, byte[] src, int
srcIndex);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putDouble(int rowId, double value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putDoubles(int rowId, int count, double value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src +
srcIndex + count)
+ */
+ public abstract void putDoubles(int rowId, int count, double[] src, int
srcIndex);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src[srcIndex],
src[srcIndex + count])
+ * The data in src must be ieee formatted doubles.
+ */
+ public abstract void putDoubles(int rowId, int count, byte[] src, int
srcIndex);
+
+ /**
+ * Puts a byte array that already exists in this column.
+ */
+ public abstract void putArray(int rowId, int offset, int length);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract int putByteArray(int rowId, byte[] value, int offset,
int count);
+ public final int putByteArray(int rowId, byte[] value) {
+ return putByteArray(rowId, value, 0, value.length);
+ }
+
+ /**
+ * Returns the value for rowId.
+ */
+ private ColumnVector.Array getByteArray(int rowId) {
+ ColumnVector.Array array = getArray(rowId);
+ array.data.loadBytes(array);
+ return array;
+ }
+
+ /**
+ * Returns the decimal for rowId.
+ */
+ @Override
+ public Decimal getDecimal(int rowId, int precision, int scale) {
+ if (precision <= Decimal.MAX_INT_DIGITS()) {
+ return Decimal.createUnsafe(getInt(rowId), precision, scale);
+ } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ return Decimal.createUnsafe(getLong(rowId), precision, scale);
+ } else {
+ // TODO: best perf?
+ byte[] bytes = getBinary(rowId);
+ BigInteger bigInteger = new BigInteger(bytes);
+ BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
+ return Decimal.apply(javaDecimal, precision, scale);
+ }
+ }
+
+ public void putDecimal(int rowId, Decimal value, int precision) {
+ if (precision <= Decimal.MAX_INT_DIGITS()) {
+ putInt(rowId, (int) value.toUnscaledLong());
+ } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ putLong(rowId, value.toUnscaledLong());
+ } else {
+ BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
+ putByteArray(rowId, bigInteger.toByteArray());
+ }
+ }
+
+ /**
+ * Returns the UTF8String for rowId.
+ */
+ @Override
+ public UTF8String getUTF8String(int rowId) {
+ if (dictionary == null) {
+ ColumnVector.Array a = getByteArray(rowId);
+ return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset,
a.length);
+ } else {
+ byte[] bytes =
dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+ return UTF8String.fromBytes(bytes);
+ }
+ }
+
+ /**
+ * Returns the byte array for rowId.
+ */
+ @Override
+ public byte[] getBinary(int rowId) {
+ if (dictionary == null) {
+ ColumnVector.Array array = getByteArray(rowId);
+ byte[] bytes = new byte[array.length];
+ System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0,
bytes.length);
+ return bytes;
+ } else {
+ return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+ }
+ }
+
+ /**
+ * Append APIs. These APIs all behave similarly and will append data to
the current vector. It
+ * is not valid to mix the put and append APIs. The append APIs are
slower and should only be
+ * used if the sizes are not known up front.
+ * In all these cases, the return value is the rowId for the first
appended element.
+ */
+ public final int appendNull() {
+ assert (!(dataType() instanceof StructType)); // Use appendStruct()
+ reserve(elementsAppended + 1);
+ putNull(elementsAppended);
+ return elementsAppended++;
+ }
+
+ public final int appendNotNull() {
+ reserve(elementsAppended + 1);
+ putNotNull(elementsAppended);
+ return elementsAppended++;
+ }
+
+ public final int appendNulls(int count) {
+ assert (!(dataType() instanceof StructType));
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putNulls(elementsAppended, count);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendNotNulls(int count) {
+ assert (!(dataType() instanceof StructType));
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putNotNulls(elementsAppended, count);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendBoolean(boolean v) {
+ reserve(elementsAppended + 1);
+ putBoolean(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendBooleans(int count, boolean v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putBooleans(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendByte(byte v) {
+ reserve(elementsAppended + 1);
+ putByte(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendBytes(int count, byte v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putBytes(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendBytes(int length, byte[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putBytes(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendShort(short v) {
+ reserve(elementsAppended + 1);
+ putShort(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendShorts(int count, short v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putShorts(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendShorts(int length, short[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putShorts(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendInt(int v) {
+ reserve(elementsAppended + 1);
+ putInt(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendInts(int count, int v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putInts(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendInts(int length, int[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putInts(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendLong(long v) {
+ reserve(elementsAppended + 1);
+ putLong(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendLongs(int count, long v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putLongs(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendLongs(int length, long[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putLongs(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendFloat(float v) {
+ reserve(elementsAppended + 1);
+ putFloat(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendFloats(int count, float v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putFloats(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendFloats(int length, float[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putFloats(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendDouble(double v) {
+ reserve(elementsAppended + 1);
+ putDouble(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendDoubles(int count, double v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putDoubles(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendDoubles(int length, double[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putDoubles(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendByteArray(byte[] value, int offset, int length) {
+ int copiedOffset = arrayData().appendBytes(length, value, offset);
+ reserve(elementsAppended + 1);
+ putArray(elementsAppended, copiedOffset, length);
+ return elementsAppended++;
+ }
+
+ public final int appendArray(int length) {
+ reserve(elementsAppended + 1);
+ putArray(elementsAppended, arrayData().elementsAppended, length);
+ return elementsAppended++;
+ }
+
+ /**
+ * Appends a NULL struct. This *has* to be used for structs instead of
appendNull() as this
+ * recursively appends a NULL to its children.
+ * We don't have this logic as the general appendNull implementation to
optimize the more
+ * common non-struct case.
+ */
+ public final int appendStruct(boolean isNull) {
+ if (isNull) {
+ appendNull();
+ for (ColumnVector c: childColumns) {
+ if (c.type instanceof StructType) {
+ ((WritableColumnVector) c).appendStruct(true);
+ } else {
+ ((WritableColumnVector) c).appendNull();
+ }
+ }
+ } else {
+ appendNotNull();
+ }
+ return elementsAppended;
+ }
+
+ /**
+ * Returns the data for the underlying array.
+ */
+ @Override
+ public WritableColumnVector arrayData() { return (WritableColumnVector)
childColumns[0]; }
+
+ /**
+ * Returns the ordinal's child data column.
+ */
+ @Override
+ public WritableColumnVector getChildColumn(int ordinal) {
+ return (WritableColumnVector) childColumns[ordinal];
+ }
+
+ /**
+ * Returns the elements appended.
+ */
+ public final int getElementsAppended() { return elementsAppended; }
+
+ /**
+ * Marks this column as being constant.
+ */
+ public final void setIsConstant() { isConstant = true; }
+
+ /**
+ * Upper limit for the maximum capacity for this column.
+ */
+ @VisibleForTesting
+ protected int MAX_CAPACITY = Integer.MAX_VALUE;
+
+ /**
+ * Default size of each array length value. This grows as necessary.
+ */
+ protected static final int DEFAULT_ARRAY_LENGTH = 4;
+
+ /**
+ * Current write cursor (row index) when appending data.
+ */
+ protected int elementsAppended;
+
+ /**
+ * True if this column's values are fixed. This means the column values
never change, even
+ * across resets.
+ */
+ protected boolean isConstant;
+
+ /**
+ * Update the dictionary.
+ */
+ public void setDictionary(Dictionary dictionary) {
+ this.dictionary = dictionary;
+ }
+
+ /**
+ * Reserve a integer column for ids of dictionary.
+ */
+ public WritableColumnVector reserveDictionaryIds(int capacity) {
+ WritableColumnVector dictionaryIds = (WritableColumnVector)
this.dictionaryIds;
+ if (dictionaryIds == null) {
+ dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
+ this.dictionaryIds = dictionaryIds;
+ } else {
+ dictionaryIds.reset();
+ dictionaryIds.reserve(capacity);
+ }
+ return dictionaryIds;
+ }
+
+ /**
+ * Returns the underlying integer column for ids of dictionary.
+ */
+ @Override
+ public WritableColumnVector getDictionaryIds() {
+ return (WritableColumnVector) dictionaryIds;
+ }
+
+ /**
+ * Reserve a new column.
+ */
+ protected abstract WritableColumnVector reserveNewColumn(int capacity,
DataType type);
+
+ /**
+ * Initialize child columns.
+ */
+ protected void initialize() {
--- End diff --
We could move this method into the constructor.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]