sadikovi commented on a change in pull request #34611:
URL: https://github.com/apache/spark/pull/34611#discussion_r751632478



##########
File path: sql/core/benchmarks/DataSourceReadBenchmark-results.txt
##########
@@ -1,252 +1,275 @@
+================================================================================================
+SQL Single Boolean Column Scan
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+SQL Single BOOLEAN Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SQL CSV                                           13472          13878         574          1.2         856.5       1.0X
+SQL Json                                          10036          10477         623          1.6         638.0       1.3X
+SQL Parquet Vectorized                              144            167          12        109.2           9.2      93.5X
+SQL Parquet MR                                     2224           2230           7          7.1         141.4       6.1X
+SQL ORC Vectorized                                  191            203           6         82.3          12.2      70.5X
+SQL ORC MR                                         1865           1870           7          8.4         118.6       7.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Parquet Reader Single BOOLEAN Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------
+ParquetReader Vectorized                             119            125           8        131.9           7.6       1.0X
+ParquetReader Vectorized -> Row                       60             63           2        260.2           3.8       2.0X
+
+
 ================================================================================================
 SQL Single Numeric Column Scan
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                           15943          15956          18          1.0        1013.6       1.0X
-SQL Json                                           9109           9158          70          1.7         579.1       1.8X
-SQL Parquet Vectorized                              168            191          16         93.8          10.7      95.1X
-SQL Parquet MR                                     1938           1950          17          8.1         123.2       8.2X
-SQL ORC Vectorized                                  191            199           6         82.2          12.2      83.3X
-SQL ORC MR                                         1523           1537          20         10.3          96.8      10.5X
-
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+SQL CSV                                           16820          16859          54          0.9        1069.4       1.0X
+SQL Json                                          11583          11586           4          1.4         736.4       1.5X
+SQL Parquet Vectorized                              164            177          11         96.0          10.4     102.7X
+SQL Parquet MR                                     2839           2857          25          5.5         180.5       5.9X
+SQL ORC Vectorized                                  150            161           7        104.8           9.5     112.1X
+SQL ORC MR                                         1915           1923          12          8.2         121.7       8.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 Parquet Reader Single TINYINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                             203            206           3         77.5          12.9       1.0X
-ParquetReader Vectorized -> Row                       97            100           2        161.6           6.2       2.1X
+ParquetReader Vectorized                             211            218           5         74.6          13.4       1.0X
+ParquetReader Vectorized -> Row                      286            293           7         55.1          18.2       0.7X

Review comment:
       Interesting, a similar pattern for TINYINT.

##########
File path: sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt
##########
@@ -1,252 +1,275 @@
+================================================================================================
+SQL Single Boolean Column Scan
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+SQL Single BOOLEAN Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SQL CSV                                            9748           9907         225          1.6         619.7       1.0X
+SQL Json                                           8466           8468           3          1.9         538.2       1.2X
+SQL Parquet Vectorized                              124            149          21        127.2           7.9      78.8X
+SQL Parquet MR                                     2057           2071          20          7.6         130.8       4.7X
+SQL ORC Vectorized                                  183            232          40         86.1          11.6      53.3X
+SQL ORC MR                                         1517           1546          41         10.4          96.4       6.4X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Parquet Reader Single BOOLEAN Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-------------------------------------------------------------------------------------------------------------------------
+ParquetReader Vectorized                             100            107          13        157.1           6.4       1.0X
+ParquetReader Vectorized -> Row                       52             54           3        303.1           3.3       1.9X
+
+
 ================================================================================================
 SQL Single Numeric Column Scan
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                           13405          13422          24          1.2         852.3       1.0X
-SQL Json                                          10723          10788          92          1.5         681.7       1.3X
-SQL Parquet Vectorized                              164            217          50         95.9          10.4      81.8X
-SQL Parquet MR                                     2349           2440         129          6.7         149.3       5.7X
-SQL ORC Vectorized                                  312            346          23         50.4          19.8      43.0X
-SQL ORC MR                                         1610           1659          69          9.8         102.4       8.3X
-
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+SQL CSV                                           11664          11685          30          1.3         741.6       1.0X
+SQL Json                                           9144           9154          14          1.7         581.3       1.3X
+SQL Parquet Vectorized                              136            152          24        115.7           8.6      85.8X
+SQL Parquet MR                                     2157           2172          22          7.3         137.1       5.4X
+SQL ORC Vectorized                                  212            251          30         74.0          13.5      54.9X
+SQL ORC MR                                         1626           1628           3          9.7         103.4       7.2X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Parquet Reader Single TINYINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                             187            209          20         84.3          11.9       1.0X
-ParquetReader Vectorized -> Row                       89             95           5        177.6           5.6       2.1X
+ParquetReader Vectorized                             183            192          10         85.8          11.7       1.0X
+ParquetReader Vectorized -> Row                       93             97           9        169.9           5.9       2.0X
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-SQL CSV                                           14214          14549         474          1.1         903.7       1.0X
-SQL Json                                          11866          11934          95          1.3         754.4       1.2X
-SQL Parquet Vectorized                              294            342          53         53.6          18.7      48.4X
-SQL Parquet MR                                     2929           3004         107          5.4         186.2       4.9X
-SQL ORC Vectorized                                  312            328          15         50.4          19.8      45.5X
-SQL ORC MR                                         2037           2097          84          7.7         129.5       7.0X
-
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+SQL CSV                                           12278          12303          35          1.3         780.6       1.0X
+SQL Json                                           9534           9546          16          1.6         606.2       1.3X
+SQL Parquet Vectorized                              167            205          32         93.9          10.6      73.3X
+SQL Parquet MR                                     2543           2564          30          6.2         161.7       4.8X
+SQL ORC Vectorized                                  217            265          32         72.6          13.8      56.7X
+SQL ORC MR                                         1832           1861          41          8.6         116.4       6.7X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.11.0-1020-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Parquet Reader Single SMALLINT Column Scan:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------
-ParquetReader Vectorized                              249            266          18         63.1          15.8       1.0X
-ParquetReader Vectorized -> Row                       192            247          36         82.1          12.2       1.3X
+ParquetReader Vectorized                              230            238           9         68.3          14.7       1.0X
+ParquetReader Vectorized -> Row                       238            276          16         66.1          15.1       1.0X

Review comment:
       It looks like there is a regression when it comes to SMALLINT, huh?

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -53,20 +53,45 @@ public void skip() {
     throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+    try {
+      currentByte = in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      c.putBoolean(rowId + i, readBoolean());
+    int i = 0;
+    if (bitOffset > 0) {
+      i = Math.min(8 - bitOffset, total);
+      c.putBooleans(rowId, i, currentByte, bitOffset);
+      bitOffset = (bitOffset + i) & 7;
+    }
+    for (; i + 7 < total; i += 8) {
+      updateCurrentByte();
+      c.putBooleans(rowId + i, currentByte);
+    }
+    if (i < total) {
+      updateCurrentByte();
+      bitOffset = total - i;
+      c.putBooleans(rowId + i, bitOffset, currentByte, 0);
     }
   }
 
   @Override
   public final void skipBooleans(int total) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      readBoolean();
+    int totalByte = total / 8;

Review comment:
       nit: `totalBytes`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
##########
@@ -119,31 +119,36 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
 
         prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))
 
+        val query = dataType match {

Review comment:
       Why is this change required?

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -53,20 +53,45 @@ public void skip() {
     throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+    try {
+      currentByte = in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      c.putBoolean(rowId + i, readBoolean());
+    int i = 0;
+    if (bitOffset > 0) {
+      i = Math.min(8 - bitOffset, total);
+      c.putBooleans(rowId, i, currentByte, bitOffset);
+      bitOffset = (bitOffset + i) & 7;
+    }
+    for (; i + 7 < total; i += 8) {
+      updateCurrentByte();
+      c.putBooleans(rowId + i, currentByte);
+    }
+    if (i < total) {
+      updateCurrentByte();
+      bitOffset = total - i;
+      c.putBooleans(rowId + i, bitOffset, currentByte, 0);
     }
   }
 
   @Override
   public final void skipBooleans(int total) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      readBoolean();
+    int totalByte = total / 8;
+    if (totalByte > 0) {
+      try {
+        in.skipFully(totalByte - 1L);
+        currentByte = in.read();
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to skip bytes", e);
+      }
     }
+    bitOffset = (bitOffset + total % 8) & 7;

Review comment:
       I am not sure this is correct. I thought `bitOffset` should never exceed 8, i.e. the sum should not require a modulo.
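
The arithmetic behind this question can be checked in isolation. The sketch below is not code from the PR: `BitOffsetArithmetic` and `nextBitOffset` are hypothetical names that only mirror the expression `(bitOffset + total % 8) & 7`, under the assumption that `bitOffset` always stays in the range 0 to 7. With that assumption the unmasked sum can be larger than 7, so the mask changes the result whenever a skip crosses a byte boundary. Whether the byte accounting around it is then right is a separate question.

```java
// Hypothetical helper, not part of VectorizedPlainValuesReader.
final class BitOffsetArithmetic {
  // Mirrors the expression under review: (bitOffset + total % 8) & 7.
  static int nextBitOffset(int bitOffset, int total) {
    return (bitOffset + total % 8) & 7;
  }

  // Largest value of bitOffset + total % 8 when both terms are in 0..7.
  static int maxUnmaskedSum() {
    int max = 0;
    for (int bitOffset = 0; bitOffset < 8; bitOffset++) {
      for (int rem = 0; rem < 8; rem++) {
        max = Math.max(max, bitOffset + rem);
      }
    }
    return max;
  }

  public static void main(String[] args) {
    System.out.println("max unmasked sum: " + maxUnmaskedSum());
    System.out.println("bitOffset=5, total=6 -> " + nextBitOffset(5, 6));
  }
}
```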

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##########
@@ -53,20 +53,45 @@ public void skip() {
     throw new UnsupportedOperationException();
   }
 
+  private void updateCurrentByte() {
+    try {
+      currentByte = in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
+  }
+
   @Override
   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      c.putBoolean(rowId + i, readBoolean());
+    int i = 0;
+    if (bitOffset > 0) {
+      i = Math.min(8 - bitOffset, total);
+      c.putBooleans(rowId, i, currentByte, bitOffset);
+      bitOffset = (bitOffset + i) & 7;
+    }
+    for (; i + 7 < total; i += 8) {
+      updateCurrentByte();
+      c.putBooleans(rowId + i, currentByte);
+    }
+    if (i < total) {
+      updateCurrentByte();
+      bitOffset = total - i;
+      c.putBooleans(rowId + i, bitOffset, currentByte, 0);
     }
   }
 
   @Override
   public final void skipBooleans(int total) {
-    // TODO: properly vectorize this
-    for (int i = 0; i < total; i++) {
-      readBoolean();
+    int totalByte = total / 8;
+    if (totalByte > 0) {
+      try {
+        in.skipFully(totalByte - 1L);

Review comment:
       Why -1? If you skip 9 values, you need to skip 1 byte and load another one for the offset, don't you?
   
   Also, what happens when you read 7 values and then need to skip 11? How do you skip the last value in the current byte? Do you need to adjust for the current bit offset before calculating `totalBytes` and calling `skipFully`?
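
One possible shape for an offset-aware skip, as a minimal sketch rather than the PR's code. `BooleanSkipSketch` is a hypothetical, simplified model: it reads from a `byte[]` instead of the reader's input stream, and it assumes bits are consumed least significant first with `bitOffset == 0` meaning "load a new byte on the next read". The skip first uses up the bits pending in the current byte, then skips whole bytes, then loads one more byte only if the skip ends inside it.

```java
// Simplified model of a bit-packed boolean reader; all names are illustrative.
final class BooleanSkipSketch {
  private final byte[] data;   // bit-packed booleans, least significant bit first (assumed)
  private int pos = 0;         // index of the next unread byte
  private int currentByte = 0; // last byte loaded
  private int bitOffset = 0;   // next bit in currentByte; 0 means "load a new byte"

  BooleanSkipSketch(byte[] data) {
    this.data = data;
  }

  boolean readBoolean() {
    if (bitOffset == 0) {
      currentByte = data[pos++] & 0xFF;
    }
    boolean v = (currentByte & (1 << bitOffset)) != 0;
    bitOffset = (bitOffset + 1) & 7;
    return v;
  }

  void skipBooleans(int total) {
    // 1. Use up the bits still pending in the current byte.
    int fromCurrent = 0;
    if (bitOffset > 0) {
      fromCurrent = Math.min(8 - bitOffset, total);
      bitOffset = (bitOffset + fromCurrent) & 7;
    }
    int remaining = total - fromCurrent;
    // 2. Skip whole bytes without loading them.
    pos += remaining / 8;
    // 3. Load one more byte only if the skip ends inside it.
    int tail = remaining % 8;
    if (tail > 0) {
      currentByte = data[pos++] & 0xFF;
      bitOffset = tail;
    }
  }

  public static void main(String[] args) {
    // The case from the review: read 7 values, skip 11, then read value 18.
    BooleanSkipSketch r = new BooleanSkipSketch(new byte[] {(byte) 0xFF, 0x00, 0x04});
    for (int i = 0; i < 7; i++) {
      r.readBoolean();
    }
    r.skipBooleans(11);
    System.out.println(r.readBoolean());
  }
}
```

In this model, skipping 9 values from a byte boundary skips one byte and loads the next, and reading 7 values followed by skipping 11 leaves the reader at bit 2 of the third byte.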

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
##########
@@ -179,6 +181,18 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
    */
   protected abstract void reserveInternal(int capacity);
 
+  /**
+   * Each byte of the returned value (long) has one bit from `bits`.

Review comment:
       Please expand the Javadoc. Although you explained how the method works, it is unclear what this method is for, i.e. why we need it.
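
To make the quoted Javadoc sentence concrete, here is a loop-based sketch of one reading of it. It is not the PR's implementation: `BitPerByteSketch` and `spreadBits` are hypothetical names, and the bit order (bit `i` of the input lands in the lowest bit of byte `i` of the result) is an assumption. A plausible purpose the Javadoc could state is that a vector storing one byte per boolean could then take eight values in a single 8-byte write, but that is an inference the diff shown here does not confirm.

```java
// Hypothetical illustration of "each byte of the returned long has one bit from bits".
final class BitPerByteSketch {
  // Spreads the low 8 bits of `bits` so that bit i becomes the lowest bit of byte i.
  static long spreadBits(int bits) {
    long out = 0L;
    for (int i = 0; i < 8; i++) {
      long bit = (bits >>> i) & 1L;
      out |= bit << (8 * i);
    }
    return out;
  }

  public static void main(String[] args) {
    // Bits 0 and 2 set: bytes 0 and 2 of the result hold the value 1.
    System.out.println(Long.toHexString(spreadBits(0b00000101)));
  }
}
```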

##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
##########
@@ -46,6 +47,7 @@
  * WritableColumnVector are intended to be reused.
  */
 public abstract class WritableColumnVector extends ColumnVector {
+  private byte[] byte8 = new byte[8];

Review comment:
       Do you mean making it static so it is initialised once? IMHO, it is only created once per `WritableColumnVector`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


