Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20745#discussion_r175994267
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}
+
+ test("SPARK-23288 writing and checking output metrics") {
+ Seq("parquet", "orc", "text", "json").foreach { format =>
+ val inputData = MemoryStream[String]
+ val df = inputData.toDF()
+
+ withTempDir { outputDir =>
+ withTempDir { checkpointDir =>
+
+ var query: StreamingQuery = null
+
+ var numTasks = 0
+ var recordsWritten: Long = 0L
+ var bytesWritten: Long = 0L
+ try {
+ spark.sparkContext.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val outputMetrics = taskEnd.taskMetrics.outputMetrics
+ recordsWritten += outputMetrics.recordsWritten
+ bytesWritten += outputMetrics.bytesWritten
--- End diff ---
Without registering the statsTrackers, the output metrics are never filled in, so
`assert(recordsWritten === 5)` and `assert(bytesWritten > 0)` blow up.
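To make the failure mode concrete, the rest of the test would continue roughly like the sketch below. This is not quoted from the PR: the input values, the literal timeout passed to `waitUntilEmpty`, and the exact `writeStream` wiring are my own assumptions for illustration.

```scala
            // (sketch) close the SparkListener registration that the diff cuts off
            }
          })

          query = df.writeStream
            .option("checkpointLocation", checkpointDir.getCanonicalPath)
            .format(format)
            .start(outputDir.getCanonicalPath)

          inputData.addData("1", "2", "3", "4", "5")
          query.processAllAvailable()
          // make sure every SparkListenerTaskEnd event has reached the listener above
          spark.sparkContext.listenerBus.waitUntilEmpty(10000)

          // these are the assertions that blow up when no stats tracker is registered,
          // because taskEnd.taskMetrics.outputMetrics stays at zero
          assert(recordsWritten === 5)
          assert(bytesWritten > 0)
        } finally {
          if (query != null) {
            query.stop()
          }
        }
```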
---