GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/11956
[SPARK-14098][SQL][WIP] Generate Java code that gets a value in each column of CachedBatch when DataFrame.cache() is called
## What changes were proposed in this pull request?
This PR generates Java code that gets a value of each column from CachedBatch
when DataFrame.cache() is called. This is done as part of whole-stage code generation.
When DataFrame.cache() is called, data is stored as column-oriented storage
(a columnar cache) in CachedBatch, and in some cases the data is compressed. This PR
avoids the conversion from column-oriented storage to row-oriented storage.
This PR consists of two parts:
1. Pass data in a CachedBatch to the generated code. A CachedBatch consists of
multiple ByteBuffer arrays, and a ByteBuffer may hold compressed data. If an array
is compressed, it is decompressed and then passed to the generated code; otherwise
it is passed as-is.
2. Generate code for both row-oriented and column-oriented storage if an
InMemoryColumnarTableScan exists in the plan tree. The choice between the two
paths is made at runtime by checking whether the given iterator is a
ColumnarIterator (see the sketch after this list).
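For the second part, the runtime choice can be sketched roughly as follows. This is a
minimal, self-contained Scala sketch; `ColumnarIterator`, `processBatch`, and
`processRow` are stand-ins modeled on the generated code shown below, not the actual
Spark classes.
````scala
// Minimal sketch of the runtime dispatch in part 2: use the column-oriented path
// only when the input iterator is a ColumnarIterator that supports codegen,
// otherwise fall back to the row-oriented path.
object DispatchSketch {
  // Stand-in for the iterator over cached columnar batches (not the Spark class).
  trait ColumnarIterator extends Iterator[AnyRef] {
    def isSupportColumnarCodeGen: Boolean
    def getInput: Iterator[AnyRef]
  }

  def processBatch(itr: Iterator[AnyRef]): Unit = ()  // column-oriented path (stub)
  def processRow(itr: Iterator[AnyRef]): Unit = ()    // row-oriented path (stub)

  def processNext(input: Iterator[AnyRef]): Unit = input match {
    case c: ColumnarIterator if c.isSupportColumnarCodeGen => processBatch(c.getInput)
    case rows => processRow(rows)
  }
}
````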
The initial version has the following limitations:
- Java code for the columnar cache is generated only if all column types are
primitive.
- metricValue may not be correct (see line 42 in the generated code below).
- The generated code directly refers to variables in Decoder, CachedBatch, and so on
(proper APIs should be defined).
Motivating example
````
val df = (0 to 10).map(i => ((i + 10).toInt, i.toInt)).toDF("k", "v")
df.cache().filter("v <= 8").show()
````
Generated code
````java
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Filter (v#6 <= 8)
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private scala.collection.Iterator inputadapter_input;
/* 012 */   private org.apache.spark.sql.execution.metric.LongSQLMetric filter_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue;
/* 014 */   private UnsafeRow filter_result;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 017 */   private scala.collection.Iterator inputadapter_input1;
/* 018 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inputadapter_col0;
/* 019 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inputadapter_col1;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetric filter_numOutputRows1;
/* 021 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue1;
/* 022 */   private UnsafeRow filter_result1;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder1;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter1;
/* 025 */   private Object columnar_batch;
/* 026 */   private int columnar_batchIdx;
/* 027 */
/* 028 */   public GeneratedIterator(Object[] references) {
/* 029 */     this.references = references;
/* 030 */   }
/* 031 */
/* 032 */   public void init(scala.collection.Iterator inputs[]) {
/* 033 */     inputadapter_input = inputs[0];
/* 034 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[0];
/* 035 */     filter_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) filter_numOutputRows.localValue();
/* 036 */     filter_result = new UnsafeRow(2);
/* 037 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
/* 038 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 039 */     inputadapter_input1 = inputs[0];
/* 040 */     inputadapter_col0 = null;
/* 041 */     inputadapter_col1 = null;
/* 042 */     this.filter_numOutputRows1 = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 043 */     filter_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) filter_numOutputRows1.localValue();
/* 044 */     filter_result1 = new UnsafeRow(2);
/* 045 */     this.filter_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result1, 0);
/* 046 */     this.filter_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder1, 2);
/* 047 */     columnar_batch = null;
/* 048 */     columnar_batchIdx = 0;
/* 049 */   }
/* 050 */
/* 051 */   private void processBatch(scala.collection.Iterator itr) throws java.io.IOException {
/* 052 */     while (true) {
/* 053 */       if (columnar_batchIdx == 0) {
/* 054 */         if (itr.hasNext()) {
/* 055 */           columnar_batch = itr.next();
/* 056 */         } else {
/* 057 */           cleanup();
/* 058 */           return;
/* 059 */         }
/* 060 */       }
/* 061 */       /*** PRODUCE: Filter (v#6 <= 8) */
/* 062 */
/* 063 */       /*** PRODUCE: INPUT */
/* 064 */
/* 065 */       org.apache.spark.sql.execution.columnar.CachedBatch batch =
/* 066 */         (org.apache.spark.sql.execution.columnar.CachedBatch) columnar_batch;
/* 067 */
/* 068 */       if (columnar_batchIdx == 0) {
/* 069 */         inputadapter_col0 = batch.column(((org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).getColumnIndexes()[0]);
/* 070 */         inputadapter_col1 = batch.column(((org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).getColumnIndexes()[1]);
/* 071 */       }
/* 072 */
/* 073 */       int inputadapter_numRows = batch.numRows();
/* 074 */       while (!shouldStop() && (columnar_batchIdx < inputadapter_numRows)) {
/* 075 */         int inputadapter_rowIdx = columnar_batchIdx++;
/* 076 */         /*** CONSUME: Filter (v#6 <= 8) */
/* 077 */         /* columnVector[inputadapter_col1, inputadapter_rowIdx, int] */
/* 078 */         int filter_value8 = inputadapter_col1.getInt(inputadapter_rowIdx);
/* 079 */
/* 080 */         /* (input[1, int] <= 8) */
/* 081 */         boolean filter_value9 = false;
/* 082 */         filter_value9 = filter_value8 <= 8;
/* 083 */         if (!filter_value9) continue;
/* 084 */
/* 085 */         filter_metricValue1.add(1);
/* 086 */         /* columnVector[inputadapter_col0, inputadapter_rowIdx, int] */
/* 087 */         int filter_value7 = inputadapter_col0.getInt(inputadapter_rowIdx);
/* 088 */         filter_rowWriter1.write(0, filter_value7);
/* 089 */
/* 090 */         filter_rowWriter1.write(1, filter_value8);
/* 091 */         append(filter_result1);
/* 092 */       }
/* 093 */
/* 094 */       if (shouldStop()) return;
/* 095 */       columnar_batchIdx = 0;
/* 096 */     }
/* 097 */   }
/* 098 */
/* 099 */   private void processRow() throws java.io.IOException {
/* 100 */     /*** PRODUCE: Filter (v#6 <= 8) */
/* 101 */
/* 102 */     /*** PRODUCE: INPUT */
/* 103 */
/* 104 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 105 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 106 */       /*** CONSUME: Filter (v#6 <= 8) */
/* 107 */       /* input[1, int] */
/* 108 */       int filter_value1 = inputadapter_row.getInt(1);
/* 109 */
/* 110 */       /* (input[1, int] <= 8) */
/* 111 */       boolean filter_value2 = false;
/* 112 */       filter_value2 = filter_value1 <= 8;
/* 113 */       if (!filter_value2) continue;
/* 114 */
/* 115 */       filter_metricValue.add(1);
/* 116 */       /* input[0, int] */
/* 117 */       int filter_value = inputadapter_row.getInt(0);
/* 118 */       filter_rowWriter.write(0, filter_value);
/* 119 */
/* 120 */       filter_rowWriter.write(1, filter_value1);
/* 121 */       append(filter_result);
/* 122 */     }
/* 123 */   }
/* 124 */
/* 125 */   private void cleanup() {
/* 126 */     columnar_batch = null;
/* 127 */
/* 128 */     inputadapter_col0 = null;
/* 129 */     inputadapter_col1 = null;
/* 130 */
/* 131 */   }
/* 132 */
/* 133 */   protected void processNext() throws java.io.IOException {
/* 134 */     org.apache.spark.sql.execution.columnar.ColumnarIterator columnItr = null;
/* 135 */     if (columnar_batch != null) {
/* 136 */       columnItr = (org.apache.spark.sql.execution.columnar.ColumnarIterator)
/* 137 */         inputadapter_input1;
/* 138 */       processBatch(columnItr.getInput());
/* 139 */     } else if (inputadapter_input1 instanceof
/* 140 */         org.apache.spark.sql.execution.columnar.ColumnarIterator &&
/* 141 */         ((columnItr = (org.apache.spark.sql.execution.columnar.ColumnarIterator)
/* 142 */         inputadapter_input1).isSupportColumnarCodeGen())) {
/* 143 */       columnar_batchIdx = 0;
/* 144 */       processBatch(columnItr.getInput());
/* 145 */     } else {
/* 146 */       processRow();
/* 147 */     }
/* 148 */   }
/* 149 */ }
````
## How was this patch tested?
Tested with existing test suites.
Test suites should be added for operations on a DataFrame produced by df.cache().
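For instance, such a test could check that the cached, column-oriented path returns
the same rows as the uncached, row-oriented path. This is a hypothetical sketch, not
part of this patch, assuming a SparkSession named `spark` and its implicits are in
scope (as in spark-shell).
````scala
// Hypothetical test sketch: results from the cached (columnar, code-generated)
// path should match the uncached (row-oriented) path.
import spark.implicits._

val df = (0 to 10).map(i => (i + 10, i)).toDF("k", "v")
val expected = df.filter("v <= 8").collect().toSet
val actual = df.cache().filter("v <= 8").collect().toSet
assert(actual == expected)
````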
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kiszk/spark SPARK-14098
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/11956.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #11956
----
commit 52758a529cf1e3ae284ec6c6dcf3f93c8224acec
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:17:38Z
add a property 'spark.sql.inMemoryColumnarStorage.codegen' to control code
generation
commit f8c29107ca9a31fbdae78133637838dac318d5a8
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:22:45Z
add CachedBatch.column() to prepare a columnar storage that is accessed by
generated code
commit 4de1376e6f64651d3e5e8368ecc283d3243b4c51
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:26:20Z
add utility methods for putting a primitive value to ByteBuffer
commit 63e2fcb8a376f23fc6a2203d21593ed3b8f4c9af
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:32:32Z
add ByteBufferColumnVector, which wraps ByteBuffer for columnar storage, as
an implementation of ColumnVector
commit a920ea5e5a946721337c6231b9406d228a387af0
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:34:59Z
add APIs to get a CachedBatch
commit f2a72807e9e94395498ebba60515b3b19fdc00a2
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:36:10Z
add decompress() method
commit a1699de143b6750778a0b9f200dd5194c951841d
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:36:49Z
add decompress() method
commit 4211ffb6f36ff3a9b493eff233a53c1a33b68a50
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:37:20Z
add ColumnVectorReference class
commit ee3ae412beea4b6c28f145bc65ac03f71e506a56
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:38:52Z
Do not call createHashMap() until actually executed at first
commit 5ae981d3515c3dbbc17f43d1bc0bc8196578c544
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:40:45Z
generate two versions of Java code for row-oriented and column-oriented
storage if InMemoryColumnar exists in a plan tree
commit f61c685947fa0818f6c2d18e40a9025514501e27
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-03-25T08:58:29Z
apply SPARK-14092 to generated code for columnar storage
----