dongjoon-hyun commented on a change in pull request #26016: [SPARK-24914][SQL]
New statistic to improve data size estimate for columnar storage formats
URL: https://github.com/apache/spark/pull/26016#discussion_r388018047
##########
File path:
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
##########
@@ -1556,4 +1538,150 @@ class StatisticsSuite extends
StatisticsCollectionTestBase with TestHiveSingleton
}
}
}
+
+ private def checkDeserializationFactor(tableName: String, exists: Boolean):
Unit = {
+ spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
+ val catalogTable = getCatalogTable(tableName)
+ assert(catalogTable.stats.isDefined)
+ assert(catalogTable.stats.get.deserFactor.isDefined === exists)
+ }
+
+ test("SPARK-24914 - test deserialization factor calculation (for ORC
tables)") {
+ val table = s"sizeTest"
+ withTable(table) {
+ import spark.implicits._
+ sql(s"CREATE TABLE $table (key INT, value1 BIGINT, value2 BIGINT, value3
BIGINT, " +
+ s"value4 BIGINT) PARTITIONED BY (ds STRING) STORED AS ORC")
+ spark.range(5000)
+ .map(i => (i * 6, i * 5, i * 4, i * 3, i * 2))
+ .toDF("key", "value1", "value2", "value3", "value4")
+ .createOrReplaceTempView("TMP1")
+ spark.range(5000)
+ .map(i => (1, 1, 1, 1, 1))
+ .toDF("key", "value1", "value2", "value3", "value4")
+ .createOrReplaceTempView("TMP2")
+
+ sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-01') SELECT * FROM
TMP1")
+
+ val catalogTable = getCatalogTable(table)
+ assert(catalogTable.stats.isEmpty)
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+ checkDeserializationFactor(table, exists = false)
+ val origSizeInBytes =
spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes
+ logInfo(s"original sizeInBytes (file size): $origSizeInBytes")
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key ->
s"${origSizeInBytes - 1}") {
+ val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key =
t2.key")
+ checkNumBroadcastHashJoins(res, 0,
+ "sort merge join should be taken as threshold is smaller than table
size")
+ }
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key ->
s"${origSizeInBytes + 1}") {
+ val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key =
t2.key")
+ checkNumBroadcastHashJoins(res, 1,
+ "broadcast join should be taken as the threshold is greater than
table size")
+ }
+
+ withSQLConf(
+ SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}")
{
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+ checkDeserializationFactor(table, exists = true)
+ val newSizeInBytes =
spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes
+ assert(2 * origSizeInBytes <= newSizeInBytes)
+ logInfo(s"sizeInBytes after applying deserFactor: $newSizeInBytes")
+ val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key =
t2.key")
+ checkNumBroadcastHashJoins(res, 0,
+ "sort merge join should be taken despite the threshold is greater
than the table" +
+ "size as the deserialization factor is applied")
+ }
+
+ withSQLConf(
+ SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}")
{
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+ checkDeserializationFactor(table, exists = true)
+ val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key =
t2.key")
+ checkNumBroadcastHashJoins(res, 0,
+ "sort merge join should be taken despite deserialization factor
calculation is " +
+ "disabled as the old factor is reused")
+ }
+
+ withSQLConf(SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "true") {
+ val catalogTableBefore = getCatalogTable(table)
+ val deserFactorBefore = catalogTableBefore.stats.get.deserFactor.get
+ sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-02') SELECT *
FROM TMP1")
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+ spark.sessionState.catalog.refreshTable(TableIdentifier(table))
+ val catalogTable1 = getCatalogTable(table)
+ assert(catalogTable1.stats.isDefined &&
+ catalogTable1.stats.get.deserFactor.isDefined)
+ assert(catalogTable1.stats.get.deserFactor.get === deserFactorBefore,
+ "deserFactor should not change by adding a smaller or same size
partition")
+
+ sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-03') SELECT *
FROM TMP2")
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+
+ spark.sessionState.catalog.refreshTable(TableIdentifier(table))
+ val catalogTable2 = getCatalogTable(table)
+ assert(catalogTable2.stats.isDefined &&
+ catalogTable2.stats.get.deserFactor.isDefined)
+ assert(catalogTable2.stats.get.deserFactor.get > deserFactorBefore,
+ "deserialization factor increased after adding a partition which is
more compressed")
+ }
+
+ sql(s"TRUNCATE TABLE $table")
+ withSQLConf(
+ SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}")
{
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+ checkDeserializationFactor(table, exists = false)
+ val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key =
t2.key")
+ checkNumBroadcastHashJoins(res, 1,
+ "broadcast join should be taken as deserialization factor is deleted
by TRUNCATE")
+ }
+ }
+ }
+
+ test("SPARK-24914 - test changing of the deserialization factor by setting
the table property") {
Review comment:
Instead, you need to verify that the manual setting overrides the automatic
calculation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]