Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20745#discussion_r175186800
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}
+
+ test("SPARK-23288 writing and checking output metrics") {
+ Seq("parquet", "orc", "text", "json").foreach { format =>
+ val inputData = MemoryStream[String]
+ val df = inputData.toDF()
+
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: StreamingQuery = null
+
+ var numTasks = 0
+ var recordsWritten: Long = 0L
+ var bytesWritten: Long = 0L
+ try {
+ spark.sparkContext.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val outputMetrics = taskEnd.taskMetrics.outputMetrics
+ recordsWritten += outputMetrics.recordsWritten
+ bytesWritten += outputMetrics.bytesWritten
+ numTasks += 1
+ }
+ })
+
+ query =
+ df.writeStream
+ .option("checkpointLocation", checkpointDir)
+ .format(format)
+ .start(outputDir)
+
+ inputData.addData("1", "2", "3")
+ inputData.addData("4", "5")
+
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+
--- End diff --
Added.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]