spark git commit: [SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array

2016-11-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d1eac3ef4 -> 9873d57f2


[SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array

## What changes were proposed in this pull request?

This PR is waiting for #13680 to be merged.

This PR optimizes `SerializeFromObject()` for a primitive array. It is derived 
from #13758 and addresses one of the problems identified there using a simpler 
approach.

The current implementation always generates a `GenericArrayData` from 
`SerializeFromObject()` for any array type in a logical plan. When 
`SerializeFromObject()` operates on a primitive array, this incurs boxing in 
the `GenericArrayData` constructor.
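
As a minimal illustration of that boxing (a sketch, relying on the 
primitive-array constructor overloads that `GenericArrayData` provides):

``` scala
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// The Array[Int] is widened to Array[Any], allocating a java.lang.Integer
// for every element before any serialization work happens.
val boxed: ArrayData = new GenericArrayData(Array(1, 2, 3))
```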

This PR enables `SerializeFromObject()` to generate `UnsafeArrayData` for a 
primitive array, which avoids boxing when the Catalyst-generated code creates 
an `ArrayData` instance.
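
In spirit, the generated code can instead build on the 
`UnsafeArrayData.fromPrimitiveArray` helpers from #13680; a sketch, not the 
actual generated code:

``` scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

// fromPrimitiveArray copies the int[] straight into Spark's unsafe binary
// format, with no per-element wrapper objects.
val unsafe: UnsafeArrayData = UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3))
assert(unsafe.getInt(1) == 2)
```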

This PR also generates `UnsafeArrayData` for the `RowEncoder.serializeFor` and 
`CatalystTypeConverters.createToCatalystConverter` cases.
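
For example, a converter built for a non-nullable primitive array type should 
now produce `UnsafeArrayData`; a sketch (the exact set of element types covered 
is defined by the PR itself):

``` scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// Build a Scala-to-Catalyst converter for array<int> and feed it a primitive
// array; with this change the result is expected to be an UnsafeArrayData.
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(
  ArrayType(IntegerType, containsNull = false))
val converted = toCatalyst(Array(1, 2, 3))
```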

The performance improvement of `SerializeFromObject()` is up to 2.0x:

```
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without this PR
Write an array in Dataset:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------
Int                                    556 /  608         15.1          66.3       1.0X
Double                                1668 / 1746          5.0         198.8       0.3X

With this PR
Write an array in Dataset:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------
Int                                    352 /  401         23.8          42.0       1.0X
Double                                 821 /  885         10.2          97.9       0.4X
```
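
A minimal sketch of how such a measurement can be reproduced with Spark's 
internal `Benchmark` utility (the sizes and case bodies here are illustrative, 
not the PR's exact benchmark code):

``` scala
import org.apache.spark.util.Benchmark
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val n = 1024 * 1024
val intDS = spark.sparkContext.parallelize(Seq(Array.fill(n)(0)), 1).toDS()

val benchmark = new Benchmark("Write an array in Dataset", n)
benchmark.addCase("Int") { _ =>
  // map(identity) forces SerializeFromObject on the int[] result; counting
  // the internal RDD triggers execution without collecting to the driver.
  intDS.map(arr => arr).queryExecution.toRdd.count()
}
benchmark.run()
```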

Here is an example program, similar to code that appears in MLlib, as described 
in [SPARK-16070](https://issues.apache.org/jira/browse/SPARK-16070).

``` scala
sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).show
```
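
The dump below is whole-stage codegen output; one way to obtain it yourself is 
the `debug` package (assuming a spark-shell session):

``` scala
import org.apache.spark.sql.execution.debug._
import spark.implicits._  // assumes a SparkSession named `spark`

// Prints the Java code Catalyst generates for each whole-stage codegen subtree.
spark.sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).debugCodegen()
```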

Generated code before applying this PR

``` java
/* 039 */   protected void processNext() throws java.io.IOException {
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */       int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 043 */
/* 044 */       Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 045 */       scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 046 */
/* 047 */       boolean mapelements_isNull = false || false;
/* 048 */       int[] mapelements_value = null;
/* 049 */       if (!mapelements_isNull) {
/* 050 */         Object mapelements_funcResult = null;
/* 051 */         mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 052 */         if (mapelements_funcResult == null) {
/* 053 */           mapelements_isNull = true;
/* 054 */         } else {
/* 055 */           mapelements_value = (int[]) mapelements_funcResult;
/* 056 */         }
/* 057 */
/* 058 */       }
/* 059 */       mapelements_isNull = mapelements_value == null;
/* 060 */
/* 061 */       serializefromobject_argIsNulls[0] = mapelements_isNull;
/* 062 */       serializefromobject_argValue = mapelements_value;
/* 063 */
/* 064 */       boolean serializefromobject_isNull = false;
/* 065 */       for (int idx = 0; idx < 1; idx++) {
/* 066 */         if (serializefromobject_argIsNulls[idx]) { serializefromobject_isNull = true; break; }
/* 067 */       }
/* 068 */
/* 069 */       final ArrayData serializefromobject_value = serializefromobject_isNull ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
/* 070 */       serializefromobject_holder.reset();
/* 071 */
/* 072 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 073 */
/* 074 */       if (serializefromobject_isNull) {
/* 075 */         serializefromobject_rowWriter.setNullAt(0);
/* 076 */       } else {
/* 077 */         // Remember the current cursor so that we can calculate how many bytes are
/* 078 */         // written later.
/* 079 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 080 */
/* 081 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 082 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 083 */           // grow the global buffer before writing data.
/* 084 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 085 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 086 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 087 */
/* 088
```

spark git commit: [SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array

2016-11-07 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 8f0ea011a -> 19cf20806

