sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r751632478
##########
File path: sql/core/benchmarks/DataSourceReadBenchmark-results.txt
##########
@@ -1,252 +1,275 @@
+================================================================================================
+SQL Single Boolean Column Scan
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+SQL Single BOOLEAN Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SQL CSV                                           13472          13878         574          1.2       856.5       1.0X
+SQL Json                                          10036          10477         623          1.6       638.0       1.3X
+SQL Parquet Vectorized                              144            167          12        109.2         9.2      93.5X
+SQL Parquet MR                                     2224           2230           7          7.1       141.4       6.1X
+SQL ORC Vectorized                                  191            203           6         82.3        12.2      70.5X
+SQL ORC MR                                         1865           1870           7          8.4       118.6       7.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Parquet Reader Single BOOLEAN Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------
+ParquetReader Vectorized                              119            125           8        131.9         7.6       1.0X
+ParquetReader Vectorized -> Row                        60             63           2        260.2         3.8       2.0X
+
+
================================================================================================
SQL Single Numeric Column Scan
================================================================================================
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                           15943          15956          18          1.0      1013.6       1.0X
-SQL Json                                           9109           9158          70          1.7       579.1       1.8X
-SQL Parquet Vectorized                              168            191          16         93.8        10.7      95.1X
-SQL Parquet MR                                     1938           1950          17          8.1       123.2       8.2X
-SQL ORC Vectorized                                  191            199           6         82.2        12.2      83.3X
-SQL ORC MR                                         1523           1537          20         10.3        96.8      10.5X
-
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+SQL CSV                                           16820          16859          54          0.9      1069.4       1.0X
+SQL Json                                          11583          11586           4          1.4       736.4       1.5X
+SQL Parquet Vectorized                              164            177          11         96.0        10.4     102.7X
+SQL Parquet MR                                     2839           2857          25          5.5       180.5       5.9X
+SQL ORC Vectorized                                  150            161           7        104.8         9.5     112.1X
+SQL ORC MR                                         1915           1923          12          8.2       121.7       8.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Parquet Reader Single TINYINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                              203            206           3         77.5        12.9       1.0X
-ParquetReader Vectorized -> Row                        97            100           2        161.6         6.2       2.1X
+ParquetReader Vectorized                              211            218           5         74.6        13.4       1.0X
+ParquetReader Vectorized -> Row                       286            293           7         55.1        18.2       0.7X
Review comment:
Interesting, a similar pattern for TINYINT.
##########
File path: sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt
##########
@@ -1,252 +1,275 @@
+================================================================================================
+SQL Single Boolean Column Scan
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+SQL Single BOOLEAN Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SQL CSV                                            9748           9907         225          1.6       619.7       1.0X
+SQL Json                                           8466           8468           3          1.9       538.2       1.2X
+SQL Parquet Vectorized                              124            149          21        127.2         7.9      78.8X
+SQL Parquet MR                                     2057           2071          20          7.6       130.8       4.7X
+SQL ORC Vectorized                                  183            232          40         86.1        11.6      53.3X
+SQL ORC MR                                         1517           1546          41         10.4        96.4       6.4X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Parquet Reader Single BOOLEAN Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------
+ParquetReader Vectorized                              100            107          13        157.1         6.4       1.0X
+ParquetReader Vectorized -> Row                        52             54           3        303.1         3.3       1.9X
+
+
================================================================================================
SQL Single Numeric Column Scan
================================================================================================
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                           13405          13422          24          1.2       852.3       1.0X
-SQL Json                                          10723          10788          92          1.5       681.7       1.3X
-SQL Parquet Vectorized                              164            217          50         95.9        10.4      81.8X
-SQL Parquet MR                                     2349           2440         129          6.7       149.3       5.7X
-SQL ORC Vectorized                                  312            346          23         50.4        19.8      43.0X
-SQL ORC MR                                         1610           1659          69          9.8       102.4       8.3X
-
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+SQL CSV                                           11664          11685          30          1.3       741.6       1.0X
+SQL Json                                           9144           9154          14          1.7       581.3       1.3X
+SQL Parquet Vectorized                              136            152          24        115.7         8.6      85.8X
+SQL Parquet MR                                     2157           2172          22          7.3       137.1       5.4X
+SQL ORC Vectorized                                  212            251          30         74.0        13.5      54.9X
+SQL ORC MR                                         1626           1628           3          9.7       103.4       7.2X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Parquet Reader Single TINYINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                              187            209          20         84.3        11.9       1.0X
-ParquetReader Vectorized -> Row                        89             95           5        177.6         5.6       2.1X
+ParquetReader Vectorized                              183            192          10         85.8        11.7       1.0X
+ParquetReader Vectorized -> Row                        93             97           9        169.9         5.9       2.0X
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                           14214          14549         474          1.1       903.7       1.0X
-SQL Json                                          11866          11934          95          1.3       754.4       1.2X
-SQL Parquet Vectorized                              294            342          53         53.6        18.7      48.4X
-SQL Parquet MR                                     2929           3004         107          5.4       186.2       4.9X
-SQL ORC Vectorized                                  312            328          15         50.4        19.8      45.5X
-SQL ORC MR                                         2037           2097          84          7.7       129.5       7.0X
-
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+SQL CSV                                           12278          12303          35          1.3       780.6       1.0X
+SQL Json                                           9534           9546          16          1.6       606.2       1.3X
+SQL Parquet Vectorized                              167            205          32         93.9        10.6      73.3X
+SQL Parquet MR                                     2543           2564          30          6.2       161.7       4.8X
+SQL ORC Vectorized                                  217            265          32         72.6        13.8      56.7X
+SQL ORC MR                                         1832           1861          41          8.6       116.4       6.7X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Parquet Reader Single SMALLINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                              249            266          18         63.1        15.8       1.0X
-ParquetReader Vectorized -> Row                       192            247          36         82.1        12.2       1.3X
+ParquetReader Vectorized                              230            238           9         68.3        14.7       1.0X
+ParquetReader Vectorized -> Row                       238            276          16         66.1        15.1       1.0X
Review comment:
It looks like there is a regression when it comes to SMALLINT, huh?
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -53,20 +53,45 @@ public void skip() {
throw new UnsupportedOperationException();
}
+ private void updateCurrentByte() {
+ try {
+ currentByte = in.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read a byte", e);
+ }
+ }
+
@Override
public final void readBooleans(int total, WritableColumnVector c, int rowId) {
- // TODO: properly vectorize this
- for (int i = 0; i < total; i++) {
- c.putBoolean(rowId + i, readBoolean());
+ int i = 0;
+ if (bitOffset > 0) {
+ i = Math.min(8 - bitOffset, total);
+ c.putBooleans(rowId, i, currentByte, bitOffset);
+ bitOffset = (bitOffset + i) & 7;
+ }
+ for (; i + 7 < total; i += 8) {
+ updateCurrentByte();
+ c.putBooleans(rowId + i, currentByte);
+ }
+ if (i < total) {
+ updateCurrentByte();
+ bitOffset = total - i;
+ c.putBooleans(rowId + i, bitOffset, currentByte, 0);
}
}
@Override
public final void skipBooleans(int total) {
- // TODO: properly vectorize this
- for (int i = 0; i < total; i++) {
- readBoolean();
+ int totalByte = total / 8;
Review comment:
nit: `totalBytes`?
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
##########
@@ -119,31 +119,36 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))
+ val query = dataType match {
Review comment:
Why is this change required?
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -53,20 +53,45 @@ public void skip() {
throw new UnsupportedOperationException();
}
+ private void updateCurrentByte() {
+ try {
+ currentByte = in.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read a byte", e);
+ }
+ }
+
@Override
public final void readBooleans(int total, WritableColumnVector c, int rowId) {
- // TODO: properly vectorize this
- for (int i = 0; i < total; i++) {
- c.putBoolean(rowId + i, readBoolean());
+ int i = 0;
+ if (bitOffset > 0) {
+ i = Math.min(8 - bitOffset, total);
+ c.putBooleans(rowId, i, currentByte, bitOffset);
+ bitOffset = (bitOffset + i) & 7;
+ }
+ for (; i + 7 < total; i += 8) {
+ updateCurrentByte();
+ c.putBooleans(rowId + i, currentByte);
+ }
+ if (i < total) {
+ updateCurrentByte();
+ bitOffset = total - i;
+ c.putBooleans(rowId + i, bitOffset, currentByte, 0);
}
}
@Override
public final void skipBooleans(int total) {
- // TODO: properly vectorize this
- for (int i = 0; i < total; i++) {
- readBoolean();
+ int totalByte = total / 8;
+ if (totalByte > 0) {
+ try {
+ in.skipFully(totalByte - 1L);
+ currentByte = in.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to skip bytes", e);
+ }
}
+ bitOffset = (bitOffset + total % 8) & 7;
Review comment:
I am not sure this is correct: I thought `bitOffset` should never exceed 8, i.e. the sum should not require a modulo.
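To make the concern concrete: since bytes crossed = `(bitOffset + total) / 8` = `total / 8 + (bitOffset + total % 8) / 8`, whenever the masked sum wraps past 8, one more byte boundary is crossed than `total / 8` counts. A standalone sketch of the arithmetic (hypothetical values, not code from the PR):

```java
public class BitOffsetDemo {
  public static void main(String[] args) {
    int bitOffset = 7;  // bits already consumed from the current byte
    int total = 9;      // values to skip
    int wrapped = (bitOffset + total % 8) & 7;   // (7 + 1) & 7 == 0
    int bytesCrossed = (bitOffset + total) / 8;  // 2
    int bytesCounted = total / 8;                // 1
    System.out.println("wrapped offset = " + wrapped);
    System.out.println("crossed " + bytesCrossed + " byte(s), counted " + bytesCounted);
    // When the sum wraps (bitOffset + total % 8 >= 8), one more byte
    // boundary is crossed than total / 8 accounts for.
  }
}
```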
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -53,20 +53,45 @@ public void skip() {
throw new UnsupportedOperationException();
}
+ private void updateCurrentByte() {
+ try {
+ currentByte = in.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read a byte", e);
+ }
+ }
+
@Override
public final void readBooleans(int total, WritableColumnVector c, int rowId) {
- // TODO: properly vectorize this
- for (int i = 0; i < total; i++) {
- c.putBoolean(rowId + i, readBoolean());
+ int i = 0;
+ if (bitOffset > 0) {
+ i = Math.min(8 - bitOffset, total);
+ c.putBooleans(rowId, i, currentByte, bitOffset);
+ bitOffset = (bitOffset + i) & 7;
+ }
+ for (; i + 7 < total; i += 8) {
+ updateCurrentByte();
+ c.putBooleans(rowId + i, currentByte);
+ }
+ if (i < total) {
+ updateCurrentByte();
+ bitOffset = total - i;
+ c.putBooleans(rowId + i, bitOffset, currentByte, 0);
}
}
@Override
public final void skipBooleans(int total) {
- // TODO: properly vectorize this
- for (int i = 0; i < total; i++) {
- readBoolean();
+ int totalByte = total / 8;
+ if (totalByte > 0) {
+ try {
+ in.skipFully(totalByte - 1L);
Review comment:
Why -1? If you skip 9 values, then you need to skip 1 byte and load another one to offset into, don't you?
Also, what happens when you read 7 values and then need to skip 11? How do you skip the last value in the current byte? Do you need to adjust for the current bit offset before calculating `totalBytes` and calling `skipFully`?
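One way to fold the current offset into the accounting before computing whole bytes (a hedged sketch of the suggestion, not a drop-in fix; `plan` is a hypothetical helper):

```java
public class SkipAccountingDemo {
  static void plan(int bitOffset, int total) {
    int bits = bitOffset + total;  // bit position relative to the current byte's start
    int wholeBytes = bits / 8;     // whole byte boundaries crossed
    int newOffset = bits % 8;      // offset within the byte the reader lands in
    System.out.println("bitOffset=" + bitOffset + ", total=" + total
        + " -> cross " + wholeBytes + " byte(s), land at offset " + newOffset);
  }

  public static void main(String[] args) {
    plan(7, 11);  // read 7 values, then skip 11: crosses 2 bytes, lands at offset 2
    plan(0, 9);   // skip from a byte boundary: crosses 1 byte, lands at offset 1
  }
}
```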
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
##########
@@ -179,6 +181,18 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
*/
protected abstract void reserveInternal(int capacity);
+ /**
+ * Each byte of the returned value (long) has one bit from `bits`.
Review comment:
Please expand the Javadoc. Although it explains how the method works, it is unclear what this method is for, i.e. why we need it.
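For reference, a naive loop that spells out the contract the comment hints at (an illustration only, not the PR's vectorized implementation): byte `i` of the returned long holds bit `i` of `bits` as 0 or 1, presumably so that eight booleans can be written into the vector with a single 8-byte store.

```java
public class SpreadBitsDemo {
  // Byte i of the result holds bit i of `bits`, as 0 or 1.
  static long spreadBits(byte bits) {
    long result = 0L;
    for (int i = 0; i < 8; i++) {
      result |= ((bits >> i) & 1L) << (8 * i);
    }
    return result;
  }

  public static void main(String[] args) {
    // 0b101 -> bytes 0 and 2 are set: prints 10001
    System.out.println(Long.toHexString(spreadBits((byte) 0b101)));
  }
}
```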
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
##########
@@ -46,6 +47,7 @@
* WritableColumnVector are intended to be reused.
*/
public abstract class WritableColumnVector extends ColumnVector {
+ private byte[] byte8 = new byte[8];
Review comment:
Do you mean making it static so it is initialised once? IMHO, it is only
created once per WritableColumnVector.
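For context, the pattern in question (a minimal illustration with hypothetical names, assuming single-threaded use): the field initializer runs once per vector instance, so the 8-byte buffer is reused across calls rather than allocated on each call. Making it static would share it across all instances, which is only safe if vectors are never written concurrently.

```java
public class ScratchBufferDemo {
  private final byte[] byte8 = new byte[8];  // allocated once per instance, reused per call

  byte[] unpack(byte bits) {
    for (int i = 0; i < 8; i++) {
      byte8[i] = (byte) ((bits >> i) & 1);  // one 0/1 byte per input bit
    }
    return byte8;  // contents are only valid until the next call
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(new ScratchBufferDemo().unpack((byte) 0b11)));
  }
}
```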