Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20958#discussion_r214063180
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
    @@ -256,6 +257,58 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
         }
       }
     
    +  test("fail on recovery - true") {
    +    val checkpointDir = 
Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
    +    runSocketStream(checkpointDir, true, Seq(("hello", "hello"), ("world", 
"world")))
    +
    +    // Rerun socket stream will throw an exception, because it wrongly 
honors the recovered offsets.
    +    val exception = intercept[StreamingQueryException](
    +      runSocketStream(checkpointDir, true, Seq(("hello", "hello"), 
("world", "world"))))
    +    assert(exception.getMessage.contains(
    +      "terminated with exception: Offsets committed out of order"))
    +  }
    +
    +  test("fail on recovery - false") {
    +    val checkpointDir = 
Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
    +    runSocketStream(checkpointDir, false, Seq(("hello", "hello"), 
("world", "world")))
    +
    +    // Rerun socket stream will not throw exception
    +    runSocketStream(checkpointDir, false, Seq(("hello", "hello"), 
("world", "world")))
    +  }
    +
    +  private def runSocketStream(
    +      chkpointDir: String,
    +      failOnRecovery: Boolean,
    +      inputAndRets: Seq[(String, String)]): Unit = withSQLConf(
    +    "spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      serverThread = new ServerThread()
    +      serverThread.start()
    +
    +      try {
    +        val ref = spark
    +        import ref.implicits._
    +        val socket = spark
    +          .readStream
    +          .format("socket")
    +          .options(Map("host" -> "localhost", "port" -> 
serverThread.port.toString))
    +          .option("failonrecovery", failOnRecovery.toString)
    +          .load()
    +          .as[String]
    +
    +        val actions = Seq(StartStream(checkpointLocation = chkpointDir)) ++
    +          inputAndRets.flatMap {
    +            case (input, ret) => Seq(AddSocketData(input), 
CheckLastBatch(ret))
    +          } ++ Seq(StopStream)
    +        testStream(socket)(actions : _*)
    +      } finally {
    +        if (serverThread != null) {
    +          serverThread.interrupt()
    +          serverThread.join()
    +          serverThread = null
    --- End diff --
    
    Nit: assignment not needed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to