Github user attilapiros commented on a diff in the pull request:
https://github.com/apache/spark/pull/20958#discussion_r214063180
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
---
@@ -256,6 +257,58 @@ class TextSocketStreamSuite extends StreamTest with
SharedSQLContext with Before
}
}
+ test("fail on recovery - true") {
+ val checkpointDir =
Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
+ runSocketStream(checkpointDir, true, Seq(("hello", "hello"), ("world",
"world")))
+
+ // Rerun socket stream will throw an exception, because it wrongly
honors the recovered offsets.
+ val exception = intercept[StreamingQueryException](
+ runSocketStream(checkpointDir, true, Seq(("hello", "hello"),
("world", "world"))))
+ assert(exception.getMessage.contains(
+ "terminated with exception: Offsets committed out of order"))
+ }
+
+ test("fail on recovery - false") {
+ val checkpointDir =
Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
+ runSocketStream(checkpointDir, false, Seq(("hello", "hello"),
("world", "world")))
+
+ // Rerun socket stream will not throw exception
+ runSocketStream(checkpointDir, false, Seq(("hello", "hello"),
("world", "world")))
+ }
+
+ private def runSocketStream(
+ chkpointDir: String,
+ failOnRecovery: Boolean,
+ inputAndRets: Seq[(String, String)]): Unit = withSQLConf(
+ "spark.sql.streaming.unsupportedOperationCheck" -> "false") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ try {
+ val ref = spark
+ import ref.implicits._
+ val socket = spark
+ .readStream
+ .format("socket")
+ .options(Map("host" -> "localhost", "port" ->
serverThread.port.toString))
+ .option("failonrecovery", failOnRecovery.toString)
+ .load()
+ .as[String]
+
+ val actions = Seq(StartStream(checkpointLocation = chkpointDir)) ++
+ inputAndRets.flatMap {
+ case (input, ret) => Seq(AddSocketData(input),
CheckLastBatch(ret))
+ } ++ Seq(StopStream)
+ testStream(socket)(actions : _*)
+ } finally {
+ if (serverThread != null) {
+ serverThread.interrupt()
+ serverThread.join()
+ serverThread = null
--- End diff --
Nit: assignment not needed
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]