dongjoon-hyun closed pull request #23503: [SPARK-26584][SQL] Remove `spark.sql.orc.copyBatchToSpark` internal conf
URL: https://github.com/apache/spark/pull/23503
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:
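For context, this removes the batch-copy path entirely: the vectorized ORC
reader now always wraps the ORC column vectors, and `OrcColumnarBatchReader`
takes only the batch capacity. A minimal sketch of the new call site (Scala),
mirroring OrcFileFormat.scala in the diff below; reading the capacity from the
active SQLConf here is illustrative, not part of the patch:

    import org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader
    import org.apache.spark.sql.internal.SQLConf

    // Batch capacity still comes from the vectorized ORC reader batch size conf
    // (4096 by default, per the SQLConf default shown in the diff).
    val capacity = SQLConf.get.orcVectorizedReaderBatchSize

    // Before this patch the constructor also took a memory-mode flag and a
    // `copyToSpark` flag; now only the capacity is needed, and the reader
    // always wraps the ORC column vectors instead of copying them into
    // Spark column vectors.
    val batchReader = new OrcColumnarBatchReader(capacity)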
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fe445e0019353..9804af7dff179 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,13 +524,6 @@ object SQLConf {
.intConf
.createWithDefault(4096)
- val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
- .doc("Whether or not to copy the ORC columnar batch to Spark columnar
batch in the " +
- "vectorized ORC reader.")
- .internal()
- .booleanConf
- .createWithDefault(false)
-
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
diff --git a/sql/core/benchmarks/DataSourceReadBenchmark-results.txt b/sql/core/benchmarks/DataSourceReadBenchmark-results.txt
index b07e8b1197ff0..f547f61654b5f 100644
--- a/sql/core/benchmarks/DataSourceReadBenchmark-results.txt
+++ b/sql/core/benchmarks/DataSourceReadBenchmark-results.txt
@@ -11,7 +11,6 @@ SQL Json 8709 / 8724 1.8 5
SQL Parquet Vectorized 166 / 187 94.8 10.5 159.0X
SQL Parquet MR 1706 / 1720 9.2 108.4 15.5X
SQL ORC Vectorized 167 / 174 94.2 10.6 157.9X
-SQL ORC Vectorized with copy 226 / 231 69.6 14.4 116.7X
SQL ORC MR 1433 / 1465 11.0 91.1 18.4X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -30,7 +29,6 @@ SQL Json 8990 / 8998 1.7 5
SQL Parquet Vectorized 209 / 221 75.1 13.3 126.5X
SQL Parquet MR 1949 / 1949 8.1 123.9 13.6X
SQL ORC Vectorized 221 / 228 71.3 14.0 120.1X
-SQL ORC Vectorized with copy 315 / 319 49.9 20.1 84.0X
SQL ORC MR 1527 / 1549 10.3 97.1 17.3X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -49,7 +47,6 @@ SQL Json 9703 / 9733 1.6 6
SQL Parquet Vectorized 176 / 182 89.2 11.2 157.0X
SQL Parquet MR 2164 / 2173 7.3 137.6 12.8X
SQL ORC Vectorized 307 / 314 51.2 19.5 90.2X
-SQL ORC Vectorized with copy 312 / 319 50.4 19.8 88.7X
SQL ORC MR 1690 / 1700 9.3 107.4 16.4X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -68,7 +65,6 @@ SQL Json 12570 / 12617 1.3 7
SQL Parquet Vectorized 270 / 308 58.2 17.2 128.9X
SQL Parquet MR 2427 / 2431 6.5 154.3 14.3X
SQL ORC Vectorized 388 / 398 40.6 24.6 89.8X
-SQL ORC Vectorized with copy 395 / 402 39.9 25.1 88.2X
SQL ORC MR 1819 / 1851 8.6 115.7 19.1X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -87,7 +83,6 @@ SQL Json 12039 / 12215 1.3 7
SQL Parquet Vectorized 170 / 177 92.4 10.8 169.0X
SQL Parquet MR 2184 / 2196 7.2 138.9 13.2X
SQL ORC Vectorized 432 / 440 36.4 27.5 66.5X
-SQL ORC Vectorized with copy 439 / 442 35.9 27.9 65.6X
SQL ORC MR 1812 / 1833 8.7 115.2 15.9X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -106,7 +101,6 @@ SQL Json 18895 / 18898 0.8 12
SQL Parquet Vectorized 267 / 276 58.9 17.0 135.6X
SQL Parquet MR 2355 / 2363 6.7 149.7 15.4X
SQL ORC Vectorized 543 / 546 29.0 34.5 66.6X
-SQL ORC Vectorized with copy 548 / 557 28.7 34.8 66.0X
SQL ORC MR 2246 / 2258 7.0 142.8 16.1X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -130,7 +124,6 @@ SQL Json 12145 / 12174 0.9 11
SQL Parquet Vectorized 2363 / 2377 4.4 225.3 8.9X
SQL Parquet MR 4555 / 4557 2.3 434.4 4.6X
SQL ORC Vectorized 2361 / 2388 4.4 225.1 9.0X
-SQL ORC Vectorized with copy 2540 / 2557 4.1 242.2 8.3X
SQL ORC MR 4186 / 4209 2.5 399.2 5.0X
@@ -147,7 +140,6 @@ SQL Json 7025 / 7025 1.5 6
SQL Parquet Vectorized 803 / 821 13.1 76.6 14.6X
SQL Parquet MR 1776 / 1790 5.9 169.4 6.6X
SQL ORC Vectorized 491 / 494 21.4 46.8 23.8X
-SQL ORC Vectorized with copy 723 / 725 14.5 68.9 16.2X
SQL ORC MR 2050 / 2063 5.1 195.5 5.7X
@@ -164,21 +156,18 @@ Data column - Json 12876 / 12882 1.2 8
Data column - Parquet Vectorized 277 / 282 56.7 17.6 111.6X
Data column - Parquet MR 3398 / 3402 4.6 216.0 9.1X
Data column - ORC Vectorized 399 / 407 39.4 25.4 77.5X
-Data column - ORC Vectorized with copy 407 / 447 38.6 25.9 76.0X
Data column - ORC MR 2583 / 2589 6.1 164.2 12.0X
Partition column - CSV 7403 / 7427 2.1 470.7 4.2X
Partition column - Json 5587 / 5625 2.8 355.2 5.5X
Partition column - Parquet Vectorized 71 / 78 222.6 4.5 438.3X
Partition column - Parquet MR 1798 / 1808 8.7 114.3 17.2X
Partition column - ORC Vectorized 72 / 75 219.0 4.6 431.2X
-Partition column - ORC Vectorized with copy 71 / 77 221.1 4.5 435.4X
Partition column - ORC MR 1772 / 1778 8.9 112.6 17.5X
Both columns - CSV 30211 / 30212 0.5 1920.7 1.0X
Both columns - Json 13382 / 13391 1.2 850.8 2.3X
Both columns - Parquet Vectorized 321 / 333 49.0 20.4 96.4X
Both columns - Parquet MR 3656 / 3661 4.3 232.4 8.5X
Both columns - ORC Vectorized 443 / 448 35.5 28.2 69.9X
-Both column - ORC Vectorized with copy 527 / 533 29.9 33.5 58.8X
Both columns - ORC MR 2626 / 2633 6.0 167.0 11.8X
@@ -196,7 +185,6 @@ SQL Parquet Vectorized 1563 / 1564 6.7 1
SQL Parquet MR 3835 / 3836 2.7 365.8 3.6X
ParquetReader Vectorized 1115 / 1118 9.4 106.4 12.5X
SQL ORC Vectorized 1172 / 1208 8.9 111.8 11.9X
-SQL ORC Vectorized with copy 1630 / 1644 6.4 155.5 8.5X
SQL ORC MR 3708 / 3711 2.8 353.6 3.8X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -209,7 +197,6 @@ SQL Parquet Vectorized 1103 / 1112 9.5 1
SQL Parquet MR 2841 / 2847 3.7 271.0 4.9X
ParquetReader Vectorized 992 / 1012 10.6 94.6 14.1X
SQL ORC Vectorized 1275 / 1349 8.2 121.6 11.0X
-SQL ORC Vectorized with copy 1631 / 1644 6.4 155.5 8.6X
SQL ORC MR 3244 / 3259 3.2 309.3 4.3X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -222,7 +209,6 @@ SQL Parquet Vectorized 238 / 242 44.1
SQL Parquet MR 1730 / 1734 6.1 165.0 6.5X
ParquetReader Vectorized 237 / 238 44.3 22.6 47.4X
SQL ORC Vectorized 459 / 462 22.8 43.8 24.4X
-SQL ORC Vectorized with copy 581 / 583 18.1 55.4 19.3X
SQL ORC MR 1767 / 1783 5.9 168.5 6.4X
@@ -239,7 +225,6 @@ SQL Json 2808 / 2843 0.4 26
SQL Parquet Vectorized 56 / 63 18.9 52.9 59.8X
SQL Parquet MR 215 / 219 4.9 205.4 15.4X
SQL ORC Vectorized 64 / 76 16.4 60.9 52.0X
-SQL ORC Vectorized with copy 64 / 67 16.3 61.3 51.7X
SQL ORC MR 314 / 316 3.3 299.6 10.6X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -251,7 +236,6 @@ SQL Json 10294 / 10325 0.1 98
SQL Parquet Vectorized 72 / 85 14.5 69.0 110.3X
SQL Parquet MR 237 / 241 4.4 226.4 33.6X
SQL ORC Vectorized 82 / 92 12.7 78.5 97.0X
-SQL ORC Vectorized with copy 82 / 88 12.7 78.5 97.0X
SQL ORC MR 900 / 909 1.2 858.5 8.9X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -263,7 +247,6 @@ SQL Json 18813 / 18827 0.1 179
SQL Parquet Vectorized 107 / 111 9.8 101.8 126.3X
SQL Parquet MR 275 / 286 3.8 262.3 49.0X
SQL ORC Vectorized 107 / 115 9.8 101.7 126.4X
-SQL ORC Vectorized with copy 107 / 115 9.8 102.3 125.8X
SQL ORC MR 1659 / 1664 0.6 1582.3 8.1X
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 7dc90df05a8fe..efca96e9ce580 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.orc;
import java.io.IOException;
-import java.util.stream.IntStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
@@ -31,16 +30,11 @@
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
-import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -77,21 +71,10 @@
@VisibleForTesting
public ColumnarBatch columnarBatch;
- // Writable column vectors of the result columnar batch.
- private WritableColumnVector[] columnVectors;
-
- // The wrapped ORC column vectors. It should be null if `copyToSpark` is true.
+ // The wrapped ORC column vectors.
private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
- // The memory mode of the columnarBatch
- private final MemoryMode MEMORY_MODE;
-
- // Whether or not to copy the ORC columnar batch to Spark columnar batch.
- private final boolean copyToSpark;
-
- public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark, int capacity) {
- MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
- this.copyToSpark = copyToSpark;
+ public OrcColumnarBatchReader(int capacity) {
this.capacity = capacity;
}
@@ -177,53 +160,32 @@ public void initBatch(
this.requestedDataColIds = requestedDataColIds;
StructType resultSchema = new StructType(requiredFields);
- if (copyToSpark) {
- if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
- columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
- } else {
- columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
- }
-
- // Initialize the partition columns and missing columns once.
- for (int i = 0; i < requiredFields.length; i++) {
- if (requestedPartitionColIds[i] != -1) {
- ColumnVectorUtils.populate(columnVectors[i],
- partitionValues, requestedPartitionColIds[i]);
- columnVectors[i].setIsConstant();
- } else if (requestedDataColIds[i] == -1) {
- columnVectors[i].putNulls(0, capacity);
- columnVectors[i].setIsConstant();
- }
- }
- columnarBatch = new ColumnarBatch(columnVectors);
- } else {
- // Just wrap the ORC column vector instead of copying it to Spark column vector.
- orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+ // Just wrap the ORC column vector instead of copying it to Spark column vector.
+ orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
- for (int i = 0; i < requiredFields.length; i++) {
- DataType dt = requiredFields[i].dataType();
- if (requestedPartitionColIds[i] != -1) {
- OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
- ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
- partitionCol.setIsConstant();
- orcVectorWrappers[i] = partitionCol;
+ for (int i = 0; i < requiredFields.length; i++) {
+ DataType dt = requiredFields[i].dataType();
+ if (requestedPartitionColIds[i] != -1) {
+ OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
+ ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
+ partitionCol.setIsConstant();
+ orcVectorWrappers[i] = partitionCol;
+ } else {
+ int colId = requestedDataColIds[i];
+ // Initialize the missing columns once.
+ if (colId == -1) {
+ OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+ missingCol.putNulls(0, capacity);
+ missingCol.setIsConstant();
+ orcVectorWrappers[i] = missingCol;
} else {
- int colId = requestedDataColIds[i];
- // Initialize the missing columns once.
- if (colId == -1) {
- OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
- missingCol.putNulls(0, capacity);
- missingCol.setIsConstant();
- orcVectorWrappers[i] = missingCol;
- } else {
- orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
- }
+ orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
}
}
-
- columnarBatch = new ColumnarBatch(orcVectorWrappers);
}
+
+ columnarBatch = new ColumnarBatch(orcVectorWrappers);
}
/**
@@ -238,325 +200,11 @@ private boolean nextBatch() throws IOException {
}
columnarBatch.setNumRows(batchSize);
- if (!copyToSpark) {
- for (int i = 0; i < requiredFields.length; i++) {
- if (requestedDataColIds[i] != -1) {
- ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
- }
- }
- return true;
- }
-
- for (WritableColumnVector vector : columnVectors) {
- vector.reset();
- }
-
for (int i = 0; i < requiredFields.length; i++) {
- StructField field = requiredFields[i];
- WritableColumnVector toColumn = columnVectors[i];
-
- if (requestedDataColIds[i] >= 0) {
- ColumnVector fromColumn = batch.cols[requestedDataColIds[i]];
-
- if (fromColumn.isRepeating) {
- putRepeatingValues(batchSize, field, fromColumn, toColumn);
- } else if (fromColumn.noNulls) {
- putNonNullValues(batchSize, field, fromColumn, toColumn);
- } else {
- putValues(batchSize, field, fromColumn, toColumn);
- }
+ if (requestedDataColIds[i] != -1) {
+ ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
}
}
return true;
}
-
- private void putRepeatingValues(
- int batchSize,
- StructField field,
- ColumnVector fromColumn,
- WritableColumnVector toColumn) {
- if (fromColumn.isNull[0]) {
- toColumn.putNulls(0, batchSize);
- } else {
- DataType type = field.dataType();
- if (type instanceof BooleanType) {
- toColumn.putBooleans(0, batchSize, ((LongColumnVector)fromColumn).vector[0] == 1);
- } else if (type instanceof ByteType) {
- toColumn.putBytes(0, batchSize, (byte)((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof ShortType) {
- toColumn.putShorts(0, batchSize, (short)((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof IntegerType || type instanceof DateType) {
- toColumn.putInts(0, batchSize, (int)((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof LongType) {
- toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector[0]);
- } else if (type instanceof TimestampType) {
- toColumn.putLongs(0, batchSize,
- fromTimestampColumnVector((TimestampColumnVector)fromColumn, 0));
- } else if (type instanceof FloatType) {
- toColumn.putFloats(0, batchSize, (float)((DoubleColumnVector)fromColumn).vector[0]);
- } else if (type instanceof DoubleType) {
- toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector[0]);
- } else if (type instanceof StringType || type instanceof BinaryType) {
- BytesColumnVector data = (BytesColumnVector)fromColumn;
- int size = data.vector[0].length;
- toColumn.arrayData().reserve(size);
- toColumn.arrayData().putBytes(0, size, data.vector[0], 0);
- for (int index = 0; index < batchSize; index++) {
- toColumn.putArray(index, 0, size);
- }
- } else if (type instanceof DecimalType) {
- DecimalType decimalType = (DecimalType)type;
- putDecimalWritables(
- toColumn,
- batchSize,
- decimalType.precision(),
- decimalType.scale(),
- ((DecimalColumnVector)fromColumn).vector[0]);
- } else {
- throw new UnsupportedOperationException("Unsupported Data Type: " + type);
- }
- }
- }
-
- private void putNonNullValues(
- int batchSize,
- StructField field,
- ColumnVector fromColumn,
- WritableColumnVector toColumn) {
- DataType type = field.dataType();
- if (type instanceof BooleanType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putBoolean(index, data[index] == 1);
- }
- } else if (type instanceof ByteType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putByte(index, (byte)data[index]);
- }
- } else if (type instanceof ShortType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putShort(index, (short)data[index]);
- }
- } else if (type instanceof IntegerType || type instanceof DateType) {
- long[] data = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putInt(index, (int)data[index]);
- }
- } else if (type instanceof LongType) {
- toColumn.putLongs(0, batchSize, ((LongColumnVector)fromColumn).vector, 0);
- } else if (type instanceof TimestampType) {
- TimestampColumnVector data = ((TimestampColumnVector)fromColumn);
- for (int index = 0; index < batchSize; index++) {
- toColumn.putLong(index, fromTimestampColumnVector(data, index));
- }
- } else if (type instanceof FloatType) {
- double[] data = ((DoubleColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- toColumn.putFloat(index, (float)data[index]);
- }
- } else if (type instanceof DoubleType) {
- toColumn.putDoubles(0, batchSize, ((DoubleColumnVector)fromColumn).vector, 0);
- } else if (type instanceof StringType || type instanceof BinaryType) {
- BytesColumnVector data = ((BytesColumnVector)fromColumn);
- WritableColumnVector arrayData = toColumn.arrayData();
- int totalNumBytes = IntStream.of(data.length).sum();
- arrayData.reserve(totalNumBytes);
- for (int index = 0, pos = 0; index < batchSize; pos += data.length[index], index++) {
- arrayData.putBytes(pos, data.length[index], data.vector[index], data.start[index]);
- toColumn.putArray(index, pos, data.length[index]);
- }
- } else if (type instanceof DecimalType) {
- DecimalType decimalType = (DecimalType)type;
- DecimalColumnVector data = ((DecimalColumnVector)fromColumn);
- if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
- toColumn.arrayData().reserve(batchSize * 16);
- }
- for (int index = 0; index < batchSize; index++) {
- putDecimalWritable(
- toColumn,
- index,
- decimalType.precision(),
- decimalType.scale(),
- data.vector[index]);
- }
- } else {
- throw new UnsupportedOperationException("Unsupported Data Type: " + type);
- }
- }
-
- private void putValues(
- int batchSize,
- StructField field,
- ColumnVector fromColumn,
- WritableColumnVector toColumn) {
- DataType type = field.dataType();
- if (type instanceof BooleanType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putBoolean(index, vector[index] == 1);
- }
- }
- } else if (type instanceof ByteType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putByte(index, (byte)vector[index]);
- }
- }
- } else if (type instanceof ShortType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putShort(index, (short)vector[index]);
- }
- }
- } else if (type instanceof IntegerType || type instanceof DateType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putInt(index, (int)vector[index]);
- }
- }
- } else if (type instanceof LongType) {
- long[] vector = ((LongColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putLong(index, vector[index]);
- }
- }
- } else if (type instanceof TimestampType) {
- TimestampColumnVector vector = ((TimestampColumnVector)fromColumn);
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putLong(index, fromTimestampColumnVector(vector, index));
- }
- }
- } else if (type instanceof FloatType) {
- double[] vector = ((DoubleColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putFloat(index, (float)vector[index]);
- }
- }
- } else if (type instanceof DoubleType) {
- double[] vector = ((DoubleColumnVector)fromColumn).vector;
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- toColumn.putDouble(index, vector[index]);
- }
- }
- } else if (type instanceof StringType || type instanceof BinaryType) {
- BytesColumnVector vector = (BytesColumnVector)fromColumn;
- WritableColumnVector arrayData = toColumn.arrayData();
- int totalNumBytes = IntStream.of(vector.length).sum();
- arrayData.reserve(totalNumBytes);
- for (int index = 0, pos = 0; index < batchSize; pos += vector.length[index], index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- arrayData.putBytes(pos, vector.length[index], vector.vector[index], vector.start[index]);
- toColumn.putArray(index, pos, vector.length[index]);
- }
- }
- } else if (type instanceof DecimalType) {
- DecimalType decimalType = (DecimalType)type;
- HiveDecimalWritable[] vector = ((DecimalColumnVector)fromColumn).vector;
- if (decimalType.precision() > Decimal.MAX_LONG_DIGITS()) {
- toColumn.arrayData().reserve(batchSize * 16);
- }
- for (int index = 0; index < batchSize; index++) {
- if (fromColumn.isNull[index]) {
- toColumn.putNull(index);
- } else {
- putDecimalWritable(
- toColumn,
- index,
- decimalType.precision(),
- decimalType.scale(),
- vector[index]);
- }
- }
- } else {
- throw new UnsupportedOperationException("Unsupported Data Type: " + type);
- }
- }
-
- /**
- * Returns the number of micros since epoch from an element of TimestampColumnVector.
- */
- private static long fromTimestampColumnVector(TimestampColumnVector vector, int index) {
- return vector.time[index] * 1000 + (vector.nanos[index] / 1000 % 1000);
- }
-
- /**
- * Put a `HiveDecimalWritable` to a `WritableColumnVector`.
- */
- private static void putDecimalWritable(
- WritableColumnVector toColumn,
- int index,
- int precision,
- int scale,
- HiveDecimalWritable decimalWritable) {
- HiveDecimal decimal = decimalWritable.getHiveDecimal();
- Decimal value =
- Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
- value.changePrecision(precision, scale);
-
- if (precision <= Decimal.MAX_INT_DIGITS()) {
- toColumn.putInt(index, (int) value.toUnscaledLong());
- } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
- toColumn.putLong(index, value.toUnscaledLong());
- } else {
- byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
- toColumn.arrayData().putBytes(index * 16, bytes.length, bytes, 0);
- toColumn.putArray(index, index * 16, bytes.length);
- }
- }
-
- /**
- * Put `HiveDecimalWritable`s to a `WritableColumnVector`.
- */
- private static void putDecimalWritables(
- WritableColumnVector toColumn,
- int size,
- int precision,
- int scale,
- HiveDecimalWritable decimalWritable) {
- HiveDecimal decimal = decimalWritable.getHiveDecimal();
- Decimal value =
- Decimal.apply(decimal.bigDecimalValue(), decimal.precision(), decimal.scale());
- value.changePrecision(precision, scale);
-
- if (precision <= Decimal.MAX_INT_DIGITS()) {
- toColumn.putInts(0, size, (int) value.toUnscaledLong());
- } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
- toColumn.putLongs(0, size, value.toUnscaledLong());
- } else {
- byte[] bytes = value.toJavaBigDecimal().unscaledValue().toByteArray();
- toColumn.arrayData().reserve(bytes.length);
- toColumn.arrayData().putBytes(0, bytes.length, bytes, 0);
- for (int index = 0; index < size; index++) {
- toColumn.putArray(index, 0, bytes.length);
- }
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index cd10ad21cd820..14779cdba4178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -162,10 +161,8 @@ class OrcFileFormat
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
val sqlConf = sparkSession.sessionState.conf
- val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
val capacity = sqlConf.orcVectorizedReaderBatchSize
- val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -197,10 +194,8 @@ class OrcFileFormat
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
- val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
- val batchReader = new OrcColumnarBatchReader(
- enableOffHeapColumnVector && taskContext.isDefined, copyToSpark,
capacity)
+ val batchReader = new OrcColumnarBatchReader(capacity)
// SPARK-23399 Register a task completion listener first to call `close()` in all cases.
// There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
// after opening a file.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index ecd9ead0ae39a..aca7081c0d3d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -56,7 +56,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
// Set default configs. Individual cases will change them if necessary.
spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
- spark.conf.set(SQLConf.ORC_COPY_BATCH_TO_SPARK.key, "false")
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
@@ -139,12 +138,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM orcTable").collect()
}
- sqlBenchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM orcTable").collect()
- }
- }
-
sqlBenchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(id) FROM orcTable").collect()
@@ -261,12 +254,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(c1), sum(length(c2)) FROM orcTable").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(c1), sum(length(c2)) FROM
orcTable").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(c1), sum(length(c2)) FROM
orcTable").collect()
@@ -312,12 +299,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("select sum(length(c1)) from orcTable").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("select sum(length(c1)) from orcTable").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("select sum(length(c1)) from orcTable").collect()
@@ -361,12 +342,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM orcTable").collect()
}
- benchmark.addCase("Data column - ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("Data column - ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(id) FROM orcTable").collect()
@@ -395,12 +370,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p) FROM orcTable").collect()
}
- benchmark.addCase("Partition column - ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("Partition column - ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(p) FROM orcTable").collect()
@@ -429,12 +398,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p), sum(id) FROM orcTable").collect()
}
- benchmark.addCase("Both column - ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p), sum(id) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("Both columns - ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(p), sum(id) FROM orcTable").collect()
@@ -513,13 +476,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
"WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT SUM(LENGTH(c2)) FROM orcTable " +
- "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT SUM(LENGTH(c2)) FROM orcTable " +
@@ -570,12 +526,6 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql(s"SELECT sum(c$middle) FROM orcTable").collect()
}
- benchmark.addCase("SQL ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql(s"SELECT sum(c$middle) FROM orcTable").collect()
- }
- }
-
benchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql(s"SELECT sum(c$middle) FROM orcTable").collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index 52abeb20e7f25..c16fcc67f8dd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -43,7 +43,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with Share
requestedDataColIds: Array[Int],
requestedPartitionColIds: Array[Int],
resultFields: Array[StructField]): OrcColumnarBatchReader = {
- val reader = new OrcColumnarBatchReader(false, false, 4096)
+ val reader = new OrcColumnarBatchReader(4096)
reader.initBatch(
orcFileSchema,
resultFields,
diff --git a/sql/hive/benchmarks/OrcReadBenchmark-results.txt b/sql/hive/benchmarks/OrcReadBenchmark-results.txt
index 80c2f5e93405a..caa78b9a8f102 100644
--- a/sql/hive/benchmarks/OrcReadBenchmark-results.txt
+++ b/sql/hive/benchmarks/OrcReadBenchmark-results.txt
@@ -8,7 +8,6 @@ SQL Single TINYINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1725 / 1759 9.1 109.7 1.0X
Native ORC Vectorized 272 / 316 57.8 17.3 6.3X
-Native ORC Vectorized with copy 239 / 254 65.7 15.2 7.2X
Hive built-in ORC 1970 / 1987 8.0 125.3 0.9X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -17,7 +16,6 @@ SQL Single SMALLINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1633 / 1672 9.6 103.8 1.0X
Native ORC Vectorized 238 / 255 66.0 15.1 6.9X
-Native ORC Vectorized with copy 235 / 253 66.8 15.0 6.9X
Hive built-in ORC 2293 / 2305 6.9 145.8 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -26,7 +24,6 @@ SQL Single INT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1677 / 1699 9.4 106.6 1.0X
Native ORC Vectorized 325 / 342 48.3 20.7 5.2X
-Native ORC Vectorized with copy 328 / 341 47.9 20.9 5.1X
Hive built-in ORC 2561 / 2569 6.1 162.8 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -35,7 +32,6 @@ SQL Single BIGINT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1791 / 1795 8.8 113.9 1.0X
Native ORC Vectorized 400 / 408 39.3 25.4 4.5X
-Native ORC Vectorized with copy 410 / 417 38.4 26.1 4.4X
Hive built-in ORC 2713 / 2720 5.8 172.5 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -44,7 +40,6 @@ SQL Single FLOAT Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1791 / 1805 8.8 113.8 1.0X
Native ORC Vectorized 433 / 438 36.3 27.5 4.1X
-Native ORC Vectorized with copy 441 / 447 35.7 28.0 4.1X
Hive built-in ORC 2690 / 2803 5.8 171.0 0.7X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -53,7 +48,6 @@ SQL Single DOUBLE Column Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1911 / 1930 8.2 121.5 1.0X
Native ORC Vectorized 543 / 552 29.0 34.5 3.5X
-Native ORC Vectorized with copy 547 / 555 28.8 34.8 3.5X
Hive built-in ORC 2967 / 3065 5.3 188.6 0.6X
@@ -67,7 +61,6 @@ Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 4160 / 4188 2.5 396.7 1.0X
Native ORC Vectorized 2405 / 2406 4.4 229.4 1.7X
-Native ORC Vectorized with copy 2588 / 2592 4.1 246.8 1.6X
Hive built-in ORC 5514 / 5562 1.9 525.9 0.8X
@@ -81,15 +74,12 @@ Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Data column - Native ORC MR 1863 / 1867 8.4 118.4 1.0X
Data column - Native ORC Vectorized 411 / 418 38.2 26.2 4.5X
-Data column - Native ORC Vectorized with copy 417 / 422 37.8 26.5 4.5X
Data column - Hive built-in ORC 3297 / 3308 4.8 209.6 0.6X
Partition column - Native ORC MR 1505 / 1506 10.4 95.7 1.2X
Partition column - Native ORC Vectorized 80 / 93 195.6 5.1 23.2X
-Partition column - Native ORC Vectorized with copy 78 / 86 201.4 5.0 23.9X
Partition column - Hive built-in ORC 1960 / 1979 8.0 124.6 1.0X
Both columns - Native ORC MR 2076 / 2090 7.6 132.0 0.9X
Both columns - Native ORC Vectorized 450 / 463 34.9 28.6 4.1X
-Both column - Native ORC Vectorized with copy 532 / 538 29.6 33.8 3.5X
Both columns - Hive built-in ORC 3528 / 3548 4.5 224.3 0.5X
@@ -103,7 +93,6 @@ Repeated String: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1727 / 1733 6.1 164.7 1.0X
Native ORC Vectorized 375 / 379 28.0 35.7 4.6X
-Native ORC Vectorized with copy 552 / 556 19.0 52.6 3.1X
Hive built-in ORC 2665 / 2666 3.9 254.2 0.6X
@@ -117,7 +106,6 @@ String with Nulls Scan (0.0%): Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 3324 / 3325 3.2 317.0 1.0X
Native ORC Vectorized 1085 / 1106 9.7 103.4 3.1X
-Native ORC Vectorized with copy 1463 / 1471 7.2 139.5 2.3X
Hive built-in ORC 5272 / 5299 2.0 502.8 0.6X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -126,7 +114,6 @@ String with Nulls Scan (50.0%): Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 3045 / 3046 3.4 290.4 1.0X
Native ORC Vectorized 1248 / 1260 8.4 119.0 2.4X
-Native ORC Vectorized with copy 1609 / 1624 6.5 153.5 1.9X
Hive built-in ORC 3989 / 3999 2.6 380.4 0.8X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -135,7 +122,6 @@ String with Nulls Scan (95.0%): Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1692 / 1694 6.2 161.3 1.0X
Native ORC Vectorized 471 / 493 22.3 44.9 3.6X
-Native ORC Vectorized with copy 588 / 590 17.8 56.1 2.9X
Hive built-in ORC 2398 / 2411 4.4 228.7 0.7X
@@ -149,7 +135,6 @@ Single Column Scan from 100 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 1371 / 1379 0.8 1307.5 1.0X
Native ORC Vectorized 121 / 135 8.6 115.8 11.3X
-Native ORC Vectorized with copy 122 / 138 8.6 116.2 11.3X
Hive built-in ORC 521 / 561 2.0 497.1 2.6X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -158,7 +143,6 @@ Single Column Scan from 200 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 2711 / 2767 0.4 2585.5 1.0X
Native ORC Vectorized 210 / 232 5.0 200.5 12.9X
-Native ORC Vectorized with copy 208 / 219 5.0 198.4 13.0X
Hive built-in ORC 764 / 775 1.4 728.3 3.5X
OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
@@ -167,7 +151,6 @@ Single Column Scan from 300 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
------------------------------------------------------------------------------------------------
Native ORC MR 3979 / 3988 0.3 3794.4 1.0X
Native ORC Vectorized 357 / 366 2.9 340.2 11.2X
-Native ORC Vectorized with copy 361 / 371 2.9 344.5 11.0X
Hive built-in ORC 1091 / 1095 1.0 1040.5 3.6X
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index eb3cde8472dac..c03ae144a1595 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -96,12 +96,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
}
@@ -133,12 +127,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(c1), sum(length(c2)) FROM
nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(c1), sum(length(c2)) FROM
nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT sum(c1), sum(length(c2)) FROM
hiveOrcTable").collect()
}
@@ -168,12 +156,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Data column - Native ORC Vectorized with copy") { _
=>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Data column - Hive built-in ORC") { _ =>
spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
}
@@ -188,12 +170,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Partition column - Native ORC Vectorized with
copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Partition column - Hive built-in ORC") { _ =>
spark.sql("SELECT sum(p) FROM hiveOrcTable").collect()
}
@@ -208,12 +184,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Both column - Native ORC Vectorized with copy") { _
=>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Both columns - Hive built-in ORC") { _ =>
spark.sql("SELECT sum(p), sum(id) FROM hiveOrcTable").collect()
}
@@ -242,12 +212,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT sum(length(c1)) FROM hiveOrcTable").collect()
}
@@ -284,13 +248,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
"WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
- "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql("SELECT SUM(LENGTH(c2)) FROM hiveOrcTable " +
"WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
@@ -324,12 +281,6 @@ object OrcReadBenchmark extends BenchmarkBase with SQLHelper {
spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
}
- benchmark.addCase("Native ORC Vectorized with copy") { _ =>
- withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
- spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
- }
- }
-
benchmark.addCase("Hive built-in ORC") { _ =>
spark.sql(s"SELECT sum(c$middle) FROM hiveOrcTable").collect()
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]