GitHub user kiszk commented on the issue:
https://github.com/apache/spark/pull/18014
@cloud-fan When I think about the use case of `ColumnVector.getArray` (i.e.
in code generated by whole-stage code generation), I think that it is
better to return `UnsafeArrayData` instead of `ColumnVector.Array` in the
current generated code.

Below are an example program and the code currently generated for it. In the
generated code, we will replace `inputadapter_row.getArray(0)` at line 35 with
`columnVector.getArray(rowIdx)`. So, let us assume
`columnVector.getArray(rowIdx)` is used and focus on `inputadapter_value`.

At projection, if the type of `inputadapter_value` is `UnsafeArrayData`, line
72 just performs a fast memory copy of a contiguous region. On the other hand,
if the type of `inputadapter_value` is `ColumnVector.Array`, lines 79-86
perform an element-wise copy, which is slower.

I think that there are three options.

1. Add a method `UnsafeArrayData ColumnVector.getArray()` so that
`UnsafeArrayData` is used everywhere in the generated code.
2. Add a conditional branch for `ColumnVector.Array` in the generated code
and prepare a specialized copy routine for `ColumnVector.Array`. In this case,
we can use a bulk copy for the array body, but must use an element-wise copy
for the null bits.
3. In addition to 2, support a bit-wise null bits representation in
`ColumnVector.Array`. In this case, we can use two bulk copies: one for the
null bits and the other for the array body. The downside of this approach is
that it introduces an additional conditional branch in
`ColumnVector.isNullAt()` (i.e. to check whether we use the byte-wise or
bit-wise null representation).
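
To make options 2 and 3 more concrete, here is a minimal, self-contained sketch. It is not Spark code: the class `ArrayCopySketch`, its method names, and the simplified layout (an `int[]` body plus null bits packed 64 per `long` word) are all hypothetical stand-ins for `ColumnVector.Array` and `UnsafeArrayData`. The bit-wise variant additionally assumes that the array starts on a 64-element boundary, which a real implementation could not rely on.

```java
import java.util.Arrays;

// Hypothetical stand-in for an int array held in a column vector; NOT Spark's API.
// Assumed destination layout: int[] body plus null bits packed 64 per long word.
class ArrayCopySketch {

    // Option 2: source nulls are byte-wise (one byte per element).
    static void copyByteWiseNulls(byte[] nulls, int[] values, int offset, int length,
                                  long[] outNullBits, int[] outValues) {
        System.arraycopy(values, offset, outValues, 0, length); // bulk copy of the body
        Arrays.fill(outNullBits, 0L);
        for (int i = 0; i < length; i++) {                      // element-wise null bits
            if (nulls[offset + i] != 0) {
                outNullBits[i >> 6] |= 1L << (i & 63);
            }
        }
    }

    // Option 3: source nulls are already bit-wise, so both parts are bulk copies.
    // Simplifying assumption: offset is a multiple of 64 (word-aligned).
    static void copyBitWiseNulls(long[] nullBits, int[] values, int offset, int length,
                                 long[] outNullBits, int[] outValues) {
        System.arraycopy(values, offset, outValues, 0, length); // bulk copy of the body
        int words = (length + 63) >> 6;
        System.arraycopy(nullBits, offset >> 6, outNullBits, 0, words); // bulk copy of nulls
        int tail = length & 63;
        if (tail != 0) {
            outNullBits[words - 1] &= (1L << tail) - 1; // clear bits past the last element
        }
    }

    // The extra branch that option 3 would add to every null check.
    static boolean isNullAt(boolean bitWise, byte[] byteNulls, long[] bitNulls, int i) {
        return bitWise
            ? (bitNulls[i >> 6] & (1L << (i & 63))) != 0
            : byteNulls[i] != 0;
    }

    public static void main(String[] args) {
        int[] values = {0, 1, 1, 2};
        byte[] byteNulls = {0, 1, 0, 0};
        long[] outBits = new long[1];
        int[] outValues = new int[2];
        copyByteWiseNulls(byteNulls, values, 0, 2, outBits, outValues);
        System.out.println(Arrays.toString(outValues)
            + " nullBits=" + Long.toBinaryString(outBits[0]));
    }
}
```

In this sketch, option 2 still walks the null bytes one element at a time, while option 3 replaces that loop with a second `System.arraycopy` at the price of the `bitWise` branch in `isNullAt`.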

What do you think? Or, are there any other ideas?

```scala
val df = sparkContext.parallelize(Seq(Array(0, 1), Array(1, 2)), 1).toDF("a").cache
df.count
df.filter("a[0] > 0").show
```
```java
/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 033 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 034 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 035 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 037 */       if (!(!(inputadapter_isNull))) continue;
/* 039 */       boolean filter_isNull2 = true;
/* 040 */       boolean filter_value2 = false;
/* 042 */       boolean filter_isNull3 = true;
/* 043 */       int filter_value3 = -1;
/* 045 */       filter_isNull3 = false;
/* 047 */       final int filter_index = (int) 0;
/* 048 */       if (filter_index >= inputadapter_value.numElements() || filter_index < 0 || inputadapter_value.isNullAt(filter_index)) {
/* 049 */         filter_isNull3 = true;
/* 050 */       } else {
/* 051 */         filter_value3 = inputadapter_value.getInt(filter_index);
/* 052 */       }
/* 053 */       if (!filter_isNull3) {
/* 054 */         filter_isNull2 = false;
/* 055 */         filter_value2 = filter_value3 > 0;
/* 057 */       }
/* 058 */       if (filter_isNull2 || !filter_value2) continue;
/* 060 */       filter_numOutputRows.add(1);
/* 062 */       filter_holder.reset();
/* 066 */       final int filter_tmpCursor = filter_holder.cursor;
/* 068 */       if (inputadapter_value instanceof UnsafeArrayData) {
/* 069 */         final int filter_sizeInBytes = ((UnsafeArrayData) inputadapter_value).getSizeInBytes();
/* 071 */         filter_holder.grow(filter_sizeInBytes);
/* 072 */         ((UnsafeArrayData) inputadapter_value).writeToMemory(filter_holder.buffer, filter_holder.cursor);
/* 073 */         filter_holder.cursor += filter_sizeInBytes;
/* 075 */       } else {
/* 076 */         final int filter_numElements = inputadapter_value.numElements();
/* 077 */         filter_arrayWriter.initialize(filter_holder, filter_numElements, 4);
/* 079 */         for (int filter_index1 = 0; filter_index1 < filter_numElements; filter_index1++) {
/* 080 */           if (inputadapter_value.isNullAt(filter_index1)) {
/* 081 */             filter_arrayWriter.setNullInt(filter_index1);
/* 082 */           } else {
/* 083 */             final int filter_element = inputadapter_value.getInt(filter_index1);
/* 084 */             filter_arrayWriter.write(filter_index1, filter_element);
/* 085 */           }
/* 086 */         }
/* 087 */       }
/* 089 */       filter_rowWriter.setOffsetAndSize(0, filter_tmpCursor, filter_holder.cursor - filter_tmpCursor);
/* 090 */       filter_result.setTotalSize(filter_holder.totalSize());
/* 091 */       append(filter_result);
/* 092 */       if (shouldStop()) return;
/* 093 */     }
/* 094 */   }
```