Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12313#discussion_r65776253
  
    --- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 ---
    @@ -284,8 +284,128 @@ class InsertIntoHiveTableSuite extends QueryTest with 
TestHiveSingleton with Bef
           val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", 
"data")
     
           val logical = InsertIntoTable(spark.table("partitioned").logicalPlan,
    -        Map("part" -> None), data.logicalPlan, overwrite = false, 
ifNotExists = false)
    +        Map("part" -> None), data.logicalPlan, overwrite = false, 
ifNotExists = false,
    +        Map("matchByName" -> "true"))
           assert(!logical.resolved, "Should not resolve: missing partition 
data")
         }
       }
    +
    +  test("Insert unnamed expressions by position") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      val expected = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) 
"even" else "odd"))
    +          .toDF("id", "data", "part")
    +      val data = expected.select("id", "part")
    +
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      // should be able to insert an expression when NOT mapping columns 
by name
    +      spark.table("source").selectExpr("id", "part", "CONCAT('data-', id)")
    +          .write.insertInto("partitioned")
    +      checkAnswer(sql("SELECT * FROM partitioned"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Insert expression by name") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      val expected = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) 
"even" else "odd"))
    +          .toDF("id", "data", "part")
    +      val data = expected.select("id", "part")
    +
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      intercept[AnalysisException] {
    +        // also a problem when mapping by name
    +        spark.table("source").selectExpr("id", "part", "CONCAT('data-', 
id)")
    +            .write.option("matchByName", true).insertInto("partitioned")
    +      }
    +
    +      // should be able to insert an expression using AS when mapping 
columns by name
    +      spark.table("source").selectExpr("id", "part", "CONCAT('data-', id) 
as data")
    +          .write.option("matchByName", true).insertInto("partitioned")
    +      checkAnswer(sql("SELECT * FROM partitioned"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Reject missing columns") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      intercept[AnalysisException] {
    +        spark.table("source").write.insertInto("partitioned")
    +      }
    +
    +      intercept[AnalysisException] {
    +        // also a problem when mapping by name
    +        spark.table("source").write.option("matchByName", 
true).insertInto("partitioned")
    +      }
    +    }
    +  }
    +
    +  test("Reject extra columns") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, data string, extra string, part 
string)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      intercept[AnalysisException] {
    +        spark.table("source").write.insertInto("partitioned")
    +      }
    +
    +      val data = (1 to 10)
    +          .map(i => (i, s"data-$i", s"${i * i}", if ((i % 2) == 0) "even" 
else "odd"))
    +          .toDF("id", "data", "extra", "part")
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      spark.table("source").write.option("matchByName", 
true).insertInto("partitioned")
    +
    +      val expected = data.select("id", "data", "part")
    +      checkAnswer(sql("SELECT * FROM partitioned"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Ignore names when writing by position") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (id bigint, part string, data string)") // 
part, data transposed
    +      sql("CREATE TABLE destination (id bigint, data string, part string)")
    +
    +      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) 
"even" else "odd"))
    +          .toDF("id", "data", "part")
    +
    +      // write into the reordered table by name
    +      data.write.option("matchByName", true).insertInto("source")
    +      checkAnswer(sql("SELECT id, data, part FROM source"), 
data.collect().toSeq)
    +
    +      val expected = data.select($"id", $"part" as "data", $"data" as 
"part")
    +
    +      // this produces a warning, but writes src.part -> dest.data and 
src.data -> dest.part
    +      spark.table("source").write.insertInto("destination")
    +      checkAnswer(sql("SELECT id, data, part FROM destination"), 
expected.collect().toSeq)
    +    }
    +  }
    +
    +  test("Reorder columns by name") {
    +    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
    +      sql("CREATE TABLE source (data string, part string, id bigint)")
    +      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED 
BY (part string)")
    +
    +      val data = (1 to 10).map(i => (s"data-$i", if ((i % 2) == 0) "even" 
else "odd", i))
    +          .toDF("data", "part", "id")
    +      data.write.insertInto("source")
    +      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
    +
    +      spark.table("source").write.option("matchByName", 
true).insertInto("partitioned")
    --- End diff --
    
    I agree that we should take some time to think through the semantics of the 
public API, but I think it is clear that `InsertIntoTable` should support 
both by-name and by-position resolution. Since both are already implemented but 
not publicly accessible, I think it's fine to continue with this PR rather than 
removing that support only to re-add it later along with the public API. Does that 
sound reasonable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to