Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12313#discussion_r65758947
  
    --- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 ---
    @@ -284,8 +284,128 @@ class InsertIntoHiveTableSuite extends QueryTest with 
TestHiveSingleton with Bef
           val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", 
"data")
     
           val logical = InsertIntoTable(spark.table("partitioned").logicalPlan,
    -        Map("part" -> None), data.logicalPlan, overwrite = false, 
ifNotExists = false)
    +        Map("part" -> None), data.logicalPlan, overwrite = false, 
ifNotExists = false,
    +        Map("matchByName" -> "true"))
           assert(!logical.resolved, "Should not resolve: missing partition 
data")
         }
       }
    +
    +  test("Insert unnamed expressions by position") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      val expected = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) 
"even" else "odd"))
    +          .toDF("id", "data", "part")
    +      val data = expected.select("id", "part")
    +
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      // should be able to insert an expression when NOT mapping columns 
by name
    +      spark.table("source").selectExpr("id", "part", "CONCAT('data-', id)")
    +          .write.insertInto("partitioned")
    +      checkAnswer(sql("SELECT * FROM partitioned"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Insert expression by name") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      val expected = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) 
"even" else "odd"))
    +          .toDF("id", "data", "part")
    +      val data = expected.select("id", "part")
    +
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      intercept[AnalysisException] {
    +        // also a problem when mapping by name
    +        spark.table("source").selectExpr("id", "part", "CONCAT('data-', 
id)")
    +            .write.option("matchByName", true).insertInto("partitioned")
    +      }
    +
    +      // should be able to insert an expression using AS when mapping 
columns by name
    +      spark.table("source").selectExpr("id", "part", "CONCAT('data-', id) 
as data")
    +          .write.option("matchByName", true).insertInto("partitioned")
    +      checkAnswer(sql("SELECT * FROM partitioned"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Reject missing columns") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      intercept[AnalysisException] {
    +        spark.table("source").write.insertInto("partitioned")
    +      }
    +
    +      intercept[AnalysisException] {
    +        // also a problem when mapping by name
    +        spark.table("source").write.option("matchByName", 
true).insertInto("partitioned")
    +      }
    +    }
    +  }
    +
    +  test("Reject extra columns") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, data string, extra string, part 
string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      intercept[AnalysisException] {
    +        spark.table("source").write.insertInto("partitioned")
    +      }
    +
    +      val data = (1 to 10)
    +          .map(i => (i, s"data-$i", s"${i * i}", if ((i % 2) == 0) "even" 
else "odd"))
    +          .toDF("id", "data", "extra", "part")
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      spark.table("source").write.option("matchByName", 
true).insertInto("partitioned")
    +
    +      val expected = data.select("id", "data", "part")
    +      checkAnswer(sql("SELECT * FROM partitioned"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Ignore names when writing by position") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string, data string)") // 
part, data transposed
    +      sql("CREATE TABLE destination (id bigint, data string, part string)")
    +
    +      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) 
"even" else "odd"))
    +          .toDF("id", "data", "part")
    +
    +      // write into the reordered table by name
    +      data.write.option("matchByName", true).insertInto("source")
    +      checkAnswer(sql("SELECT id, data, part FROM source"), 
data.collect().toSeq)
    +
    +      val expected = data.select($"id", $"part" as "data", $"data" as 
"part")
    +
    +      // this produces a warning, but writes src.part -> dest.data and 
src.data -> dest.part
    +      spark.table("source").write.insertInto("destination")
    +      checkAnswer(sql("SELECT id, data, part FROM destination"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Reorder columns by name") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (data string, part string, id bigint)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      val data = (1 to 10).map(i => (s"data-$i", if ((i % 2) == 0) "even" 
else "odd", i))
    +          .toDF("data", "part", "id")
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      spark.table("source").write.option("matchByName", 
true).insertInto("partitioned")
    --- End diff --
    
    Yes.
    
    Users call `insertInto` when they need to insert data to an existing table 
and `saveAsTable` when they need to create and insert. Since the latter is 
backed by `CreateTableAsSelect`, it looks like there's a straight-forward 
mapping from Hive SQL to writer commands.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to