HeartSaVioR commented on a change in pull request #30521:
URL: https://github.com/apache/spark/pull/30521#discussion_r535883318
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
##########
@@ -262,8 +250,107 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(viewIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
- assert(exc.getMessage.contains("doesn't support streaming write"))
+ assert(exc.getMessage.contains(s"Streaming into views $viewIdentifier is
not supported"))
+ }
+ }
+
+ test("write: write to an external table") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ checkForStreamTable(Some(dir), tableName)
+ }
+ }
+ }
+
+ test("write: write to a managed table") {
+ val tableName = "stream_test"
+ withTable(tableName) {
+ checkForStreamTable(None, tableName)
+ }
+ }
+
+ test("write: write to an external table with existing path") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ // The file written by batch will not be seen after the table was written by a streaming
+ // query. This is because we loads files from the metadata log instead of listing them
Review comment:
nit: `load` instead of `loads`, but as it's a nit, it's OK to fix in a follow-up PR.
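
For readers skimming the diff: the behavior the quoted comment describes is that once a streaming query has written to a path, Spark reads that path through the `_spark_metadata` log left by the file sink rather than listing files with the HDFS API. Below is a minimal standalone sketch of that behavior, assuming a local session and the test-only `MemoryStream` source; the paths and object name are illustrative, not part of this PR:

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// Illustrative sketch only: shows why a batch-written file becomes invisible
// once a streaming file sink has written _spark_metadata into the same path.
object MetadataLogDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("metadata-log-demo")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: org.apache.spark.sql.SQLContext = spark.sqlContext

    // Assumed scratch directories for the sink output and the checkpoint.
    val out = Files.createTempDirectory("sink").toString
    val checkpoint = Files.createTempDirectory("ckpt").toString

    // Stream 1, 2, 3 into `out`; the file sink records each committed file
    // in out/_spark_metadata.
    val input = MemoryStream[Int]
    val query = input.toDF().writeStream
      .format("parquet")
      .option("checkpointLocation", checkpoint)
      .start(out)
    input.addData(1, 2, 3)
    query.processAllAvailable()
    query.stop()

    // Batch-append 4, 5, 6 directly into the same directory; these files
    // never enter the metadata log.
    Seq(4, 5, 6).toDF("value").write.mode("append").parquet(out)

    // Prints only 1, 2, 3: the read is driven by _spark_metadata,
    // not by an HDFS-style file listing.
    spark.read.parquet(out).show()

    spark.stop()
  }
}
```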
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
##########
@@ -262,8 +250,107 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(viewIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
- assert(exc.getMessage.contains("doesn't support streaming write"))
+ assert(exc.getMessage.contains(s"Streaming into views $viewIdentifier is
not supported"))
+ }
+ }
+
+ test("write: write to an external table") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ checkForStreamTable(Some(dir), tableName)
+ }
+ }
+ }
+
+ test("write: write to a managed table") {
+ val tableName = "stream_test"
+ withTable(tableName) {
+ checkForStreamTable(None, tableName)
+ }
+ }
+
+ test("write: write to an external table with existing path") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ // The file written by batch will not be seen after the table was written by a streaming
+ // query. This is because we loads files from the metadata log instead of listing them
+ // using HDFS API.
+ Seq(4, 5, 6).toDF("value").write.format("parquet")
+ .option("path", dir.getCanonicalPath).saveAsTable(tableName)
+
+ checkForStreamTable(Some(dir), tableName)
+ }
+ }
+ }
+
+ test("write: write to a managed table with existing path") {
+ val tableName = "stream_test"
+ withTable(tableName) {
+ // The file written by batch will not be seen after the table was written by a streaming
+ // query. This is because we loads files from the metadata log instead of listing them
Review comment:
same here
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
##########
@@ -262,8 +250,107 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(viewIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
- assert(exc.getMessage.contains("doesn't support streaming write"))
+ assert(exc.getMessage.contains(s"Streaming into views $viewIdentifier is
not supported"))
+ }
+ }
+
+ test("write: write to an external table") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ checkForStreamTable(Some(dir), tableName)
+ }
+ }
+ }
+
+ test("write: write to a managed table") {
+ val tableName = "stream_test"
+ withTable(tableName) {
+ checkForStreamTable(None, tableName)
+ }
+ }
+
+ test("write: write to an external table with existing path") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ // The file written by batch will not be seen after the table was written by a streaming
+ // query. This is because we loads files from the metadata log instead of listing them
+ // using HDFS API.
+ Seq(4, 5, 6).toDF("value").write.format("parquet")
+ .option("path", dir.getCanonicalPath).saveAsTable(tableName)
+
+ checkForStreamTable(Some(dir), tableName)
+ }
+ }
+ }
+
+ test("write: write to a managed table with existing path") {
+ val tableName = "stream_test"
+ withTable(tableName) {
+ // The file written by batch will not be seen after the table was written by a streaming
+ // query. This is because we loads files from the metadata log instead of listing them
+ // using HDFS API.
+ Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)
+
+ checkForStreamTable(None, tableName)
+ }
+ }
+
+ test("write: write to an external path and create table") {
+ withTempDir { dir =>
+ val tableName = "stream_test"
+ withTable(tableName) {
+ // The file written by batch will not be seen after the table was written by a streaming
+ // query. This is because we loads files from the metadata log instead of listing them
Review comment:
same here
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]