GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/13171

    [SPARK-15380][SQL][WIP] Generate code that stores a float/double value in 
each column from ColumnarBatch when DataFrame.cache() is used

    ## What changes were proposed in this pull request?
    
    This PR generates Java code to store a computed float/double value of each 
column into ```ColumnarBatch` when DataFrame.cache() is called. This is done in 
whole stage code generation.
    
    Even when data is read from ParquetReader (data is kept in 
```ColumnarBatch```), the computed value is stored into ```UnsafeRow``` for 
now. Then, the data is stored into ```CachedBatch``` for DataFrame.cache(). 
This leads to data format conversions from columnar storage to row-oriented 
storage and from row-oriented storage to columnar storage. This PR avoid 
conversions by storing the computed value into a columnar storege.
    
    This PR handles only float and double that are stored in a column without 
compression. Another PR will handle other primitive types that may be stored in 
a column in a compressed format. This is for ease of review by reducing the 
size of PR
    
    This PR will consist of three parts.
    
    1. Store the computed value into ```CachedBatch``` when the original value 
is read from ```ColumnarStorage```.
    
    2. Create ```CachedBatch``` for ```df.cache()``` from the 
```ColumnarStorage```
    
    3. Decide whether 1. will be done or not based on the successor operation. 
If the successor operation is not ```df.cache()```, 1. will not occur.
    
    This PR generates Java code for columnar cache only if types in all 
