GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/13171
[SPARK-15380][SQL][WIP] Generate code that stores a float/double value in
each column from ColumnarBatch when DataFrame.cache() is used
## What changes were proposed in this pull request?
This PR generates Java code to store the computed float/double value of each
column into ```ColumnarBatch``` when DataFrame.cache() is called. This is done in
whole-stage code generation.
Currently, even when data is read from the Parquet reader (where it is kept in
```ColumnarBatch```), the computed value is stored into ```UnsafeRow```, and the
data is then stored into ```CachedBatch``` for DataFrame.cache(). This leads to
data format conversions from columnar storage to row-oriented storage and back
from row-oriented storage to columnar storage. This PR avoids these conversions
by storing the computed value directly into columnar storage.
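To make the shape of the problem concrete, here is a minimal, purely illustrative sketch of the two paths (the types below are stand-ins made up for this description, not Spark's actual ```ColumnarBatch```/```UnsafeRow```/```CachedBatch``` classes):
````scala
// Conceptual sketch only: Column, Row, and CachedColumn are illustrative stand-ins,
// not Spark's ColumnVector/UnsafeRow/CachedBatch.
object ConversionSketch {
  final case class Column(values: Array[Float])        // columnar input (like a ColumnVector)
  final case class Row(f: Float)                       // row-oriented intermediate (like UnsafeRow)
  final case class CachedColumn(values: Array[Float])  // columnar cache (like CachedBatch)

  // Current path: columnar -> row -> columnar, i.e. two format conversions.
  def cacheViaRows(in: Column): CachedColumn = {
    val rows = in.values.map(v => Row(v + 1.0f))        // compute "f + 1" and materialize rows
    CachedColumn(rows.map(_.f))                         // re-assemble a column for the cache
  }

  // Path this PR generates code for: write the computed value straight into the output column.
  def cacheColumnar(in: Column): CachedColumn =
    CachedColumn(in.values.map(_ + 1.0f))
}
````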
This PR handles only float and double values that are stored in a column without
compression. Another PR will handle other primitive types, which may be stored in
a column in a compressed format; this split is to ease review by keeping the size
of the PR down.
This PR will consist of three parts.
1. Store the computed value into ```CachedBatch``` when the original value
is read from ```ColumnarStorage```.
2. Create ```CachedBatch``` for ```df.cache()``` from the
```ColumnarStorage```.
3. Decide whether part 1 is applied, based on the successor operation;
if the successor operation is not ```df.cache()```, part 1 does not happen (sketched just below).
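A minimal sketch of the decision in part 3, with made-up names (```Successor```, ```useColumnarWrite```) rather than the actual planner hooks:
````scala
// Illustrative only: models whether the columnar write path (part 1) is generated,
// depending on what consumes the plan. These are not Spark APIs.
sealed trait Successor
case object CacheOperation extends Successor   // the plan feeds df.cache()
case object OtherOperation extends Successor   // any other consumer

def useColumnarWrite(successor: Successor): Boolean = successor match {
  case CacheOperation => true    // part 1 applies: computed values go into the columnar cache
  case OtherOperation => false   // keep the existing UnsafeRow-based path
}
````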
This PR generates Java code for the columnar cache only if the types of all
columns accessed by the operations are primitive.
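As an illustration of this condition (the DataFrame and column names here are made up for the example):
````scala
// Illustrative only: which queries would qualify under the rule above.
val df = sqlContext.createDataFrame(Seq((1.0f, "a"), (2.0f, "b"))).toDF("f", "name")

df.selectExpr("f + 1").cache()          // accesses only a float column: eligible for the columnar path
df.selectExpr("name", "f + 1").cache()  // also accesses a String column: stays on the existing row path
````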
Motivating example:
````
sc.parallelize(1 to 8, 1).map(i => i.toFloat).toDF("f").write.parquet(parquetDir)
val parquetDF = sqlContext.read.parquet(parquetDir)
val df = parquetDF.selectExpr("f + 1").cache
df.show
````
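For reference, generated code like the listing below can be printed with a snippet such as the following (assuming the ```debugCodegen``` helper in ```org.apache.spark.sql.execution.debug```):
````scala
// Assumes the debugCodegen helper from org.apache.spark.sql.execution.debug;
// it prints the Java source generated by whole-stage codegen for the plan.
import org.apache.spark.sql.execution.debug._
parquetDF.selectExpr("f + 1").debugCodegen()
````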
Generated code
````java
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Project [(f#7 + 1.0) AS (f + 1)#9]
/* 007 */ +- BatchedScan parquet [f#7] Format: ParquetFormat, InputPaths: file:/C:/Users/ishizaki/AppDa...
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.vectorized.ColumnVector scan_colInstance0;
/* 018 */   private UnsafeRow scan_result;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder scan_holder;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter scan_rowWriter;
/* 021 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch project_columnarBatch;
/* 022 */   private org.apache.spark.sql.execution.vectorized.ColumnVector project_colOutInstance0;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */     this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 029 */     partitionIndex = index;
/* 030 */     scan_input = inputs[0];
/* 031 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 032 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 033 */     scan_scanTime1 = 0;
/* 034 */     scan_batch = null;
/* 035 */     scan_batchIdx = 0;
/* 036 */     scan_colInstance0 = null;
/* 037 */     scan_result = new UnsafeRow(1);
/* 038 */     this.scan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_result, 0);
/* 039 */     this.scan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_holder, 1);
/* 040 */
/* 041 */     project_allocateColumnarStorage();
/* 042 */   }
/* 043 */
/* 044 */   private void scan_nextBatch() throws java.io.IOException {
/* 045 */     long getBatchStart = System.nanoTime();
/* 046 */     if (scan_input.hasNext()) {
/* 047 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 048 */       scan_numOutputRows.add(scan_batch.numRows());
/* 049 */       scan_batchIdx = 0;
/* 050 */       scan_colInstance0 = scan_batch.column(0);
/* 051 */
/* 052 */     }
/* 053 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 054 */   }
/* 055 */
/* 056 */   void project_allocateColumnarStorage() {
/* 057 */     org.apache.spark.sql.types.StructType project_batchSchema =
/* 058 */       new org.apache.spark.sql.types.StructType(
/* 059 */         new org.apache.spark.sql.types.StructField[] {
/* 060 */           new org.apache.spark.sql.types.StructField(
/* 061 */             "col0", org.apache.spark.sql.types.DataTypes.FloatType, true, org.apache.spark.sql.types.Metadata.empty())
/* 062 */
/* 063 */         });
/* 064 */
/* 065 */     project_columnarBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 066 */       project_batchSchema, org.apache.spark.memory.MemoryMode.ON_HEAP);
/* 067 */     registerColumnarBatch(project_columnarBatch);
/* 068 */     project_colOutInstance0 = project_columnarBatch.column(0);
/* 069 */
/* 070 */   }
/* 071 */
/* 072 */   protected void processNext() throws java.io.IOException {
/* 073 */     /*** PRODUCE: Project [(f#7 + 1.0) AS (f + 1)#9] */
/* 074 */
/* 075 */     /*** PRODUCE: BatchedScan parquet [f#7] Format: ParquetFormat, InputPaths: ... */
/* 076 */
/* 077 */     if (scan_batch == null) {
/* 078 */       isColumnarBatchAccessed = true;
/* 079 */       scan_nextBatch();
/* 080 */     }
/* 081 */     int scan_rowWriteIdx = 0;
/* 082 */     while (scan_batch != null) {
/* 083 */       int numRows = scan_batch.numRows();
/* 084 */       while (scan_batchIdx < numRows) {
/* 085 */         int scan_rowIdx = scan_batchIdx++;
/* 086 */         /*** CONSUME: Project [(f#7 + 1.0) AS (f + 1)#9] */
/* 087 */
/* 088 */         /*** CONSUME: WholeStageCodegen */
/* 089 */
/* 090 */         /* (input[0, float] + 1.0) */
/* 091 */         boolean project_isNull = true;
/* 092 */         float project_value = -1.0f;
/* 093 */         /* input[0, float] */
/* 094 */         /* columnVector[scan_colInstance0, scan_rowIdx, float] */
/* 095 */         boolean scan_isNull = scan_colInstance0.isNullAt(scan_rowIdx);
/* 096 */         float scan_value = scan_isNull ? -1.0f : (scan_colInstance0.getFloat(scan_rowIdx));
/* 097 */         if (!scan_isNull) {
/* 098 */           project_isNull = false; // resultCode could change nullability.
/* 099 */           project_value = scan_value + 1.0f;
/* 100 */
/* 101 */         }
/* 102 */         if (project_isNull) {
/* 103 */           project_colOutInstance0.putNull(scan_rowWriteIdx);
/* 104 */         } else {
/* 105 */           System.out.println("rowIdx["+scan_rowWriteIdx+"]: v="+project_value);
/* 106 */           project_colOutInstance0.putFloat(scan_rowWriteIdx, project_value);
/* 107 */         }
/* 108 */         scan_rowWriteIdx++;
/* 109 */         project_columnarBatch.setNumRows(scan_rowWriteIdx);
/* 110 */         if (shouldStop()) return;
/* 111 */       }
/* 112 */       scan_batch = null;
/* 113 */       scan_nextBatch();
/* 114 */     }
/* 115 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 116 */     scan_scanTime1 = 0;
/* 117 */   }
/* 118 */ }
````
## How was this patch tested?
Not tested yet
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kiszk/spark SPARK-15380
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/13171.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #13171
----
commit b0b595011b718be3773aa7341d6610495b48e9e4
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-05-18T10:54:00Z
initial commit
----