GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/17172

    [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating boxing/unboxing

    ## What changes were proposed in this pull request?
    
    This PR improves the performance of Dataset.map() for primitive types by eliminating boxing/unboxing operations.
    
    Currently, Catalyst generates a call to the `apply()` method of an anonymous function written in Scala. The argument and return types of this method are `java.lang.Object`. As a result, each call for a primitive value requires boxing the argument to pass it to `apply()` and unboxing it inside, then boxing the result and unboxing it again on return.
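
    As a minimal illustration (not Spark code, just the same mechanism), the following forces a call through the erased `Function1` interface, which is essentially what the `Object`-typed `apply()` call in the generated code does:

    ```scala
    // Minimal sketch: calling a primitive function through the erased
    // Function1 interface forces boxing, like the Object-typed apply() call
    // in the generated code.
    object BoxedCallDemo {
      def main(args: Array[String]): Unit = {
        val f: Int => Int = i => i * 7
        // Viewed through the erased interface, apply takes and returns Object.
        val erased = f.asInstanceOf[Function1[Any, Any]]
        // 6 is boxed to java.lang.Integer on the way in; the Integer result
        // is unboxed on the way out.
        val r = erased(6).asInstanceOf[Int]
        println(r) // 42
      }
    }
    ```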
    
    This PR changes the generated code to call a specialized version of the `apply()` method directly, without boxing or unboxing. For example, if the argument and return types are both `int`, the generated code calls `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.
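
    As a sketch of the mechanism (outside this PR), the following lists the specialized entry points that scalac emits for an `Int => Int` lambda; the name `apply$mcII$sp` encodes an `int` argument and an `int` return value:

    ```scala
    // Minimal sketch: print the specialized apply variants Scala generates
    // for an Int => Int function. Expected output includes the Object-typed
    // bridge apply(Object) and the primitive-typed apply$mcII$sp(int).
    object SpecializationDemo {
      def main(args: Array[String]): Unit = {
        val f: Int => Int = i => i * 7
        f.getClass.getMethods
          .filter(m => m.getName == "apply" || m.getName.startsWith("apply$mc"))
          .foreach(println)
      }
    }
    ```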
    
    
    The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files); with this PR, the Dataset case is about 4.7x faster (3094 ms down to 657 ms). The Dataset part of this program is shown after the results.
    
    Without this PR
    ```
    OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
    Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
    back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    RDD                                           1923 / 1952         52.0          19.2       1.0X
    DataFrame                                      526 /  548        190.2           5.3       3.7X
    Dataset                                       3094 / 3154         32.3          30.9       0.6X
    ```
    
    With this PR
    ```
    OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
    Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
    back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    RDD                                           1883 / 1892         53.1          18.8       1.0X
    DataFrame                                      502 /  642        199.1           5.0       3.7X
    Dataset                                        657 /  784        152.2           6.6       2.9X
    ```
    
    ```scala
      def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
        import spark.implicits._
        val rdd = spark.sparkContext.range(0, numRows)
        val ds = spark.range(0, numRows)
        val func = (l: Long) => l + 1
        val benchmark = new Benchmark("back-to-back map", numRows)
    ...
        benchmark.addCase("Dataset") { iter =>
          var res = ds.as[Long]
          var i = 0
          while (i < numChains) {
            res = res.map(func)
            i += 1
          }
          res.queryExecution.toRdd.foreach(_ => ())
        }
        benchmark
      }
    ```
    
    
    A motivating example:
    ```scala
    Seq(1, 2, 3).toDS.map(i => i * 7).show
    ```
    
    Generated code without this PR
    ```java
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private UnsafeRow deserializetoobject_result;
    /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
    /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
    /* 012 */   private int mapelements_argValue;
    /* 013 */   private UnsafeRow mapelements_result;
    /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
    /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
    /* 016 */   private UnsafeRow serializefromobject_result;
    /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
    /* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
    /* 019 */
    /* 020 */   public GeneratedIterator(Object[] references) {
    /* 021 */     this.references = references;
    /* 022 */   }
    /* 023 */
    /* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 025 */     partitionIndex = index;
    /* 026 */     this.inputs = inputs;
    /* 027 */     inputadapter_input = inputs[0];
    /* 028 */     deserializetoobject_result = new UnsafeRow(1);
    /* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
    /* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
    /* 031 */
    /* 032 */     mapelements_result = new UnsafeRow(1);
    /* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
    /* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
    /* 035 */     serializefromobject_result = new UnsafeRow(1);
    /* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
    /* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
    /* 038 */
    /* 039 */   }
    /* 040 */
    /* 041 */   protected void processNext() throws java.io.IOException {
    /* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
    /* 045 */
    /* 046 */       boolean mapelements_isNull = true;
    /* 047 */       int mapelements_value = -1;
    /* 048 */       if (!false) {
    /* 049 */         mapelements_argValue = inputadapter_value;
    /* 050 */
    /* 051 */         mapelements_isNull = false;
    /* 052 */         if (!mapelements_isNull) {
    /* 053 */           Object mapelements_funcResult = null;
    /* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
    /* 055 */           if (mapelements_funcResult == null) {
    /* 056 */             mapelements_isNull = true;
    /* 057 */           } else {
    /* 058 */             mapelements_value = (Integer) mapelements_funcResult;
    /* 059 */           }
    /* 060 */
    /* 061 */         }
    /* 062 */
    /* 063 */       }
    /* 064 */
    /* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 066 */
    /* 067 */       if (mapelements_isNull) {
    /* 068 */         serializefromobject_rowWriter.setNullAt(0);
    /* 069 */       } else {
    /* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
    /* 071 */       }
    /* 072 */       append(serializefromobject_result);
    /* 073 */       if (shouldStop()) return;
    /* 074 */     }
    /* 075 */   }
    /* 076 */ }
    ```
    
    Generated code with this PR (lines 48-56 are changed)
    ```java
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private UnsafeRow deserializetoobject_result;
    /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
    /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
    /* 012 */   private int mapelements_argValue;
    /* 013 */   private UnsafeRow mapelements_result;
    /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
    /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
    /* 016 */   private UnsafeRow serializefromobject_result;
    /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
    /* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
    /* 019 */
    /* 020 */   public GeneratedIterator(Object[] references) {
    /* 021 */     this.references = references;
    /* 022 */   }
    /* 023 */
    /* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 025 */     partitionIndex = index;
    /* 026 */     this.inputs = inputs;
    /* 027 */     inputadapter_input = inputs[0];
    /* 028 */     deserializetoobject_result = new UnsafeRow(1);
    /* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
    /* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
    /* 031 */
    /* 032 */     mapelements_result = new UnsafeRow(1);
    /* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
    /* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
    /* 035 */     serializefromobject_result = new UnsafeRow(1);
    /* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
    /* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
    /* 038 */
    /* 039 */   }
    /* 040 */
    /* 041 */   protected void processNext() throws java.io.IOException {
    /* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    /* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
    /* 045 */
    /* 046 */       boolean mapelements_isNull = true;
    /* 047 */       int mapelements_value = -1;
    /* 048 */       if (!false) {
    /* 049 */         mapelements_argValue = inputadapter_value;
    /* 050 */
    /* 051 */         mapelements_isNull = false;
    /* 052 */         if (!mapelements_isNull) {
    /* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
    /* 054 */         }
    /* 055 */
    /* 056 */       }
    /* 057 */
    /* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 059 */
    /* 060 */       if (mapelements_isNull) {
    /* 061 */         serializefromobject_rowWriter.setNullAt(0);
    /* 062 */       } else {
    /* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
    /* 064 */       }
    /* 065 */       append(serializefromobject_result);
    /* 066 */       if (shouldStop()) return;
    /* 067 */     }
    /* 068 */   }
    /* 069 */ }
    ```
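
    The essential difference is at line 053: the generated code now calls the `int`-specialized `apply$mcII$sp` directly, so no `Integer` is allocated on this per-row hot path.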
    
    ## How was this patch tested?
    
    Added new tests to `DatasetPrimitiveSuite`.
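
    For reference, a minimal sketch of the kind of test that exercises this path (the suite and test names below are illustrative, not the actual ones added; `checkDataset` is the existing helper in Spark's `QueryTest`):

    ```scala
    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.test.SharedSQLContext

    // Hypothetical suite name; the real tests were added to DatasetPrimitiveSuite.
    class PrimitiveMapSketchSuite extends QueryTest with SharedSQLContext {
      import testImplicits._

      test("map with Int function") {
        val ds = Seq(1, 2, 3).toDS()
        // With this PR, the generated code calls apply$mcII$sp directly.
        checkDataset(ds.map(_ * 7), 7, 14, 21)
      }
    }
    ```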


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-19008

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17172.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17172
    
----
commit d8b5f8d839d5c3388244cf2a6dcf4494d927145f
Author: Kazuaki Ishizaki <[email protected]>
Date:   2017-03-06T06:42:10Z

    Initial commit

----

