anishshri-db commented on code in PR #54373:
URL: https://github.com/apache/spark/pull/54373#discussion_r2830785259
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala:
##########
@@ -159,16 +193,258 @@ class StreamingQueryEvolutionSuite extends StreamTest {
assert(union.isStreaming, "Union should be streaming")
}
+ test("without enforcement - naming sources throws error") {
+ withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("mySource")
+ .load()
+ },
+ condition =
"STREAMING_QUERY_EVOLUTION_ERROR.SOURCE_NAMING_NOT_SUPPORTED",
+ parameters = Map("name" -> "mySource"))
+ }
+ }
+
+ // =======================
+ // Metadata Path Tests
+ // =======================
+
+ testWithSourceEvolution("named sources - metadata path uses source name") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q = df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ verifySourcePath(checkpointLocation, "source1")
+ verifySourcePath(checkpointLocation, "source2")
+ }
+
+ test("unnamed sources use positional IDs for metadata path") {
+ withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+
+ val q = df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ // Without naming, sources get sequential IDs (Unassigned -> 0, 1, ...)
+ verifySourcePath(checkpointLocation, "0")
+ verifySourcePath(checkpointLocation, "1")
+ }
+ }
+
+ // ========================
+ // Source Evolution Tests
+ // ========================
+
+ testWithSourceEvolution("source evolution - reorder sources with named
sources") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ // First query: source1 then source2
+ val df1a = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2a = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q1 = df1a.union(df2a).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q1.processAllAvailable()
+ q1.stop()
+
+ LastOptions.clear()
+
+ // Second query: source2 then source1 (reordered) - should still work
+ val df1b = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2b = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q2 = df2b.union(df1b).writeStream // Note: reversed order
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q2.processAllAvailable()
+ q2.stop()
+
+ // Both sources should still use their named paths
+ verifySourcePath(checkpointLocation, "source1", atLeastOnce())
+ verifySourcePath(checkpointLocation, "source2", atLeastOnce())
+ }
+
+ testWithSourceEvolution("source evolution - add new source with named
sources") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ // First query: only source1
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val q1 = df1.writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q1.processAllAvailable()
+ q1.stop()
+
+ LastOptions.clear()
+
+ // Second query: add source2
+ val df1b = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q2 = df1b.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q2.processAllAvailable()
+ q2.stop()
+
+ // Both sources should have been created
+ verifySourcePath(checkpointLocation, "source1", atLeastOnce())
+ verifySourcePath(checkpointLocation, "source2")
+ }
+
+ testWithSourceEvolution("named sources enforcement uses V2 offset log
format") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source1")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("source2")
+ .load()
+
+ val q = df1.union(df2).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap,
OffsetSeqLog}
+ val offsetLog = new OffsetSeqLog(spark,
+ makeQualifiedPath(checkpointLocation.toString).toString + "/offsets")
+ val offsetSeq = offsetLog.get(0)
+ assert(offsetSeq.isDefined, "Offset log should have batch 0")
+ assert(offsetSeq.get.isInstanceOf[OffsetMap],
+ s"Expected OffsetMap but got ${offsetSeq.get.getClass.getSimpleName}")
+ }
+
+ testWithSourceEvolution("names preserved through union operations") {
+ LastOptions.clear()
+
+ val checkpointLocation = new Path(newMetadataDir)
+
+ val df1 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("alpha")
+ .load()
+
+ val df2 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("beta")
+ .load()
+
+ val df3 = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .name("gamma")
+ .load()
+
+ // Complex union: (alpha union beta) union gamma
+ val q = df1.union(df2).union(df3).writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation.toString)
+ .trigger(ProcessingTime(10.seconds))
+ .start()
+ q.processAllAvailable()
+ q.stop()
+
+ // All three sources should use their named paths
+ verifySourcePath(checkpointLocation, "alpha")
+ verifySourcePath(checkpointLocation, "beta")
+ verifySourcePath(checkpointLocation, "gamma")
+ }
+
// ==============
// Helper Methods
// ==============
/**
* Helper method to run tests with source evolution enabled.
+ * Sets offset log format to V2 (OffsetMap) since named sources require it.
*/
def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: =>
Any): Unit = {
test(testName, testTags: _*) {
- withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
+ withSQLConf(
+ SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
Review Comment:
Do we have some negative tests with this feature set to `false` also ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]