GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/11956

    [SPARK-14098][SQL][WIP] Generate Java code that gets a value in each column 
of CachedBatch when DataFrame.cache() is called

    ## What changes were proposed in this pull request?
    
    This PR generates Java code that gets a value from each column of a CachedBatch when DataFrame.cache() is called. This is done as part of whole-stage code generation.
    
    When DataFrame.cache() is called, the data is stored in column-oriented storage (a columnar cache) inside CachedBatch, and in some cases the data is compressed. This PR avoids the conversion from column-oriented storage to row-oriented storage.
    
    This PR consists of two parts.
    
    1. Pass the data in CachedBatch to the generated code. A CachedBatch consists of multiple ByteBuffer arrays, and a ByteBuffer may hold compressed data. If a buffer is compressed, it is decompressed before being passed to the generated code; otherwise it is passed as-is (see the sketch after this list).
    
    2. Generate code for both row-oriented and column-oriented storage if InMemoryColumnarTableScan exists in the plan tree. The choice between the two paths is made at runtime by checking whether the given iterator is a ColumnarIterator.
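    
    A minimal sketch of the idea behind part 1, using hypothetical stand-in types (CompressibleBuffer, IntBufferColumnVector) rather than the actual CachedBatch / ByteBufferColumnVector classes touched by this PR:
    
    ````scala
    import java.nio.ByteBuffer
    
    // Sketch only, not the actual Spark internals: each cached column buffer is
    // decompressed only when necessary and then handed to the generated code
    // through a ColumnVector-style wrapper.
    object ColumnarPassSketch {
      // Hypothetical: a column buffer that may hold compressed data.
      final case class CompressibleBuffer(bytes: ByteBuffer, compressed: Boolean) {
        // Placeholder for the decompress() method added in this PR; a real
        // implementation would run the column decoder here.
        def decompress(): ByteBuffer = bytes
      }
    
      // Hypothetical stand-in for ByteBufferColumnVector: lets generated code read
      // values directly from the (possibly decompressed) buffer.
      final class IntBufferColumnVector(buffer: ByteBuffer) {
        def getInt(rowIdx: Int): Int = buffer.getInt(rowIdx * 4)
      }
    
      // Stand-in for CachedBatch.column(i): decompress if needed, otherwise pass
      // the buffer through unchanged.
      def column(buffers: Array[CompressibleBuffer], i: Int): IntBufferColumnVector = {
        val b = buffers(i)
        val raw = if (b.compressed) b.decompress() else b.bytes
        new IntBufferColumnVector(raw)
      }
    }
    ````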
    
    
    The initial version has the following limitations:
    - Java code for the columnar cache is generated only if all column types are primitive.
    - metricValue may not be correct (see line 42 in the generated code below).
    - The generated code directly refers to variables in Decoder, CachedBatch, and so on (proper APIs should be defined; a possible shape is sketched below).
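    
    Purely as an illustration, and not anything that exists in Spark today, one possible shape for such an accessor API could be:
    
    ````scala
    // Hypothetical API sketch so that generated code would not reach into
    // Decoder / CachedBatch fields directly. All names here are made up.
    trait ReadOnlyIntColumn {
      def getInt(rowIdx: Int): Int
    }
    
    trait ColumnarBatchAccess {
      // Number of rows in the cached batch.
      def numRows(): Int
      // The i-th cached column, decompressed on demand if necessary.
      def column(i: Int): ReadOnlyIntColumn
    }
    ````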
    
    
    Motivating example
    
    ````
        val df = (0 to 10).map(i => ((i + 10).toInt, i.toInt)).toDF("k", "v")
        df.cache().filter("v <= 8").show()
    ````
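    
    One of the commits below adds a spark.sql.inMemoryColumnarStorage.codegen property to control this code generation. A sketch of enabling it before running the example above, assuming that "true" turns the columnar code path on:
    
    ````scala
    // Assumption: "true" enables the columnar code-generation path added by this PR.
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.codegen", "true")
    
    val df = (0 to 10).map(i => ((i + 10).toInt, i.toInt)).toDF("k", "v")
    df.cache().filter("v <= 8").show()
    ````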
    
    Generated code
    ````java
    /* 005 */ /** Codegened pipeline for:
    /* 006 */ * Filter (v#6 <= 8)
    /* 007 */ +- INPUT
    /* 008 */ */
    /* 009 */ class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 010 */   private Object[] references;
    /* 011 */   private scala.collection.Iterator inputadapter_input;
    /* 012 */   private org.apache.spark.sql.execution.metric.LongSQLMetric 
filter_numOutputRows;
    /* 013 */   private 
org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue;
    /* 014 */   private UnsafeRow filter_result;
    /* 015 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
    /* 016 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
filter_rowWriter;
    /* 017 */   private scala.collection.Iterator inputadapter_input1;
    /* 018 */   private org.apache.spark.sql.execution.vectorized.ColumnVector 
inputadapter_col0;
    /* 019 */   private org.apache.spark.sql.execution.vectorized.ColumnVector 
inputadapter_col1;
    /* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetric 
filter_numOutputRows1;
    /* 021 */   private 
org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue1;
    /* 022 */   private UnsafeRow filter_result1;
    /* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder1;
    /* 024 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
filter_rowWriter1;
    /* 025 */   private Object columnar_batch;
    /* 026 */   private int columnar_batchIdx;
    /* 027 */
    /* 028 */   public GeneratedIterator(Object[] references) {
    /* 029 */     this.references = references;
    /* 030 */   }
    /* 031 */
    /* 032 */   public void init(scala.collection.Iterator inputs[]) {
    /* 033 */     inputadapter_input = inputs[0];
    /* 034 */     this.filter_numOutputRows = 
(org.apache.spark.sql.execution.metric.LongSQLMetric) references[0];
    /* 035 */     filter_metricValue = 
(org.apache.spark.sql.execution.metric.LongSQLMetricValue) 
filter_numOutputRows.localValue();
    /* 036 */     filter_result = new UnsafeRow(2);
    /* 037 */     this.filter_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 
0);
    /* 038 */     this.filter_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder,
 2);
    /* 039 */     inputadapter_input1 = inputs[0];
    /* 040 */     inputadapter_col0 = null;
    /* 041 */     inputadapter_col1 = null;
    /* 042 */     this.filter_numOutputRows1 = 
(org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
    /* 043 */     filter_metricValue1 = 
(org.apache.spark.sql.execution.metric.LongSQLMetricValue) 
filter_numOutputRows1.localValue();
    /* 044 */     filter_result1 = new UnsafeRow(2);
    /* 045 */     this.filter_holder1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result1, 
0);
    /* 046 */     this.filter_rowWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder1,
 2);
    /* 047 */     columnar_batch = null;
    /* 048 */     columnar_batchIdx = 0;
    /* 049 */   }
    /* 050 */
    /* 051 */   private void processBatch(scala.collection.Iterator itr) throws 
java.io.IOException {
    /* 052 */     while (true) {
    /* 053 */       if (columnar_batchIdx == 0) {
    /* 054 */         if (itr.hasNext()) {
    /* 055 */           columnar_batch = itr.next();
    /* 056 */         } else {
    /* 057 */           cleanup();
    /* 058 */           return;
    /* 059 */         }
    /* 060 */       }
    /* 061 */       /*** PRODUCE: Filter (v#6 <= 8) */
    /* 062 */
    /* 063 */       /*** PRODUCE: INPUT */
    /* 064 */
    /* 065 */       org.apache.spark.sql.execution.columnar.CachedBatch batch =
    /* 066 */       (org.apache.spark.sql.execution.columnar.CachedBatch) 
columnar_batch;
    /* 067 */
    /* 068 */       if (columnar_batchIdx == 0) {
    /* 069 */         inputadapter_col0 = 
batch.column(((org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).getColumnIndexes()[0]);
    /* 070 */         inputadapter_col1 = 
batch.column(((org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).getColumnIndexes()[1]);
    /* 071 */       }
    /* 072 */
    /* 073 */       int inputadapter_numRows = batch.numRows();
    /* 074 */       while (!shouldStop() && (columnar_batchIdx < 
inputadapter_numRows)) {
    /* 075 */         int inputadapter_rowIdx = columnar_batchIdx++;
    /* 076 */         /*** CONSUME: Filter (v#6 <= 8) */
    /* 077 */         /* columnVector[inputadapter_col1, inputadapter_rowIdx, 
int] */
    /* 078 */         int filter_value8 = 
inputadapter_col1.getInt(inputadapter_rowIdx);
    /* 079 */
    /* 080 */         /* (input[1, int] <= 8) */
    /* 081 */         boolean filter_value9 = false;
    /* 082 */         filter_value9 = filter_value8 <= 8;
    /* 083 */         if (!filter_value9) continue;
    /* 084 */
    /* 085 */         filter_metricValue1.add(1);
    /* 086 */         /* columnVector[inputadapter_col0, inputadapter_rowIdx, 
int] */
    /* 087 */         int filter_value7 = 
inputadapter_col0.getInt(inputadapter_rowIdx);
    /* 088 */         filter_rowWriter1.write(0, filter_value7);
    /* 089 */
    /* 090 */         filter_rowWriter1.write(1, filter_value8);
    /* 091 */         append(filter_result1);
    /* 092 */       }
    /* 093 */
    /* 094 */       if (shouldStop()) return;
    /* 095 */       columnar_batchIdx = 0;
    /* 096 */     }
    /* 097 */   }
    /* 098 */
    /* 099 */   private void processRow() throws java.io.IOException {
    /* 100 */     /*** PRODUCE: Filter (v#6 <= 8) */
    /* 101 */
    /* 102 */     /*** PRODUCE: INPUT */
    /* 103 */
    /* 104 */     while (!shouldStop() && inputadapter_input.hasNext()) {
    /* 105 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 106 */       /*** CONSUME: Filter (v#6 <= 8) */
    /* 107 */       /* input[1, int] */
    /* 108 */       int filter_value1 = inputadapter_row.getInt(1);
    /* 109 */
    /* 110 */       /* (input[1, int] <= 8) */
    /* 111 */       boolean filter_value2 = false;
    /* 112 */       filter_value2 = filter_value1 <= 8;
    /* 113 */       if (!filter_value2) continue;
    /* 114 */
    /* 115 */       filter_metricValue.add(1);
    /* 116 */       /* input[0, int] */
    /* 117 */       int filter_value = inputadapter_row.getInt(0);
    /* 118 */       filter_rowWriter.write(0, filter_value);
    /* 119 */
    /* 120 */       filter_rowWriter.write(1, filter_value1);
    /* 121 */       append(filter_result);
    /* 122 */     }
    /* 123 */   }
    /* 124 */
    /* 125 */   private void cleanup() {
    /* 126 */     columnar_batch = null;
    /* 127 */
    /* 128 */     inputadapter_col0 = null;
    /* 129 */     inputadapter_col1 = null;
    /* 130 */
    /* 131 */   }
    /* 132 */
    /* 133 */   protected void processNext() throws java.io.IOException {
    /* 134 */     org.apache.spark.sql.execution.columnar.ColumnarIterator 
columnItr = null;
    /* 135 */     if (columnar_batch != null) {
    /* 136 */       columnItr = 
(org.apache.spark.sql.execution.columnar.ColumnarIterator)
    /* 137 */       inputadapter_input1;
    /* 138 */       processBatch(columnItr.getInput());
    /* 139 */     } else if (inputadapter_input1 instanceof
    /* 140 */       org.apache.spark.sql.execution.columnar.ColumnarIterator &&
    /* 141 */       ((columnItr = 
(org.apache.spark.sql.execution.columnar.ColumnarIterator)
    /* 142 */           inputadapter_input1).isSupportColumnarCodeGen())) {
    /* 143 */       columnar_batchIdx = 0;
    /* 144 */       processBatch(columnItr.getInput());
    /* 145 */     } else {
    /* 146 */       processRow();
    /* 147 */     }
    /* 148 */   }
    /* 149 */ }
    ````
    
    ## How was this patch tested?
    
    Ran the existing test suites.
    Test suites should still be added for operations on a DataFrame produced by df.cache(); a sketch of such a test follows.
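    
    A minimal sketch of such a test, assuming the usual Spark SQL test helpers (QueryTest, SharedSQLContext, checkAnswer); the suite name and structure are illustrative only:
    
    ````scala
    import org.apache.spark.sql.{QueryTest, Row}
    import org.apache.spark.sql.test.SharedSQLContext
    
    // Illustrative only: verify that a filter over a cached (columnar) DataFrame
    // returns the expected rows.
    class CachedColumnarCodegenSuite extends QueryTest with SharedSQLContext {
      import testImplicits._
    
      test("filter over a cached DataFrame returns the expected rows") {
        val df = (0 to 10).map(i => ((i + 10).toInt, i.toInt)).toDF("k", "v")
        // Expected result computed from the uncached data.
        val expected = (0 to 8).map(i => Row(i + 10, i))
        checkAnswer(df.cache().filter("v <= 8"), expected)
      }
    }
    ````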
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-14098

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11956.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11956
    
----
commit 52758a529cf1e3ae284ec6c6dcf3f93c8224acec
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:17:38Z

    add a property 'spark.sql.inMemoryColumnarStorage.codegen' to control code generation

commit f8c29107ca9a31fbdae78133637838dac318d5a8
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:22:45Z

    add CachedBatch.column() to prepare a columnar storage that is accessed by 
generated code

commit 4de1376e6f64651d3e5e8368ecc283d3243b4c51
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:26:20Z

    add utility methods for putting a primitive value to ByteBuffer

commit 63e2fcb8a376f23fc6a2203d21593ed3b8f4c9af
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:32:32Z

    add ByteBufferColumnVector, which wraps ByteBuffer for columnar storage, as an implementation of ColumnVector

commit a920ea5e5a946721337c6231b9406d228a387af0
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:34:59Z

    add APIs to get a CachedBatch

commit f2a72807e9e94395498ebba60515b3b19fdc00a2
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:36:10Z

    add decompress() method

commit a1699de143b6750778a0b9f200dd5194c951841d
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:36:49Z

    add decompress() method

commit 4211ffb6f36ff3a9b493eff233a53c1a33b68a50
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:37:20Z

    add ColumnVectorReference class

commit ee3ae412beea4b6c28f145bc65ac03f71e506a56
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:38:52Z

    Do not call createHashMap() until actually executed at first

commit 5ae981d3515c3dbbc17f43d1bc0bc8196578c544
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:40:45Z

    generate two versions of Java code for row-oriented and column-oriented storages if InMemoryColumnar exists in a plan tree

commit f61c685947fa0818f6c2d18e40a9025514501e27
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-03-25T08:58:29Z

    apply SPARK-14092 to generated code for columnar storage

----