columns, which are accessed in operations, are primitive
    
    Motivating example:
    
    ````
    sc.parallelize(1 to 8, 1).map(i => 
i.toFloat).toDF("f").write.parquet(parquetDir)
    val parquetDF = sqlContext.read.parquet(parquetDir)
    val df = parquetDF.selectExpr("f + 1").cache.show
    ````
    
    Generated code
    ````java
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ /** Codegened pipeline for:
    /* 006 */ * Project [(f#7 + 1.0) AS (f + 1)#9]
    /* 007 */ +- BatchedScan parquet [f#7] Format: ParquetFormat, InputPaths: 
file:/C:/Users/ishizaki/AppDa...
    /* 008 */ */
    /* 009 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 010 */   private Object[] references;
    /* 011 */   private scala.collection.Iterator scan_input;
    /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_numOutputRows;
    /* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_scanTime;
    /* 014 */   private long scan_scanTime1;
    /* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
scan_batch;
    /* 016 */   private int scan_batchIdx;
    /* 017 */   private org.apache.spark.sql.execution.vectorized.ColumnVector 
scan_colInstance0;
    /* 018 */   private UnsafeRow scan_result;
    /* 019 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
    /* 020 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
scan_rowWriter;
    /* 021 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
project_columnarBatch;
    /* 022 */   private org.apache.spark.sql.execution.vectorized.ColumnVector 
project_colOutInstance0;
    /* 023 */
    /* 024 */   public GeneratedIterator(Object[] references) {
    /* 025 */     this.references = references;
    /* 026 */   }
    /* 027 */
    /* 028 */   public void init(int index, scala.collection.Iterator inputs[]) 
{
    /* 029 */     partitionIndex = index;
    /* 030 */     scan_input = inputs[0];
    /* 031 */     this.scan_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 032 */     this.scan_scanTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    /* 033 */     scan_scanTime1 = 0;
    /* 034 */     scan_batch = null;
    /* 035 */     scan_batchIdx = 0;
    /* 036 */     scan_colInstance0 = null;
    /* 037 */     scan_result = new UnsafeRow(1);
    /* 038 */     this.scan_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 0);
    /* 039 */     this.scan_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 
1);
    /* 040 */
    /* 041 */     project_allocateColumnarStorage();
    /* 042 */   }
    /* 043 */
    /* 044 */   private void scan_nextBatch() throws java.io.IOException {
    /* 045 */     long getBatchStart = System.nanoTime();
    /* 046 */     if (scan_input.hasNext()) {
    /* 047 */       scan_batch = 
(org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
    /* 048 */       scan_numOutputRows.add(scan_batch.numRows());
    /* 049 */       scan_batchIdx = 0;
    /* 050 */       scan_colInstance0 = scan_batch.column(0);
    /* 051 */
    /* 052 */     }
    /* 053 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
    /* 054 */   }
    /* 055 */
    /* 056 */   void project_allocateColumnarStorage() {
    /* 057 */     org.apache.spark.sql.types.StructType project_batchSchema =
    /* 058 */     new org.apache.spark.sql.types.StructType(
    /* 059 */       new org.apache.spark.sql.types.StructField[] {
    /* 060 */         new org.apache.spark.sql.types.StructField(
    /* 061 */           "col0", org.apache.spark.sql.types.DataTypes.FloatType, 
true, org.apache.spark.sql.types.Metadata.empty())
    /* 062 */
    /* 063 */       });
    /* 064 */
    /* 065 */     project_columnarBatch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
    /* 066 */       project_batchSchema, 
org.apache.spark.memory.MemoryMode.ON_HEAP);
    /* 067 */     registerColumnarBatch(project_columnarBatch);
    /* 068 */     project_colOutInstance0 = project_columnarBatch.column(0);
    /* 069 */
    /* 070 */   }
    /* 071 */
    /* 072 */   protected void processNext() throws java.io.IOException {
    /* 073 */     /*** PRODUCE: Project [(f#7 + 1.0) AS (f + 1)#9] */
    /* 074 */
    /* 075 */     /*** PRODUCE: BatchedScan parquet [f#7] Format: 
ParquetFormat, InputPaths: ... */
    /* 076 */
    /* 077 */     if (scan_batch == null) {
    /* 078 */       isColumnarBatchAccessed = true;
    /* 079 */       scan_nextBatch();
    /* 080 */     }
    /* 081 */     int scan_rowWriteIdx = 0;
    /* 082 */     while (scan_batch != null) {
    /* 083 */       int numRows = scan_batch.numRows();
    /* 084 */       while (scan_batchIdx < numRows) {
    /* 085 */         int scan_rowIdx = scan_batchIdx++;
    /* 086 */         /*** CONSUME: Project [(f#7 + 1.0) AS (f + 1)#9] */
    /* 087 */
    /* 088 */         /*** CONSUME: WholeStageCodegen */
    /* 089 */
    /* 090 */         /* (input[0, float] + 1.0) */
    /* 091 */         boolean project_isNull = true;
    /* 092 */         float project_value = -1.0f;
    /* 093 */         /* input[0, float] */
    /* 094 */         /* columnVector[scan_colInstance0, scan_rowIdx, float] */
    /* 095 */         boolean scan_isNull = 
scan_colInstance0.isNullAt(scan_rowIdx);
    /* 096 */         float scan_value = scan_isNull ? -1.0f : 
(scan_colInstance0.getFloat(scan_rowIdx));
    /* 097 */         if (!scan_isNull) {
    /* 098 */           project_isNull = false; // resultCode could change 
nullability.
    /* 099 */           project_value = scan_value + 1.0f;
    /* 100 */
    /* 101 */         }
    /* 102 */         if (project_isNull) {
    /* 103 */           project_colOutInstance0.putNull(scan_rowWriteIdx);
    /* 104 */         } else {
    /* 105 */           System.out.println("rowIdx["+scan_rowWriteIdx+"]: 
v="+project_value);
    /* 106 */           project_colOutInstance0.putFloat(scan_rowWriteIdx, 
project_value);
    /* 107 */         }
    /* 108 */         scan_rowWriteIdx++;
    /* 109 */         project_columnarBatch.setNumRows(scan_rowWriteIdx);
    /* 110 */         if (shouldStop()) return;
    /* 111 */       }
    /* 112 */       scan_batch = null;
    /* 113 */       scan_nextBatch();
    /* 114 */     }
    /* 115 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
    /* 116 */     scan_scanTime1 = 0;
    /* 117 */   }
    /* 118 */ }
    
    ````
    
    ## How was this patch tested?
    
    Not tested yet
    
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-15380

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13171
    
----
commit b0b595011b718be3773aa7341d6610495b48e9e4
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-05-18T10:54:00Z

    initial commit

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to