heyihong commented on code in PR #53872:
URL: https://github.com/apache/spark/pull/53872#discussion_r2727847798
##########
sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala:
##########
@@ -753,6 +767,111 @@ class ClientStreamingQuerySuite extends QueryTest with
RemoteSparkSession with L
terminate = terminate :+ event.json
}
}
+
+ // Tests for DataStreamReader.name() method
+ testWithSourceEvolution("stream reader name() with valid source names") {
+ Seq("mySource", "my_source", "MySource123", "_private", "source_123_test",
"123source")
+ .foreach { name =>
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(10).write.parquet(path)
+
+ val df = spark.readStream
+ .format("parquet")
+ .schema("id LONG")
+ .name(name)
+ .load(path)
+
+ assert(df.isStreaming, s"DataFrame should be streaming for name:
$name")
+ }
+ }
+ }
+
+ testWithSourceEvolution("stream reader name() method chaining") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(10).write.parquet(path)
+
+ val df = spark.readStream
+ .format("parquet")
+ .schema("id LONG")
+ .name("my_source")
+ .option("maxFilesPerTrigger", "1")
+ .load(path)
+
+ assert(df.isStreaming, "DataFrame should be streaming")
+ }
+ }
+
+ test("stream reader invalid source name - contains hyphen") {
Review Comment:
For these tests, it seems that the only parts that change are the inputs and the
expected errors, so consider using table-driven tests. Below is example
code:
```scala
// Seq of (sourceName, expectedExceptionClass, expectedConditionOpt)
val invalidSourceNames = Seq(
("my-source", classOf[AnalysisException],
Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
("my space", classOf[AnalysisException],
Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
("my.source", classOf[AnalysisException],
Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
("", classOf[IllegalArgumentException], None) // empty string case
)
invalidSourceNames.foreach { case (sourceName, exceptionClass, conditionOpt)
=>
test(s"stream reader invalid source name - '$sourceName'") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(10).write.parquet(path)
val thrown = intercept[Exception] {
spark.readStream
.format("parquet")
.schema("id LONG")
.name(sourceName)
.load(path)
}
// Verify exception type
assert(exceptionClass.isInstance(thrown))
// Verify error condition only for AnalysisException cases
conditionOpt.foreach { condition =>
checkError(
exception = thrown.asInstanceOf[AnalysisException],
condition = condition,
parameters = Map("sourceName" -> sourceName)
)
}
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]