Github user michalsenkyr commented on the issue:

    https://github.com/apache/spark/pull/16541
  
    Apologies for taking so long.
    
    I tried modifying the serialization logic as best I could to serialize into `UnsafeArrayData` ([branch diff](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe)). I had to convert the collection into an array first in order to use `fromPrimitiveArray` on the result (a rough sketch of that extra step follows the benchmark), which is probably why the benchmark came out slightly worse:
    
    ```
    OpenJDK 64-Bit Server VM 1.8.0_121-b13 on Linux 4.9.6-1-ARCH
    AMD A10-4600M APU with Radeon(tm) HD Graphics
    collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    Seq                                            256 /  287          0,0      255670,1       1,0X
    List                                           161 /  220          0,0      161091,7       1,6X
    mutable.Queue                                  304 /  324          0,0      303823,3       0,8X
    ```
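    
    For illustration, the extra copy looks roughly like this; `toUnsafeIntArray` is a hypothetical helper used only to show the conversion, not code from the branch:
    
    ```
    import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
    
    // Hypothetical helper: the Seq has to be materialized as a primitive array
    // before fromPrimitiveArray can be used, which adds an intermediate copy.
    def toUnsafeIntArray(seq: Seq[Int]): UnsafeArrayData =
      UnsafeArrayData.fromPrimitiveArray(seq.toArray)
    ```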
    
    I am not entirely sure how `GenericArrayData` and `UnsafeArrayData` are handled during transformations and shuffles, though, so it's possible that more complex tests would reveal better performance. However, I'm not sure I can test that properly on my single-machine setup, so I'd definitely be interested in benchmark results from a cluster setup.
    
    Generated code:
    
    ```
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private boolean CollectObjects_loopIsNull1;
    /* 010 */   private int CollectObjects_loopValue0;
    /* 011 */   private UnsafeRow deserializetoobject_result;
    /* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
    /* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
    /* 014 */   private scala.collection.immutable.List mapelements_argValue;
    /* 015 */   private UnsafeRow mapelements_result;
    /* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
    /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
    /* 018 */   private scala.collection.immutable.List serializefromobject_argValue;
    /* 019 */   private UnsafeRow serializefromobject_result;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
    /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
    /* 023 */
    /* 024 */   public GeneratedIterator(Object[] references) {
    /* 025 */     this.references = references;
    /* 026 */   }
    /* 027 */
    /* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 029 */     partitionIndex = index;
    /* 030 */     this.inputs = inputs;
    /* 031 */     inputadapter_input = inputs[0];
    /* 032 */
    /* 033 */     deserializetoobject_result = new UnsafeRow(1);
    /* 034 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
    /* 035 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
    /* 036 */
    /* 037 */     mapelements_result = new UnsafeRow(1);
    /* 038 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
    /* 039 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
    /* 040 */
    /* 041 */     serializefromobject_result = new UnsafeRow(1);
    /* 042 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
    /* 043 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
    /* 044 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
    /* 045 */
    /* 046 */   }
    /* 047 */
    /* 048 */   protected void processNext() throws java.io.IOException {
    /* 049 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 050 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 051 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 052 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
    /* 053 */
    /* 054 */       scala.collection.immutable.List deserializetoobject_value = null;
    /* 055 */
    /* 056 */       if (!inputadapter_isNull) {
    /* 057 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
    /* 058 */         scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder();
    /* 059 */         CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength);
    /* 060 */
    /* 061 */         int deserializetoobject_loopIndex = 0;
    /* 062 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
    /* 063 */           CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
    /* 064 */           CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
    /* 065 */
    /* 066 */           if (CollectObjects_loopIsNull1) {
    /* 067 */             throw new RuntimeException(((java.lang.String) references[0]));
    /* 068 */           }
    /* 069 */           if (false) {
    /* 070 */             CollectObjects_builderValue2.$plus$eq(null);
    /* 071 */           } else {
    /* 072 */             CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0);
    /* 073 */           }
    /* 074 */
    /* 075 */           deserializetoobject_loopIndex += 1;
    /* 076 */         }
    /* 077 */
    /* 078 */         deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result();
    /* 079 */       }
    /* 080 */
    /* 081 */       boolean mapelements_isNull = true;
    /* 082 */       scala.collection.immutable.List mapelements_value = null;
    /* 083 */       if (!false) {
    /* 084 */         mapelements_argValue = deserializetoobject_value;
    /* 085 */
    /* 086 */         mapelements_isNull = false;
    /* 087 */         if (!mapelements_isNull) {
    /* 088 */           Object mapelements_funcResult = null;
    /* 089 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
    /* 090 */           if (mapelements_funcResult == null) {
    /* 091 */             mapelements_isNull = true;
    /* 092 */           } else {
    /* 093 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
    /* 094 */           }
    /* 095 */
    /* 096 */         }
    /* 097 */         mapelements_isNull = mapelements_value == null;
    /* 098 */       }
    /* 099 */
    /* 100 */       if (mapelements_isNull) {
    /* 101 */         throw new RuntimeException(((java.lang.String) references[2]));
    /* 102 */       }
    /* 103 */       serializefromobject_argValue = mapelements_value;
    /* 104 */
    /* 105 */       boolean serializefromobject_isNull = false;
    /* 106 */       final ArrayData serializefromobject_value = false ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromIntegerSeq(serializefromobject_argValue);
    /* 107 */       serializefromobject_isNull = serializefromobject_value == null;
    /* 108 */       serializefromobject_holder.reset();
    /* 109 */
    /* 110 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 111 */
    /* 112 */       if (serializefromobject_isNull) {
    /* 113 */         serializefromobject_rowWriter.setNullAt(0);
    /* 114 */       } else {
    /* 115 */         // Remember the current cursor so that we can calculate how many bytes are
    /* 116 */         // written later.
    /* 117 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
    /* 118 */
    /* 119 */         if (serializefromobject_value instanceof UnsafeArrayData) {
    /* 120 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
    /* 121 */           // grow the global buffer before writing data.
    /* 122 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
    /* 123 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
    /* 124 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
    /* 125 */
    /* 126 */         } else {
    /* 127 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
    /* 128 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
    /* 129 */
    /* 130 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
    /* 131 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
    /* 132 */               serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
    /* 133 */             } else {
    /* 134 */               final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
    /* 135 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
    /* 136 */             }
    /* 137 */           }
    /* 138 */         }
    /* 139 */
    /* 140 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
    /* 141 */       }
    /* 142 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
    /* 143 */       append(serializefromobject_result);
    /* 144 */       if (shouldStop()) return;
    /* 145 */     }
    /* 146 */   }
    /* 147 */ }
    ```
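    
    For readability, the builder-based deserialization in `processNext` above boils down to roughly this Scala equivalent (just a sketch; `arrayData` stands in for the input `ArrayData` column value):
    
    ```
    import org.apache.spark.sql.catalyst.util.ArrayData
    
    // Sketch of what the generated deserializer does for a List[Int] column:
    // build the List through its Builder rather than via an intermediate array.
    def deserialize(arrayData: ArrayData): List[Int] = {
      val builder = List.newBuilder[Int]
      builder.sizeHint(arrayData.numElements())
      var i = 0
      while (i < arrayData.numElements()) {
        builder += arrayData.getInt(i)
        i += 1
      }
      builder.result()
    }
    ```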

