cloud-fan commented on a change in pull request #31522:
URL: https://github.com/apache/spark/pull/31522#discussion_r672965451
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -48,7 +48,9 @@ case class BasicWriteTaskStats(
/**
* Simple [[WriteTaskStatsTracker]] implementation that produces
[[BasicWriteTaskStats]].
*/
-class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
+class BasicWriteTaskStatsTracker(
+ hadoopConf: Configuration,
+ taskCommitTimeMetrics: Option[SQLMetric] = None)
Review comment:
When can this be `None`? In tests?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -221,16 +226,26 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
+ val TASK_COMMIT_DURATION = "taskCommitDuration"
+ private val JOB_COMMIT_DURATION = "jobCommitDuration"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
- def metrics: Map[String, SQLMetric] = {
+ def driverSideMetrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of
written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext,
"written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
- NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part")
+ NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part"),
+ JOB_COMMIT_DURATION ->
+ SQLMetrics.createTimingMetric(sparkContext, "duration of job commit")
)
}
+
+ def taskCommitTimeMetric: Map[String, SQLMetric] = {
Review comment:
why Map? `def taskCommitTimeMetric: SQLMetric ...`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -221,16 +226,26 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
+ val TASK_COMMIT_DURATION = "taskCommitDuration"
+ private val JOB_COMMIT_DURATION = "jobCommitDuration"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
- def metrics: Map[String, SQLMetric] = {
+ def driverSideMetrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of
written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext,
"written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
- NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part")
+ NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part"),
+ JOB_COMMIT_DURATION ->
+ SQLMetrics.createTimingMetric(sparkContext, "duration of job commit")
)
}
+
+ def taskCommitTimeMetric: Map[String, SQLMetric] = {
Review comment:
`def taskCommitTimeMetric: (String, SQLMetric)`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
##########
@@ -91,11 +91,14 @@ abstract class FileFormatDataWriter(
* driver too and used to e.g. update the metrics in UI.
*/
override def commit(): WriteTaskResult = {
- releaseResources()
+ val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
+ releaseResources()
Review comment:
shall we include `releaseResources` in the commit time?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
##########
@@ -302,13 +302,9 @@ object FileFormatWriter extends Logging {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
// Execute the task to write rows out and commit the task.
val taskAttemptID = taskAttemptContext.getTaskAttemptID
- val (res, timeCost) = Utils.timeTakenMs {
- logDebug("$taskAttemptID starts to write and commit.")
- dataWriter.writeWithIterator(iterator)
- dataWriter.commit()
- }
- logInfo(s"$taskAttemptID finished to write and commit. Elapsed time:
$timeCost ms.")
- res
+ logDebug(s"$taskAttemptID starts to write and commit.")
Review comment:
This log didn't exist before
https://github.com/apache/spark/commit/f5a63322def87904ebfa95673a584c094e6062cc#diff-03e7ff19e93cb270d82aca907ac0a1f87463ba6eb5dce78a407bb169b840a6cb
Shall we remove it?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
##########
@@ -66,10 +66,11 @@ trait WriteTaskStatsTracker {
/**
* Returns the final statistics computed so far.
+ * @param taskCommitTime The task commit duration.
Review comment:
nit: `Time of committing the task`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
##########
@@ -93,6 +94,7 @@ trait WriteJobStatsTracker extends Serializable {
* Process the given collection of stats computed during this job.
* E.g. aggregate them, write them to memory / disk, issue warnings,
whatever.
* @param stats One [[WriteTaskStats]] object from each successful write
task.
+ * @param jobCommitDuration Duration of job commit.
Review comment:
ditto: `Time of committing the job`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -221,16 +226,26 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
+ val TASK_COMMIT_DURATION = "taskCommitDuration"
+ private val JOB_COMMIT_DURATION = "jobCommitDuration"
Review comment:
`taskCommitTime` and `jobCommitTime`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -221,16 +226,26 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
+ val TASK_COMMIT_DURATION = "taskCommitDuration"
+ private val JOB_COMMIT_DURATION = "jobCommitDuration"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
- def metrics: Map[String, SQLMetric] = {
+ def driverSideMetrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of
written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext,
"written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
- NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part")
+ NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part"),
+ JOB_COMMIT_DURATION ->
+ SQLMetrics.createTimingMetric(sparkContext, "duration of job commit")
Review comment:
`time of committing the job`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
##########
@@ -221,16 +226,26 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
+ val TASK_COMMIT_DURATION = "taskCommitDuration"
+ private val JOB_COMMIT_DURATION = "jobCommitDuration"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
- def metrics: Map[String, SQLMetric] = {
+ def driverSideMetrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of
written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext,
"written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
- NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part")
+ NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of
dynamic part"),
+ JOB_COMMIT_DURATION ->
+ SQLMetrics.createTimingMetric(sparkContext, "duration of job commit")
)
}
+
+ def taskCommitTimeMetric: Map[String, SQLMetric] = {
+ val sparkContext = SparkContext.getActive.get
+ Map(TASK_COMMIT_DURATION ->
+ SQLMetrics.createTimingMetric(sparkContext, "duration of task commit"))
Review comment:
`time of committing tasks`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]