HeartSaVioR commented on a change in pull request #26158: 
[SPARK-29509][SQL][SS] Deduplicate codes from Kafka data source
URL: https://github.com/apache/spark/pull/26158#discussion_r339281575
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
 ##########
 @@ -422,4 +349,54 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     withOptions.foreach(opt => stream.option(opt._1, opt._2))
     stream.start()
   }
+
+  private def runAndVerifyStreamingQueryException(
+      inputTopic: String,
+      expectErrorMsg: String)(
+      writerFn: => StreamingQuery): Unit = {
+    var writer: StreamingQuery = null
+    val ex: Exception = try {
+      intercept[StreamingQueryException] {
+        writer = writerFn
+        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+        eventually(timeout(streamingTimeout)) {
+          assert(writer.exception.isDefined)
+        }
+        throw writer.exception.get
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+  }
+
+  private def verifyException[T <: Exception : ClassTag](
+      expectErrorMsg: String)(
+      writerFn: => StreamingQuery): Unit = {
+    var writer: StreamingQuery = null
+    val ex: Exception = try {
+      intercept[T] {
+        writer = writerFn
+      }
+    } finally {
+      if (writer != null) writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+  }
+
+  private def assertWrongSchema(
+      input: DataFrame,
+      selectExpr: Seq[String],
+      expectErrorMsg: String): Unit = {
+    verifyException[AnalysisException](expectErrorMsg)(
 
 Review comment:
   I kept the two helpers separate because one needs an actual input topic and 
the other doesn't, but there's no harm in passing an input topic to the latter 
as well. I'll make that change.
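   
   A rough sketch of what a helper with the uniform signature might look like, 
not the code in this PR: `Query` is a hypothetical stand-in for 
`StreamingQuery` so the snippet runs without Spark, and `inputTopic` is 
accepted but deliberately unused in this variant.

```scala
import java.util.Locale
import scala.reflect.ClassTag

// Hypothetical stand-in for StreamingQuery so the sketch runs without Spark.
trait Query { def stop(): Unit }

object VerifySketch {
  // Evaluates `writerFn`, always stops the query if one was created, and checks
  // that an exception of type T was thrown whose lower-cased message contains
  // `expectErrorMsg`. `inputTopic` is accepted only to keep the signature
  // uniform with the helper that sends messages; this variant never reads it.
  def verifyException[T <: Exception : ClassTag](
      inputTopic: String,
      expectErrorMsg: String)(
      writerFn: => Query): Unit = {
    val expected = implicitly[ClassTag[T]].runtimeClass
    var writer: Query = null
    val thrown: Option[Exception] =
      try { writer = writerFn; None }
      catch { case e: Exception => Some(e) }
      finally { if (writer != null) writer.stop() }
    thrown match {
      case Some(e) if expected.isInstance(e) =>
        val msg = Option(e.getMessage).getOrElse("").toLowerCase(Locale.ROOT)
        assert(msg.contains(expectErrorMsg), s"unexpected message: $msg")
      case Some(e) => throw e // wrong exception type: surface it unchanged
      case None => throw new AssertionError(s"expected ${expected.getName}")
    }
  }
}

object Demo extends App {
  // The by-name block fails before a query exists, as with an analysis error.
  VerifySketch.verifyException[IllegalArgumentException]("topic-1", "missing column") {
    throw new IllegalArgumentException("Missing column: value")
  }
}
```

   With this shape, callers that fail at analysis time and callers that fail 
while the query is running can share one parameter list, at the cost of an 
argument that some call sites ignore.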
   
   Btw, batch/micro-batch and continuous differ in how they wait for the query 
result and check for the exception, so this doesn't look like duplication to 
me. The original purpose of this PR was to deduplicate code that is repeated 
because of the number of fields, and the scope keeps growing. Maybe we can 
revisit deduplicating code between batch/micro-batch and continuous in a 
follow-up PR. WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
