GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/17378
[SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding sqlContext.read.parquet()
## What changes were proposed in this pull request?
This PR improves the performance of operations that use `sqlContext.read.parquet()`
by changing the Java code generated by Catalyst. It is inspired by [the blog
article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html)
and [this stackoverflow
entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).
This PR changes the generated code in the following two ways.
1. Replace a while-loop that uses long instance variables with a for-loop that
uses int local variables
2. Suppress generation of the `shouldStop()` check when it is unnecessary
(e.g. when `append()` is not generated)
These changes facilitate loop optimizations in the JIT compiler by feeding it
simplified Java code. The performance of `sqlContext.read.parquet().count` is
improved by 1.09x.
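To make the two changes concrete before the full code dumps below, here is a minimal hand-written sketch of the per-batch loop shape. It is not the actual Catalyst output; `consume()` and `shouldStop()` are placeholders standing in for the generated per-row aggregation body and `BufferedRowIterator.shouldStop()`, and the names simply mirror the generated code shown later.
```java
// Hand-written sketch of the loop rewrite, not actual Catalyst output.
class LoopShapeSketch {
  private int scan_batchIdx;            // batch cursor kept in an instance variable

  private void consume(int rowIdx) {    // placeholder for the per-row aggregate code
  }

  private boolean shouldStop() {        // placeholder for the early-exit check
    return false;
  }

  // Before this PR: a while-loop driven by the instance variable, with a
  // shouldStop() branch (and possible early return) on every iteration.
  void processBatchBefore(int numRows) {
    while (scan_batchIdx < numRows) {
      int rowIdx = scan_batchIdx++;
      consume(rowIdx);
      if (shouldStop()) return;
    }
  }

  // With this PR: a for-loop over an int local variable whose trip count is
  // known before the loop starts, with no early-exit branch in the hot loop.
  void processBatchAfter(int numRows) {
    int localEnd = numRows - scan_batchIdx;
    for (int localIdx = 0; localIdx < localEnd; localIdx++) {
      int rowIdx = scan_batchIdx + localIdx;
      consume(rowIdx);
    }
    scan_batchIdx = numRows;            // publish the cursor once, after the loop
  }
}
```
Because the rewritten loop has an int trip count that is visible before the loop starts and no conditional return in its body, the JIT compiler can more easily apply loop unrolling and related optimizations.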
Benchmark program:
```scala
// Benchmark is Spark's internal micro-benchmark utility
// (org.apache.spark.util.Benchmark at the time of this PR)
import scala.concurrent.duration._
import org.apache.spark.util.Benchmark

val dir = "/dev/shm/parquet"
val N = 1000 * 1000 * 40
val iters = 20
val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds)
sparkSession.range(N).write.mode("overwrite").parquet(dir)
benchmark.addCase("count") { i: Int =>
  var n = 0
  var len = 0L
  while (n < iters) {
    len += sparkSession.read.parquet(dir).count
    n += 1
  }
}
benchmark.run
```
Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz

Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                    1152 / 1211        694.7           1.4       1.0X
```
Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz

Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                   1053 / 1121        760.0           1.3       1.0X
```
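For reference, the 1.09x speedup quoted above follows from the best times in the two tables: 1152 ms / 1053 ms ≈ 1.09.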
Here is a comparison of the generated code without and with this PR. Only the
method `agg_doAggregateWithoutKey` changes.
Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       while (scan_batchIdx < numRows) {
/* 057 */         int scan_rowIdx = scan_batchIdx++;
/* 058 */         // do aggregate
/* 059 */         // common sub-expressions
/* 060 */
/* 061 */         // evaluate aggregate function
/* 062 */         boolean agg_isNull1 = false;
/* 063 */
/* 064 */         long agg_value1 = -1L;
/* 065 */         agg_value1 = agg_bufValue + 1L;
/* 066 */         // update aggregation buffer
/* 067 */         agg_bufIsNull = false;
/* 068 */         agg_bufValue = agg_value1;
/* 069 */         if (shouldStop()) return;
/* 070 */       }
/* 071 */       scan_batch = null;
/* 072 */       scan_nextBatch();
/* 073 */     }
/* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 075 */     scan_scanTime1 = 0;
/* 076 */
/* 077 */   }
/* 078 */
/* 079 */   private void scan_nextBatch() throws java.io.IOException {
/* 080 */     long getBatchStart = System.nanoTime();
/* 081 */     if (scan_input.hasNext()) {
/* 082 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 083 */       scan_numOutputRows.add(scan_batch.numRows());
/* 084 */       scan_batchIdx = 0;
/* 085 */
/* 086 */     }
/* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 088 */   }
/* 089 */
/* 090 */   protected void processNext() throws java.io.IOException {
/* 091 */     while (!agg_initAgg) {
/* 092 */       agg_initAgg = true;
/* 093 */       long agg_beforeAgg = System.nanoTime();
/* 094 */       agg_doAggregateWithoutKey();
/* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 096 */
/* 097 */       // output the result
/* 098 */
/* 099 */       agg_numOutputRows.add(1);
/* 100 */       agg_rowWriter.zeroOutNullBytes();
/* 101 */
/* 102 */       if (agg_bufIsNull) {
/* 103 */         agg_rowWriter.setNullAt(0);
/* 104 */       } else {
/* 105 */         agg_rowWriter.write(0, agg_bufValue);
/* 106 */       }
/* 107 */       append(agg_result);
/* 108 */     }
/* 109 */   }
/* 110 */ }
```
Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       int scan_localEnd = numRows - scan_batchIdx;
/* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) {
/* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
/* 059 */         // do aggregate
/* 060 */         // common sub-expressions
/* 061 */
/* 062 */         // evaluate aggregate function
/* 063 */         boolean agg_isNull1 = false;
/* 064 */
/* 065 */         long agg_value1 = -1L;
/* 066 */         agg_value1 = agg_bufValue + 1L;
/* 067 */         // update aggregation buffer
/* 068 */         agg_bufIsNull = false;
/* 069 */         agg_bufValue = agg_value1;
/* 070 */         // shouldStop check is eliminated
/* 071 */       }
/* 072 */       scan_batchIdx = numRows;
/* 073 */       scan_batch = null;
/* 074 */       scan_nextBatch();
/* 075 */     }
/* 079 */   }
/* 080 */
/* 081 */   private void scan_nextBatch() throws java.io.IOException {
/* 082 */     long getBatchStart = System.nanoTime();
/* 083 */     if (scan_input.hasNext()) {
/* 084 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 085 */       scan_numOutputRows.add(scan_batch.numRows());
/* 086 */       scan_batchIdx = 0;
/* 087 */
/* 088 */     }
/* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 090 */   }
/* 091 */
/* 092 */   protected void processNext() throws java.io.IOException {
/* 093 */     while (!agg_initAgg) {
/* 094 */       agg_initAgg = true;
/* 095 */       long agg_beforeAgg = System.nanoTime();
/* 096 */       agg_doAggregateWithoutKey();
/* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 098 */
/* 099 */       // output the result
/* 100 */
/* 101 */       agg_numOutputRows.add(1);
/* 102 */       agg_rowWriter.zeroOutNullBytes();
/* 103 */
/* 104 */       if (agg_bufIsNull) {
/* 105 */         agg_rowWriter.setNullAt(0);
/* 106 */       } else {
/* 107 */         agg_rowWriter.write(0, agg_bufValue);
/* 108 */       }
/* 109 */       append(agg_result);
/* 110 */     }
/* 111 */   }
/* 112 */ }
```
## How was this patch tested?
Tested with the existing test suites.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kiszk/spark SPARK-20046
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/17378.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #17378
----
commit d74b6cf5fb63479040e940e5797e0b226367b227
Author: Kazuaki Ishizaki <[email protected]>
Date: 2017-03-21T19:23:38Z
initial commit
----