rdblue commented on a change in pull request #24832: [SPARK-27845][SQL] DataSourceV2: InsertTable
URL: https://github.com/apache/spark/pull/24832#discussion_r300795247
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
##########
@@ -344,4 +345,113 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
df_joined)
}
}
+
+ test("InsertTable: append") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
+ sql(s"INSERT INTO $t1 SELECT id, data FROM source")
+ checkAnswer(spark.table(t1), spark.table("source"))
+ }
+ }
+
+ test("InsertTable: append - across catalog") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ val t2 = "testcat2.db.tbl"
+ withTable(t1, t2) {
+ sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+ sql(s"CREATE TABLE $t2 (id bigint, data string) USING foo")
+ sql(s"INSERT INTO $t2 SELECT * FROM $t1")
+ checkAnswer(spark.table(t2), spark.table("source"))
+ }
+ }
+
+ test("InsertTable: append partitioned table - dynamic clause") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY
(id)")
+ sql(s"INSERT INTO TABLE $t1 TABLE source")
+ checkAnswer(spark.table(t1), spark.table("source"))
+ }
+ }
+
+ test("InsertTable: append partitioned table - static clause") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY
(id)")
+ sql(s"INSERT INTO $t1 PARTITION (id = 23) SELECT data FROM source")
+ checkAnswer(spark.table(t1), sql("SELECT 23, data FROM source"))
+ }
+ }
+
+ test("InsertTable: overwrite non-partitioned table") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+ sql(s"INSERT OVERWRITE TABLE $t1 TABLE source2")
+ checkAnswer(spark.table(t1), spark.table("source2"))
+ }
+ }
+
+ test("InsertTable: overwrite - dynamic clause - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED
BY (id)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+ sql(s"INSERT OVERWRITE TABLE $t1 TABLE source")
+ checkAnswer(spark.table(t1), spark.table("source"))
+ }
+ }
+ }
+
+ test("InsertTable: overwrite - dynamic clause - dynamic mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED
BY (id)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) TABLE source")
+ checkAnswer(spark.table(t1),
+ spark.table("source").union(sql("SELECT 4L, 'keep'")))
+ }
+ }
+ }
+
+ test("InsertTable: overwrite - static clause") {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string, p1 int) USING foo
PARTITIONED BY (p1)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 23), (4L, 'keep', 4)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p1 = 23) TABLE source")
+ checkAnswer(spark.table(t1),
+ sql("SELECT id, data, 23 FROM source UNION SELECT 4L, 'keep', 4"))
+ }
+ }
+
+ test("InsertTable: overwrite - mixed clause - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string, p int) USING foo
PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 4)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) TABLE source")
+ checkAnswer(spark.table(t1),
+ sql("SELECT id, data, 2 FROM source UNION SELECT 4L, 'keep', 4"))
+ }
+ }
+ }
+
+ test("InsertTable: overwrite - mixed clause - dynamic mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+ val t1 = "testcat.ns1.ns2.tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string, p int) USING foo
PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 4)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) TABLE source")
+ checkAnswer(spark.table(t1),
+ sql("SELECT id, data, 2 FROM source UNION SELECT 4L, 'keep', 4"))
Review comment:
The results of this test are identical to the static case. I think both
tests should be updated to include a record that is deleted by static mode and
kept for dynamic mode. Changing `(4L, 'keep', 4)` to `(4L, 'keep', 2)` would do
that, I think.
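
As an illustration of that suggestion, here is a rough sketch of how the dynamic-mode test might then look. This is a hypothetical edit within this suite, and it assumes the shared `source` table contains `id = 2` but no `id = 4`, so the `(id = 4, p = 2)` partition is not touched by the dynamic overwrite:

```scala
  test("InsertTable: overwrite - mixed clause - dynamic mode") {
    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
      val t1 = "testcat.ns1.ns2.tbl"
      withTable(t1) {
        sql(s"CREATE TABLE $t1 (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
        // (4L, 'keep', 2) now sits under p = 2: a static overwrite of PARTITION (p = 2)
        // would drop it, while a dynamic overwrite replaces only the (id, p) partitions
        // actually written by the query, so this row survives.
        sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
        sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) TABLE source")
        checkAnswer(spark.table(t1),
          sql("SELECT id, data, 2 FROM source UNION SELECT 4L, 'keep', 2"))
      }
    }
  }
```

The static-mode counterpart would then expect only `sql("SELECT id, data, 2 FROM source")`, since a static `PARTITION (p = 2)` overwrite drops every `p = 2` partition before writing, so the two modes would finally produce observably different results.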