GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/20637

    Remove redundant null checks in generated Java code by GenerateUnsafeProjection

    ## What changes were proposed in this pull request?
    
    This PR addresses one of the TODOs in `GenerateUnsafeProjection`, "if the nullability of field is correct, we can use it to save null check", to simplify the generated code.
    When a field is declared with `nullable = false` in its `DataType`, `GenerateUnsafeProjection` now omits the null check for that field in the generated Java code.
    
    The following is an example.
    
    Source code
    ```
        val dataType3 = (new StructType)
          .add("a", StringType, nullable = false)
          .add("b", StringType, nullable = false)
          .add("c", StringType, nullable = false)
        val exprs3 = BoundReference(0, dataType3, nullable = false) :: Nil
        val projection3 = GenerateUnsafeProjection.generate(exprs3)
        projection3.apply(InternalRow(AlwaysNonNull))
    ```
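    Here `AlwaysNonNull` is a helper row defined in the test suite. For readers who want to reproduce this locally, a plain `GenericInternalRow` holding non-null values plays the same role (a minimal sketch; the helper's actual definition lives in `GenerateUnsafeProjectionSuite`):
    ```
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // A struct value whose three string fields are all non-null,
    // matching the nullable = false schema declared above.
    val nonNullStruct = new GenericInternalRow(Array[Any](
      UTF8String.fromString("a"),
      UTF8String.fromString("b"),
      UTF8String.fromString("c")))
    val result3 = projection3.apply(InternalRow(nonNullStruct))
    ```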
    Generated code without this PR
    ```
    /* 001 */ public java.lang.Object generate(Object[] references) {
    /* 002 */   return new SpecificUnsafeProjection(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
    /* 006 */
    /* 007 */   private Object[] references;
    /* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
    /* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
    /* 010 */   private UnsafeRow[] mutableStateArray = new UnsafeRow[1];
    /* 011 */
    /* 012 */   public SpecificUnsafeProjection(Object[] references) {
    /* 013 */     this.references = references;
    /* 014 */     mutableStateArray[0] = new UnsafeRow(1);
    /* 015 */     mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray[0], 32);
    /* 016 */     mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 1);
    /* 017 */     mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 3);
    /* 018 */
    /* 019 */   }
    /* 020 */
    /* 021 */   public void initialize(int partitionIndex) {
    /* 022 */
    /* 023 */   }
    /* 024 */
    /* 025 */   // Scala.Function1 need this
    /* 026 */   public java.lang.Object apply(java.lang.Object row) {
    /* 027 */     return apply((InternalRow) row);
    /* 028 */   }
    /* 029 */
    /* 030 */   public UnsafeRow apply(InternalRow i) {
    /* 031 */     mutableStateArray1[0].reset();
    /* 032 */
    /* 033 */     InternalRow value = i.getStruct(0, 3);
    /* 034 */     // Remember the current cursor so that we can calculate how many bytes are
    /* 035 */     // written later.
    /* 036 */     final int tmpCursor = mutableStateArray1[0].cursor;
    /* 037 */
    /* 038 */     final InternalRow tmpInput = value;
    /* 039 */     if (tmpInput instanceof UnsafeRow) {
    /* 040 */
    /* 041 */       final int sizeInBytes = ((UnsafeRow) tmpInput).getSizeInBytes();
    /* 042 */       // grow the global buffer before writing data.
    /* 043 */       mutableStateArray1[0].grow(sizeInBytes);
    /* 044 */       ((UnsafeRow) tmpInput).writeToMemory(mutableStateArray1[0].buffer, mutableStateArray1[0].cursor);
    /* 045 */       mutableStateArray1[0].cursor += sizeInBytes;
    /* 046 */
    /* 047 */     } else {
    /* 048 */       mutableStateArray2[1].reset();
    /* 049 */
    /* 050 */
    /* 051 */       if (tmpInput.isNullAt(0)) {
    /* 052 */         mutableStateArray2[1].setNullAt(0);
    /* 053 */       } else {
    /* 054 */         mutableStateArray2[1].write(0, tmpInput.getUTF8String(0));
    /* 055 */       }
    /* 056 */
    /* 057 */
    /* 058 */       if (tmpInput.isNullAt(1)) {
    /* 059 */         mutableStateArray2[1].setNullAt(1);
    /* 060 */       } else {
    /* 061 */         mutableStateArray2[1].write(1, tmpInput.getUTF8String(1));
    /* 062 */       }
    /* 063 */
    /* 064 */
    /* 065 */       if (tmpInput.isNullAt(2)) {
    /* 066 */         mutableStateArray2[1].setNullAt(2);
    /* 067 */       } else {
    /* 068 */         mutableStateArray2[1].write(2, tmpInput.getUTF8String(2));
    /* 069 */       }
    /* 070 */     }
    /* 071 */
    /* 072 */     mutableStateArray2[0].setOffsetAndSize(0, tmpCursor, mutableStateArray1[0].cursor - tmpCursor);
    /* 073 */     mutableStateArray[0].setTotalSize(mutableStateArray1[0].totalSize());
    /* 074 */     return mutableStateArray[0];
    /* 075 */   }
    /* 076 */
    /* 077 */
    /* 078 */ }
    ```
    
    Generated code with this PR
    ```
    /* 001 */ public java.lang.Object generate(Object[] references) {
    /* 002 */   return new SpecificUnsafeProjection(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
    /* 006 */
    /* 007 */   private Object[] references;
    /* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
    /* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
    /* 010 */   private UnsafeRow[] mutableStateArray = new UnsafeRow[1];
    /* 011 */
    /* 012 */   public SpecificUnsafeProjection(Object[] references) {
    /* 013 */     this.references = references;
    /* 014 */     mutableStateArray[0] = new UnsafeRow(1);
    /* 015 */     mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray[0], 32);
    /* 016 */     mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 1);
    /* 017 */     mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 3);
    /* 018 */
    /* 019 */   }
    /* 020 */
    /* 021 */   public void initialize(int partitionIndex) {
    /* 022 */
    /* 023 */   }
    /* 024 */
    /* 025 */   // Scala.Function1 need this
    /* 026 */   public java.lang.Object apply(java.lang.Object row) {
    /* 027 */     return apply((InternalRow) row);
    /* 028 */   }
    /* 029 */
    /* 030 */   public UnsafeRow apply(InternalRow i) {
    /* 031 */     mutableStateArray1[0].reset();
    /* 032 */
    /* 033 */     InternalRow value = i.getStruct(0, 3);
    /* 034 */     // Remember the current cursor so that we can calculate how many bytes are
    /* 035 */     // written later.
    /* 036 */     final int tmpCursor = mutableStateArray1[0].cursor;
    /* 037 */
    /* 038 */     final InternalRow tmpInput = value;
    /* 039 */     if (tmpInput instanceof UnsafeRow) {
    /* 040 */
    /* 041 */       final int sizeInBytes = ((UnsafeRow) tmpInput).getSizeInBytes();
    /* 042 */       // grow the global buffer before writing data.
    /* 043 */       mutableStateArray1[0].grow(sizeInBytes);
    /* 044 */       ((UnsafeRow) tmpInput).writeToMemory(mutableStateArray1[0].buffer, mutableStateArray1[0].cursor);
    /* 045 */       mutableStateArray1[0].cursor += sizeInBytes;
    /* 046 */
    /* 047 */     } else {
    /* 048 */       mutableStateArray2[1].reset();
    /* 049 */
    /* 050 */
    /* 051 */       mutableStateArray2[1].write(0, tmpInput.getUTF8String(0));
    /* 052 */
    /* 053 */
    /* 054 */       mutableStateArray2[1].write(1, tmpInput.getUTF8String(1));
    /* 055 */
    /* 056 */
    /* 057 */       mutableStateArray2[1].write(2, tmpInput.getUTF8String(2));
    /* 058 */     }
    /* 059 */
    /* 060 */     mutableStateArray2[0].setOffsetAndSize(0, tmpCursor, mutableStateArray1[0].cursor - tmpCursor);
    /* 061 */     mutableStateArray[0].setTotalSize(mutableStateArray1[0].totalSize());
    /* 062 */     return mutableStateArray[0];
    /* 063 */   }
    /* 064 */
    /* 065 */
    /* 066 */ }
    ```
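    
    The difference is confined to the `else` branch: the three `isNullAt`/`setNullAt` branches are gone, and each field is written unconditionally. Conceptually, the writer-generation logic now branches on the field's declared nullability, along these lines (an illustrative sketch of the idea, not the actual patch):
    ```
    // Hypothetical helper showing the idea: emit the null check only
    // when the schema says the field can actually be null.
    def writeStringFieldCode(index: Int, nullable: Boolean, writer: String, input: String): String = {
      val writeCall = s"$writer.write($index, $input.getUTF8String($index));"
      if (nullable) {
        s"""if ($input.isNullAt($index)) {
           |  $writer.setNullAt($index);
           |} else {
           |  $writeCall
           |}""".stripMargin
      } else {
        // nullable = false: the check is provably redundant, so emit only the write.
        writeCall
      }
    }
    ```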
    
    ## How was this patch tested?
    
    Added new test cases to `GenerateUnsafeProjectionSuite`.
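    
    For reference, the shape of such a test (illustrative only; names may differ from the actual suite) is to project a fully non-null input through the `nullable = false` schema and verify that the copied values survive, reusing `projection3` and the `nonNullStruct` sketch above:
    ```
    // Project a fully non-null input and verify the copied values round-trip.
    val unsafeRow = projection3.apply(InternalRow(nonNullStruct))
    val struct = unsafeRow.getStruct(0, 3)
    assert(!unsafeRow.isNullAt(0))
    assert(struct.getUTF8String(0) == UTF8String.fromString("a"))
    ```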

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-23466

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20637.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20637
    
----
commit e2e9e3604284949d9d762274e2e1f55348851073
Author: Kazuaki Ishizaki <ishizaki@...>
Date:   2018-02-19T19:31:36Z

    initial commit

----

