Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20745#discussion_r175992781
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
---
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}
+
+ test("SPARK-23288 writing and checking output metrics") {
+ Seq("parquet", "orc", "text", "json").foreach { format =>
+ val inputData = MemoryStream[String]
+ val df = inputData.toDF()
+
+ withTempDir { outputDir =>
+ withTempDir { checkpointDir =>
+
+ var query: StreamingQuery = null
+
+ var numTasks = 0
+ var recordsWritten: Long = 0L
+ var bytesWritten: Long = 0L
+ try {
+ spark.sparkContext.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val outputMetrics = taskEnd.taskMetrics.outputMetrics
+ recordsWritten += outputMetrics.recordsWritten
+ bytesWritten += outputMetrics.bytesWritten
--- End diff --
How does this test exercise the code at
https://github.com/apache/spark/pull/20745/files#diff-bfa54a3a7c3a41ecbb805d45dcfef2a1R101
?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]