gatorsmile closed pull request #19560: [SPARK-22334][SQL] Check table size from filesystem in case the size in metastore is wrong.
URL: https://github.com/apache/spark/pull/19560
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cfe53b2c115b..6fa006fa7c9b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -187,6 +187,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+  val VERIFY_STATS_FROM_FILESYSTEM_WHEN_JOIN =
+    buildConf("spark.sql.statistics.verifyStatsFromFileSystemWhenJoin")
+      .doc("If the table size in the metastore is below spark.sql.autoBroadcastJoinThreshold, " +
+        "check the real size on the file system and use the bigger of the two as the table " +
+        "size. This is a defensive check that helps avoid OOM caused by broadcast joins. It is " +
+        "useful when the metastore previously failed to update the table's stats.")
+      .booleanConf
+      .createWithDefault(false)
+
val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes")
.internal()
.doc("The default table size used in query planning. By default, it is set
to Long.MaxValue " +
@@ -1104,6 +1113,9 @@ class SQLConf extends Serializable with Logging {
def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+ def verifyStatsFromFileSystemWhenJoin: Boolean =
+ getConf(VERIFY_STATS_FROM_FILESYSTEM_WHEN_JOIN)
+
def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
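
For anyone trying this change out, turning the new flag on from an application could look like the sketch below; the session setup, app name, and threshold value are illustrative and not part of the patch.

import org.apache.spark.sql.SparkSession

object VerifyStatsFromFsExample {
  def main(args: Array[String]): Unit = {
    // Hive support is needed because the check is applied to HiveTableRelation nodes.
    val spark = SparkSession.builder()
      .appName("verify-stats-from-filesystem-example")
      .enableHiveSupport()
      .getOrCreate()

    // Opt in to the defensive size check (it defaults to false).
    spark.conf.set("spark.sql.statistics.verifyStatsFromFileSystemWhenJoin", "true")

    // The check only fires when the metastore size is below this threshold, so the two
    // settings interact; the 10 MB shown here is just the usual default.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)

    spark.stop()
  }
}
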
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3592b8f4846d1..e19cfdcacabff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import java.io.IOException
+import java.net.URI
import java.util.Locale
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -26,8 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
-  ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -120,22 +120,53 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
      if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      val table = relation.tableMeta
      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
+        getSizeFromFileSystem(table.location)
      } else {
        session.sessionState.conf.defaultSizeInBytes
      }
      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
      relation.copy(tableMeta = withStats)
+
+    case r: Join =>
+      r.transformUp {
+        case relation: HiveTableRelation => verifySize(relation)
+      }
+  }
+
+  private[this] def verifySize(relation: HiveTableRelation): HiveTableRelation = {
+    if (DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.nonEmpty &&
+        session.sessionState.conf.verifyStatsFromFileSystemWhenJoin &&
+        relation.tableMeta.stats.get.sizeInBytes <
+          session.sessionState.conf.autoBroadcastJoinThreshold) {
+      val table = relation.tableMeta
+      val sizeInBytes = getSizeFromFileSystem(table.location)
+      if (sizeInBytes > relation.tableMeta.stats.get.sizeInBytes) {
+        logWarning(s"For hive table ${relation.tableMeta.qualifiedName}, its size" +
+          s" ${relation.tableMeta.stats.get.sizeInBytes} from metastore is smaller than its" +
+          s" real size $sizeInBytes on file system. Please update stats in metastore accurately.")
+        val newTable =
+          table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+        relation.copy(tableMeta = newTable)
+      } else {
+        relation
+      }
+    } else {
+      relation
+    }
+  }
+
+  private[this] def getSizeFromFileSystem(loc: URI): Long = {
+    try {
+      val hadoopConf = session.sessionState.newHadoopConf()
+      val tablePath = new Path(loc)
+      val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+      fs.getContentSummary(tablePath).getLength
+    } catch {
+      case e: IOException =>
+        logWarning("Failed to get table size from hdfs.", e)
+        session.sessionState.conf.defaultSizeInBytes
+    }
  }
}
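
The heart of the new getSizeFromFileSystem helper is Hadoop's getContentSummary call. A minimal standalone sketch of the same lookup follows; the object name, parameters, and the fallback argument are illustrative, not part of the patch.

import java.io.IOException
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TableSizeOnDisk {
  /** Sums the length of every file under `location`, returning `fallback` if the walk fails. */
  def sizeInBytes(location: URI, hadoopConf: Configuration, fallback: Long): Long = {
    try {
      val tablePath = new Path(location)
      val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
      // getContentSummary recursively walks the directory and aggregates file lengths.
      fs.getContentSummary(tablePath).getLength
    } catch {
      case _: IOException =>
        // Same spirit as the rule: prefer a conservative fallback over failing the query.
        fallback
    }
  }
}
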
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index b9a5ad7657134..857b61eddb90c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -79,6 +79,59 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
}
}
+ test("Verify stats from file system when join.") {
+ withSQLConf(SQLConf.VERIFY_STATS_FROM_FILESYSTEM_WHEN_JOIN.key -> "true") {
+ withTable("csv_table") {
+ withTempDir { tempDir =>
+ // EXTERNAL OpenCSVSerde table pointing to LOCATION
+ val file1 = new File(tempDir + "/data1")
+ val writer1 = new PrintWriter(file1)
+ writer1.write("1,2")
+ writer1.close()
+
+ val file2 = new File(tempDir + "/data2")
+ val writer2 = new PrintWriter(file2)
+ writer2.write("1,2")
+ writer2.close()
+
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+ |WITH SERDEPROPERTIES (
+ |\"separatorChar\" = \",\",
+ |\"quoteChar\" = \"\\\"\",
+ |\"escapeChar\" = \"\\\\\")
+ |LOCATION '${tempDir.toURI}'
+ |tblproperties(
+ | 'numFiles'='1',
+ | 'rawDataSize'='1',
+ | 'totalSize'='1'
+ |)
+ |""".stripMargin)
+        val relation = spark.table("csv_table").queryExecution.analyzed.children.head
+          .asInstanceOf[HiveTableRelation]
+ val sizeInBytes = relation.stats.sizeInBytes
+ assert(sizeInBytes === 1)
+
+ val df = sql(
+ """
+ |SELECT * FROM
+ |csv_table a LEFT JOIN csv_table b
+ |on a.page_id = b.page_id
+ """.stripMargin)
+ val relations = df.queryExecution.analyzed.collect {
+ case relation: HiveTableRelation => relation
+ }
+ assert(relations.length === 2)
+ val realSize = BigInt(file1.length() + file2.length())
+ assert(relations(0).stats.sizeInBytes === realSize)
+ assert(relations(1).stats.sizeInBytes === realSize)
+ }
+ }
+ }
+ }
+
test("analyze Hive serde tables") {
def queryTotalSize(tableName: String): BigInt =
spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
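
To see the end-to-end effect of the change, one can check whether the physical plan still picks a broadcast join once the verified size is above spark.sql.autoBroadcastJoinThreshold. A rough spark-shell style check, assuming a Hive-enabled session with the flag turned on as sketched earlier and a hypothetical table big_table whose metastore totalSize is stale (too small):

import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// big_table is a hypothetical Hive table; adjust the query to a real table.
val df = spark.sql(
  """
    |SELECT *
    |FROM big_table a JOIN big_table b
    |  ON a.id = b.id
  """.stripMargin)

// With the verified size above the broadcast threshold, no BroadcastHashJoinExec
// should appear in the executed plan; without the fix, a huge table could be broadcast.
val usesBroadcast = df.queryExecution.executedPlan.collect {
  case b: BroadcastHashJoinExec => b
}.nonEmpty
println(s"uses broadcast join: $usesBroadcast")

As the new warning message suggests, the durable fix is still to refresh the metastore statistics, for example with ANALYZE TABLE big_table COMPUTE STATISTICS NOSCAN.
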