GitHub user michalsenkyr commented on the issue:
https://github.com/apache/spark/pull/16541
Apologies for taking so long.
I tried modifying the serialization logic as best I could to serialize into `UnsafeArrayData` ([branch diff](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe)).
I had to convert the collection into an array first in order to use `fromPrimitiveArray` on the result (sketched below, after the benchmark), which is probably why the benchmark came out slightly worse:
```
OpenJDK 64-Bit Server VM 1.8.0_121-b13 on Linux 4.9.6-1-ARCH
AMD A10-4600M APU with Radeon(tm) HD Graphics
collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Seq                                            256 /  287          0,0      255670,1       1,0X
List                                           161 /  220          0,0      161091,7       1,6X
mutable.Queue                                  304 /  324          0,0      303823,3       0,8X
```
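The conversion mentioned above boils down to building a primitive array and handing it to `fromPrimitiveArray`. A minimal sketch of the idea (the helper name is illustrative; the branch generates equivalent code inline rather than calling a method like this):

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

// Illustrative only: the generated code inlines the equivalent of this helper.
// The extra seq.toArray copy is the overhead suspected of costing the benchmark above.
def toUnsafeIntArray(seq: Seq[Int]): ArrayData =
  UnsafeArrayData.fromPrimitiveArray(seq.toArray)
```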
I am not entirely sure how `GenericArrayData` and `UnsafeArrayData` are handled during transformations and shuffles, though, so it's possible that more complex tests would reveal better performance. However, I'm not sure I can test this properly on my single-machine setup. I'd definitely be interested in benchmark results on a cluster.
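In Scala terms, the deserialization loop in the generated code below is essentially the standard builder pattern (sketched here for a `List[Int]` column; the generated code inlines it element by element):

```scala
import org.apache.spark.sql.catalyst.util.ArrayData

// Sketch of what the generated deserialization loop does: allocate a builder,
// size-hint it with the array length, append each element, then build the List.
def deserializeIntList(input: ArrayData): List[Int] = {
  val n = input.numElements()
  val builder = List.newBuilder[Int]
  builder.sizeHint(n)
  var i = 0
  while (i < n) {
    if (input.isNullAt(i)) {
      throw new RuntimeException("Null value appeared in non-nullable field")
    }
    builder += input.getInt(i)
    i += 1
  }
  builder.result()
}
```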
Generated code:
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjects_loopIsNull1;
/* 010 */   private int CollectObjects_loopValue0;
/* 011 */   private UnsafeRow deserializetoobject_result;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 014 */   private scala.collection.immutable.List mapelements_argValue;
/* 015 */   private UnsafeRow mapelements_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 018 */   private scala.collection.immutable.List serializefromobject_argValue;
/* 019 */   private UnsafeRow serializefromobject_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */     this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 029 */     partitionIndex = index;
/* 030 */     this.inputs = inputs;
/* 031 */     inputadapter_input = inputs[0];
/* 032 */
/* 033 */     deserializetoobject_result = new UnsafeRow(1);
/* 034 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 035 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 036 */
/* 037 */     mapelements_result = new UnsafeRow(1);
/* 038 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 039 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 040 */
/* 041 */     serializefromobject_result = new UnsafeRow(1);
/* 042 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 043 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 044 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 045 */
/* 046 */   }
/* 047 */
/* 048 */   protected void processNext() throws java.io.IOException {
/* 049 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 050 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 051 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 052 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 053 */
/* 054 */       scala.collection.immutable.List deserializetoobject_value = null;
/* 055 */
/* 056 */       if (!inputadapter_isNull) {
/* 057 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 058 */         scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder();
/* 059 */         CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength);
/* 060 */
/* 061 */         int deserializetoobject_loopIndex = 0;
/* 062 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 063 */           CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
/* 064 */           CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 065 */
/* 066 */           if (CollectObjects_loopIsNull1) {
/* 067 */             throw new RuntimeException(((java.lang.String) references[0]));
/* 068 */           }
/* 069 */           if (false) {
/* 070 */             CollectObjects_builderValue2.$plus$eq(null);
/* 071 */           } else {
/* 072 */             CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0);
/* 073 */           }
/* 074 */
/* 075 */           deserializetoobject_loopIndex += 1;
/* 076 */         }
/* 077 */
/* 078 */         deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result();
/* 079 */       }
/* 080 */
/* 081 */       boolean mapelements_isNull = true;
/* 082 */       scala.collection.immutable.List mapelements_value = null;
/* 083 */       if (!false) {
/* 084 */         mapelements_argValue = deserializetoobject_value;
/* 085 */
/* 086 */         mapelements_isNull = false;
/* 087 */         if (!mapelements_isNull) {
/* 088 */           Object mapelements_funcResult = null;
/* 089 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 090 */           if (mapelements_funcResult == null) {
/* 091 */             mapelements_isNull = true;
/* 092 */           } else {
/* 093 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
/* 094 */           }
/* 095 */
/* 096 */         }
/* 097 */         mapelements_isNull = mapelements_value == null;
/* 098 */       }
/* 099 */
/* 100 */       if (mapelements_isNull) {
/* 101 */         throw new RuntimeException(((java.lang.String) references[2]));
/* 102 */       }
/* 103 */       serializefromobject_argValue = mapelements_value;
/* 104 */
/* 105 */       boolean serializefromobject_isNull = false;
/* 106 */       final ArrayData serializefromobject_value = false ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromIntegerSeq(serializefromobject_argValue);
/* 107 */       serializefromobject_isNull = serializefromobject_value == null;
/* 108 */       serializefromobject_holder.reset();
/* 109 */
/* 110 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 111 */
/* 112 */       if (serializefromobject_isNull) {
/* 113 */         serializefromobject_rowWriter.setNullAt(0);
/* 114 */       } else {
/* 115 */         // Remember the current cursor so that we can calculate how many bytes are
/* 116 */         // written later.
/* 117 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 118 */
/* 119 */         if (serializefromobject_value instanceof UnsafeArrayData) {
/* 120 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 121 */           // grow the global buffer before writing data.
/* 122 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 123 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 124 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 125 */
/* 126 */         } else {
/* 127 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 128 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 129 */
/* 130 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 131 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 132 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 133 */             } else {
/* 134 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 135 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 136 */             }
/* 137 */           }
/* 138 */         }
/* 139 */
/* 140 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 141 */       }
/* 142 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 143 */       append(serializefromobject_result);
/* 144 */       if (shouldStop()) return;
/* 145 */     }
/* 146 */   }
/* 147 */ }
```
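For context, a pipeline along these lines is what produces a plan like the one above (a hypothetical local example, not the actual benchmark code):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical end-to-end example exercising the deserialize -> map -> serialize
// path for a List[Int] column; names and settings here are illustrative.
object ListEncoderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("list-encoder-example").getOrCreate()
    import spark.implicits._

    val ds = Seq(List(1, 2, 3), List(4, 5)).toDS()   // Dataset[List[Int]] via the collection encoder
    val mapped = ds.map(xs => xs.map(_ + 1))         // MapElements over the deserialized List
    mapped.collect().foreach(println)                // SerializeFromObject + collect back to the driver

    spark.stop()
  }
}
```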