[
https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xiao Li updated SPARK-26222:
Comment: was deleted
(was: xuanyuanking closed pull request #23298: [SPARK-26222][SQL] Track file listing time
URL: https://github.com/apache/spark/pull/23298
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
index cd75407c7ee7a..9d391741f596f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.BoundedPriorityQueue
* There are two separate concepts we track:
*
* 1. Phases: These are broad scope phases in query planning, as listed below, i.e. analysis,
- * optimizationm and physical planning (just planning).
+ * optimization and physical planning (just planning).
*
* 2. Rules: These are the individual Catalyst rules that we track. In addition to time, we also
* track the number of invocations and effective invocations.
@@ -41,6 +41,10 @@ object QueryPlanningTracker {
val OPTIMIZATION = "optimization"
val PLANNING = "planning"
+ // File listing relative phases.
+ val FILE_LISTING = "fileListing"
+ val PARTITION_PRUNING = "partitionPruning"
+
/**
* Summary for a rule.
* @param totalTimeNs total amount of time, in nanosecs, spent in this rule.
@@ -79,7 +83,11 @@ object QueryPlanningTracker {
}
/** Returns the current tracker in scope, based on the thread local variable. */
- def get: Option[QueryPlanningTracker] = Option(localTracker.get())
+ def get: QueryPlanningTracker = Option(localTracker.get()).getOrElse(NoopTracker)
+
+ /** Returns the current tracker in scope or create a new one. */
+ def getOrCreate: QueryPlanningTracker =
+Option(localTracker.get()).getOrElse(new QueryPlanningTracker)
/** Sets the current tracker for the execution of function f. We assume f is single-threaded. */
def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = {
@@ -161,3 +169,9 @@ class QueryPlanningTracker {
}
}
+
+/** A no-op tracker used for current tracker in scope not set. */
+object NoopTracker extends QueryPlanningTracker {
+ override def measurePhase[T](phase: String)(f: => T): T = f
+ override def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {}
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index cf6ff4f986399..f1a0b1ae2b6e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -86,7 +86,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger()
-val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
+val tracker: QueryPlanningTracker = QueryPlanningTracker.get
batches.foreach { batch =>
val batchStartPlan = curPlan
@@ -112,7 +112,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
queryExecutionMetrics.incNumExecution(rule.ruleName)
// Record timing information using QueryPlanningTracker
-tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
+tracker.recordRuleInvocation(rule.ruleName, runTime, effective)
// Run the structural integrity checker against the plan after each rule.
if (!isPlanIntegral(result)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9751528654ffb..917c9cc371fc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import