HeartSaVioR commented on a change in pull request #26018: 
[SPARK-29352][SQL][SS] Track active streaming queries in the 
SparkSession.sharedState
URL: https://github.com/apache/spark/pull/26018#discussion_r331725084
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 ##########
 @@ -242,6 +243,83 @@ class StreamingQueryManagerSuite extends StreamTest {
     }
   }
 
+  testQuietly("can't start a streaming query with the same name in the same 
session") {
+    val ds1 = makeDataset._2
+    val ds2 = makeDataset._2
+    val queryName = "abc"
+
+    val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
+    try {
+      val e = intercept[IllegalArgumentException] {
+        ds2.writeStream.format("noop").queryName(queryName).start()
+      }
+      assert(e.getMessage.contains("query with that name is already active"))
+    } finally {
+      query1.stop()
+    }
+  }
+
+  testQuietly("can start a streaming query with the same name in a different 
session") {
+    val session2 = spark.cloneSession()
+
+    val ds1 = MemoryStream(Encoders.INT, spark.sqlContext).toDS()
+    val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+    val queryName = "abc"
+
+    val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
+    val query2 = ds2.writeStream.format("noop").queryName(queryName).start()
+
+    query1.stop()
+    query2.stop()
+  }
+
+  testQuietly("can't start multiple instances of the same streaming query in 
the same session") {
+    withTempDir { dir =>
+      val session2 = spark.cloneSession()
+
+      val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+      val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+      val dataLocation = new File(dir, "data").getCanonicalPath
+
+      val query1 = ms1.toDS().writeStream.format("parquet")
+        .option("checkpointLocation", chkLocation).start(dataLocation)
+      ms1.addData(1, 2, 3)
+      try {
+        val e = intercept[IllegalStateException] {
+          ds2.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+        }
+        assert(e.getMessage.contains("same id"))
+      } finally {
+        query1.stop()
+      }
+    }
+  }
+
+  testQuietly(
 
 Review comment:
   Just for the sake of understanding: this patch is intended to prevent this
case, right?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to