Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20225#discussion_r161098483
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
    @@ -219,6 +201,42 @@ class ContinuousSuite extends ContinuousSuiteBase {
           StopStream)
       }
     
    +  test("task failure kills the query") {
    +    val df = spark.readStream
    +      .format("rate")
    +      .option("numPartitions", "5")
    +      .option("rowsPerSecond", "5")
    +      .load()
    +      .select('value)
    +
    +    // Get an arbitrary task from this query to kill. It doesn't matter 
which one.
    +    var taskId: Long = -1
    +    val listener = new SparkListener() {
    +      override def onTaskStart(start: SparkListenerTaskStart): Unit = {
    +        taskId = start.taskInfo.taskId
    +      }
    +    }
    +    spark.sparkContext.addSparkListener(listener)
    +
    +    testStream(df, useV2Sink = true)(
    +      StartStream(Trigger.Continuous(100)),
    +      Execute(waitForRateSourceTriggers(_, 2)),
    +      Execute { query =>
    +        // Wait until a task is started, then kill its first attempt.
    +        eventually(timeout(streamingTimeout)) { assert(taskId != -1) }
    +        spark.sparkContext.killTaskAttempt(taskId)
    +        eventually(timeout(streamingTimeout)) {
    --- End diff --
    
    Can this be checked with an "ExpectFailure" test? It would be better to test 
using the same harness that is used for microbatch, so that we can be sure the 
failure behavior is the same.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to