heyihong commented on code in PR #53872:
URL: https://github.com/apache/spark/pull/53872#discussion_r2727847798


##########
sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala:
##########
@@ -753,6 +767,111 @@ class ClientStreamingQuerySuite extends QueryTest with 
RemoteSparkSession with L
       terminate = terminate :+ event.json
     }
   }
+
+  // Tests for DataStreamReader.name() method
+  testWithSourceEvolution("stream reader name() with valid source names") {
+    Seq("mySource", "my_source", "MySource123", "_private", "source_123_test", 
"123source")
+      .foreach { name =>
+        withTempPath { dir =>
+          val path = dir.getCanonicalPath
+          spark.range(10).write.parquet(path)
+
+          val df = spark.readStream
+            .format("parquet")
+            .schema("id LONG")
+            .name(name)
+            .load(path)
+
+          assert(df.isStreaming, s"DataFrame should be streaming for name: 
$name")
+        }
+      }
+  }
+
+  testWithSourceEvolution("stream reader name() method chaining") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(10).write.parquet(path)
+
+      val df = spark.readStream
+        .format("parquet")
+        .schema("id LONG")
+        .name("my_source")
+        .option("maxFilesPerTrigger", "1")
+        .load(path)
+
+      assert(df.isStreaming, "DataFrame should be streaming")
+    }
+  }
+
+  test("stream reader invalid source name - contains hyphen") {

Review Comment:
   For these tests, it seems that the only changing parts are the input and the 
expected errors, so consider using table-driven tests. Below is an example:
   ```scala
   // Seq of (sourceName, expectedExceptionClass, expectedConditionOpt)
   val invalidSourceNames = Seq(
     ("my-source", classOf[AnalysisException], Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
     ("my space",  classOf[AnalysisException], Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
     ("my.source", classOf[AnalysisException], Some("STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME")),
     ("",          classOf[IllegalArgumentException], None) // empty string case
   )
   
   invalidSourceNames.foreach { case (sourceName, exceptionClass, conditionOpt) =>
     test(s"stream reader invalid source name - '$sourceName'") {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         spark.range(10).write.parquet(path)
   
         val thrown = intercept[Exception] {
           spark.readStream
             .format("parquet")
             .schema("id LONG")
             .name(sourceName)
             .load(path)
         }
   
         // Verify exception type
         assert(exceptionClass.isInstance(thrown))
   
         // Verify error condition only for AnalysisException cases
         conditionOpt.foreach { condition =>
           checkError(
             exception = thrown.asInstanceOf[AnalysisException],
             condition = condition,
             parameters = Map("sourceName" -> sourceName)
           )
         }
       }
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to