GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/13704
[SPARK-15985][SQL] Reduce runtime overhead of a program that reads a primitive array in Dataset
## What changes were proposed in this pull request?
This PR reduces the runtime overhead of a program that reads a primitive array in a Dataset. The generated code copies array elements from the Dataset into a temporary array. If we know that both the source and the destination are primitive arrays, we apply one of the following optimizations (sketched below):
1. Eliminate the allocation of an `Object[]` and call the `ArrayData.to<Type>Array()` method, if the source and destination element types are the same.
2. Eliminate the pair of an `isNullAt()` check and a `null` assignment, and allocate a primitive array instead of an `Object[]`, if the source and destination element types are different.
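Conceptually, the two strategies correspond to the following hand-written equivalents (a sketch of the intent, not the actual codegen templates; `copyWidened` and its int-to-long case are illustrative):
```
import org.apache.spark.sql.catalyst.util.ArrayData

// Strategy 1: source and destination element types match (double -> double).
// One bulk call replaces the per-element loop; no Object[] is allocated.
def copySame(src: ArrayData): Array[Double] = src.toDoubleArray()

// Strategy 2: element types differ (here, an illustrative int -> long case).
// Still allocate a primitive array rather than Object[]; since primitive
// arrays cannot hold nulls, the isNullAt()/null-assignment pair disappears.
def copyWidened(src: ArrayData): Array[Long] = {
  val n = src.numElements()
  val dest = new Array[Long](n)
  var i = 0
  while (i < n) {
    dest(i) = src.getInt(i).toLong
    i += 1
  }
  dest
}
```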
An example program:
```
val ds = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDS()
ds.map(p => {
  var s = 0.0
  for (i <- 0 to 2) { s += p(i) }
  s
}).show
```
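To reproduce the listings that follow, the generated code can be dumped with Spark's debug helpers (a minimal sketch; the local-mode session and the `CodegenDump` wrapper are illustrative, and it assumes `debugCodegen()` from the `execution.debug` package):
```
import org.apache.spark.sql.SparkSession

object CodegenDump {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("codegen-dump").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.execution.debug._

    val ds = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDS()
    val summed = ds.map { p =>
      var s = 0.0
      for (i <- 0 to 2) { s += p(i) }
      s
    }
    summed.debugCodegen() // prints the whole-stage generated Java, as in the listings below
    spark.stop()
  }
}
```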
Generated code before applying this PR
```
/* 036 */   protected void processNext() throws java.io.IOException {
/* 037 */     while (inputadapter_input.hasNext()) {
/* 038 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 039 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 040 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 041 */
/* 042 */       boolean deserializetoobject_isNull1 = inputadapter_isNull;
/* 043 */       ArrayData deserializetoobject_value1 = null;
/* 044 */       if (!inputadapter_isNull) {
/* 045 */         final int deserializetoobject_n = inputadapter_value.numElements();
/* 046 */         final Object[] deserializetoobject_values = new Object[deserializetoobject_n];
/* 047 */         for (int deserializetoobject_j = 0; deserializetoobject_j < deserializetoobject_n; deserializetoobject_j++) {
/* 048 */           if (inputadapter_value.isNullAt(deserializetoobject_j)) {
/* 049 */             deserializetoobject_values[deserializetoobject_j] = null;
/* 050 */           } else {
/* 051 */             boolean deserializetoobject_feNull = false;
/* 052 */             double deserializetoobject_fePrim =
/* 053 */             inputadapter_value.getDouble(deserializetoobject_j);
/* 054 */
/* 055 */             boolean deserializetoobject_teNull = deserializetoobject_feNull;
/* 056 */             double deserializetoobject_tePrim = -1.0;
/* 057 */             if (!deserializetoobject_feNull) {
/* 058 */               deserializetoobject_tePrim = deserializetoobject_fePrim;
/* 059 */             }
/* 060 */
/* 061 */             if (deserializetoobject_teNull) {
/* 062 */               deserializetoobject_values[deserializetoobject_j] = null;
/* 063 */             } else {
/* 064 */               deserializetoobject_values[deserializetoobject_j] = deserializetoobject_tePrim;
/* 065 */             }
/* 066 */           }
/* 067 */         }
/* 068 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_values);
/* 069 */
/* 070 */       }
/* 071 */
/* 072 */       boolean deserializetoobject_isNull = deserializetoobject_isNull1;
/* 073 */       final double[] deserializetoobject_value = deserializetoobject_isNull ? null : (double[]) deserializetoobject_value1.toDoubleArray();
/* 074 */       deserializetoobject_isNull = deserializetoobject_value == null;
/* 075 */
/* 076 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 077 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 078 */
/* 079 */       boolean mapelements_isNull = false || deserializetoobject_isNull;
/* 080 */       final double mapelements_value = mapelements_isNull ? -1.0 : (Double) mapelements_value1.apply(deserializetoobject_value);
/* 081 */
/* 082 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 083 */
/* 084 */       if (mapelements_isNull) {
/* 085 */         serializefromobject_rowWriter.setNullAt(0);
/* 086 */       } else {
/* 087 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 088 */       }
/* 089 */       append(serializefromobject_result);
/* 090 */       if (shouldStop()) return;
/* 091 */     }
/* 092 */   }
```
Generated code after applying this PR
```
/* 036 */   protected void processNext() throws java.io.IOException {
/* 037 */     while (inputadapter_input.hasNext()) {
/* 038 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 039 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 040 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 041 */
/* 042 */       boolean deserializetoobject_isNull1 = inputadapter_isNull;
/* 043 */       ArrayData deserializetoobject_value1 = null;
/* 044 */       if (!inputadapter_isNull) {
/* 045 */         final double[] deserializetoobject_values = inputadapter_value.toDoubleArray();
/* 046 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_values);
/* 047 */
/* 048 */       }
/* 049 */
/* 050 */       boolean deserializetoobject_isNull = deserializetoobject_isNull1;
/* 051 */       final double[] deserializetoobject_value = deserializetoobject_isNull ? null : (double[]) deserializetoobject_value1.toDoubleArray();
/* 052 */       deserializetoobject_isNull = deserializetoobject_value == null;
/* 053 */
/* 054 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 055 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 056 */
/* 057 */       boolean mapelements_isNull = false || deserializetoobject_isNull;
/* 058 */       final double mapelements_value = mapelements_isNull ? -1.0 : (Double) mapelements_value1.apply(deserializetoobject_value);
/* 059 */
/* 060 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 061 */
/* 062 */       if (mapelements_isNull) {
/* 063 */         serializefromobject_rowWriter.setNullAt(0);
/* 064 */       } else {
/* 065 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 066 */       }
/* 067 */       append(serializefromobject_result);
/* 068 */       if (shouldStop()) return;
/* 069 */     }
/* 070 */   }
```
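To observe the effect, one might time the example on a larger input (an illustrative harness, not part of the PR; it assumes a `spark` session is in scope, and results will vary by machine and Spark build):
```
import spark.implicits._

// Illustrative micro-benchmark: sum the elements of many primitive arrays.
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

val big = Seq.fill(1 << 20)(Array(1.0, 2.0, 3.0)).toDS().cache()
big.count() // materialize the cache before timing
time("map over double[]") {
  big.map(p => p(0) + p(1) + p(2)).reduce(_ + _)
}
```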
## How was this patch tested?
Tested with the existing Dataset unit tests.
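A representative check of the behavior those suites cover might look like this (illustrative, not taken from the PR; assumes `spark.implicits._` is in scope):
```
// Round-trip a Dataset of primitive arrays and check both the values and
// the mapped sum used in the example above.
val ds = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDS()
assert(ds.collect().map(_.toSeq).toSeq == Seq(Seq(1.0, 2.0, 3.0), Seq(4.0, 5.0, 6.0)))
assert(ds.map(p => p(0) + p(1) + p(2)).collect().toSeq == Seq(6.0, 15.0))
```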
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/kiszk/spark SPARK-15985
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/13704.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #13704
----
commit 17f17d60794c1f0ab81e21ec2484742a7610f06d
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-06-16T07:46:27Z
optimize to read primitive array elements in Dataset
----