[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317866727
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##
 @@ -770,40 +760,35 @@ class Analyzer(
 
   object ResolveInsertInto extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case i @ InsertIntoStatement(
-          UnresolvedRelation(CatalogObjectIdentifier(Some(tableCatalog), ident)), _, _, _, _)
-          if i.query.resolved =>
-        loadTable(tableCatalog, ident)
-            .map(DataSourceV2Relation.create)
-            .map(relation => {
-              // ifPartitionNotExists is append with validation, but validation is not supported
-              if (i.ifPartitionNotExists) {
-                throw new AnalysisException(
-                  s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
-              }
-
-              val partCols = partitionColumnNames(relation.table)
-              validatePartitionSpec(partCols, i.partitionSpec)
+      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
+        lookupV2Relation(u) match {
+          case scala.Right(Some(v2Table: Table)) =>
+            val relation = DataSourceV2Relation.create(v2Table)
+            // ifPartitionNotExists is append with validation, but validation is not supported
+            if (i.ifPartitionNotExists) {
+              throw new AnalysisException(
+                s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
+            }
 
-              val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
-              val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
-              val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
-                  conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+            val partCols = partitionColumnNames(relation.table)
+            validatePartitionSpec(partCols, i.partitionSpec)
 
-              if (!i.overwrite) {
-                AppendData.byPosition(relation, query)
-              } else if (dynamicPartitionOverwrite) {
-                OverwritePartitionsDynamic.byPosition(relation, query)
-              } else {
-                OverwriteByExpression.byPosition(
-                  relation, query, staticDeleteExpression(relation, staticPartitions))
-              }
-            })
-            .getOrElse(i)
+            val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+            val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
+            val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
+              conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
 
-      case i @ InsertIntoStatement(UnresolvedRelation(AsTableIdentifier(_)), _, _, _, _)
-          if i.query.resolved =>
-        InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, i.ifPartitionNotExists)
+            if (!i.overwrite) {
+              AppendData.byPosition(relation, query)
+            } else if (dynamicPartitionOverwrite) {
+              OverwritePartitionsDynamic.byPosition(relation, query)
+            } else {
+              OverwriteByExpression.byPosition(
+                relation, query, staticDeleteExpression(relation, staticPartitions))
+            }
+          case _ =>
+            InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, i.ifPartitionNotExists)
 
 Review comment:
   ok let's leave it as it is and clean it up after ALTER TABLE.





[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317867654
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/InsertIntoTests.scala
 ##
 @@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * A collection of "INSERT INTO" tests that can be run through the SQL or DataFrameWriter APIs.
+ * Extending test suites can implement the `doInsert` method to run the insert through either
+ * API.
 
 Review comment:
   Thanks for the clarification! Now I understand. `InsertIntoSQLOnlyTests` 
tests features that are only available in SQL, like the PARTITION clause.
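   For illustration, a minimal sketch (not code from this PR) of the kind of SQL-only case meant here, reusing the suite's `catalogAndNamespace`, `v2Format` and `verifyTable` helpers; the PARTITION clause has no `DataFrameWriter` counterpart, so it can only be exercised through the SQL path:

```scala
test("InsertInto: static PARTITION clause is only reachable via SQL") {
  val t1 = s"${catalogAndNamespace}tbl"
  withTable(t1) {
    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
    // A static partition value supplied in the PARTITION clause; the remaining
    // columns come from the VALUES list.
    sql(s"INSERT INTO $t1 PARTITION (id = 1) VALUES ('a')")
    verifyTable(t1, Seq((1L, "a")).toDF("id", "data"))
  }
}
```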








[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317608095
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
 ##
 @@ -68,13 +69,25 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
 }
 
 class V1WriteFallbackSessionCatalogSuite
-  extends SessionCatalogTest[InMemoryTableWithV1Fallback, V1FallbackTableCatalog] {
+  extends InsertIntoTests(supportsDynamicOverwrite = false, includeSQLTests = true)
+  with SessionCatalogTest[InMemoryTableWithV1Fallback, V1FallbackTableCatalog] {
+
   override protected val v2Format = classOf[InMemoryV1Provider].getName
   override protected val catalogClassName: String = classOf[V1FallbackTableCatalog].getName
+  override protected val catalogAndNamespace: String = ""
 
   override protected def verifyTable(tableName: String, expected: DataFrame): Unit = {
     checkAnswer(InMemoryV1Provider.getTableData(spark, s"default.$tableName"), expected)
   }
+
+  protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
+    val tmpView = "tmp_view"
+    withTempView(tmpView) {
+      insert.createOrReplaceTempView(tmpView)
+      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+      sql(s"INSERT $overwrite TABLE $tableName SELECT * FROM $tmpView")
 
 Review comment:
   ok seems we do want to run SQL test cases only here. Shouldn't we extend 
`InsertIntoSQLTests` instead of `InsertIntoTests`?





[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317607046
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V1WriteFallbackSuite.scala
 ##
 @@ -68,13 +69,25 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
 }
 
 class V1WriteFallbackSessionCatalogSuite
-  extends SessionCatalogTest[InMemoryTableWithV1Fallback, V1FallbackTableCatalog] {
+  extends InsertIntoTests(supportsDynamicOverwrite = false, includeSQLTests = true)
+  with SessionCatalogTest[InMemoryTableWithV1Fallback, V1FallbackTableCatalog] {
+
   override protected val v2Format = classOf[InMemoryV1Provider].getName
   override protected val catalogClassName: String = classOf[V1FallbackTableCatalog].getName
+  override protected val catalogAndNamespace: String = ""
 
   override protected def verifyTable(tableName: String, expected: DataFrame): Unit = {
     checkAnswer(InMemoryV1Provider.getTableData(spark, s"default.$tableName"), expected)
   }
+
+  protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
+    val tmpView = "tmp_view"
+    withTempView(tmpView) {
+      insert.createOrReplaceTempView(tmpView)
+      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+      sql(s"INSERT $overwrite TABLE $tableName SELECT * FROM $tmpView")
 
 Review comment:
   shouldn't we use `DataFrameWriter.insertInto` here? Otherwise we only test 
the SQL test cases.
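   For comparison, a minimal sketch (not from the PR) of what a `DataFrameWriter`-based `doInsert` could look like, keeping the signature shown in the diff above:

```scala
protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
  // Route the insert through the DataFrameWriter API instead of building SQL.
  val writer = if (mode != null) insert.write.mode(mode) else insert.write
  writer.insertInto(tableName)
}
```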





[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317603133
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/InsertIntoTests.scala
 ##
 @@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSparkSession
+
+abstract class InsertIntoTests(
+    override protected val supportsDynamicOverwrite: Boolean,
+    override protected val includeSQLTests: Boolean) extends InsertIntoSQLTests {
+
+  import testImplicits._
+
+  test("insertInto: append") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    doInsert(t1, df)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+
+    doInsert(t1, dfr)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append partitioned table") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df)
+      verifyTable(t1, df)
+    }
+  }
+
+  test("insertInto: overwrite non-partitioned table") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
+    doInsert(t1, df)
+    doInsert(t1, df2, SaveMode.Overwrite)
+    verifyTable(t1, df2)
+  }
+
+  test("insertInto: overwrite partitioned table in static mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+      val t1 = s"${catalogAndNamespace}tbl"
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df, SaveMode.Overwrite)
+      verifyTable(t1, df)
+    }
+  }
+
+  test("insertInto: overwrite partitioned table in static mode by position") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+      val t1 = s"${catalogAndNamespace}tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+        val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+        doInsert(t1, init)
+
+        val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+        doInsert(t1, dfr, SaveMode.Overwrite)
+
+        val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+        verifyTable(t1, df)
+      }
+    }
+  }
+
+  test("insertInto: fails when missing a column") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val exc = intercept[AnalysisException] {
+      doInsert(t1, df)
+    }
+
+    verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
+    val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
+    assert(exc.getMessage.contains(s"Cannot write to '$tableName', not enough data columns"))
+  }
+
+  test("insertInto: fails when an extra column is present") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      

[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317572985
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##
 @@ -770,40 +760,35 @@ class Analyzer(
 
   object ResolveInsertInto extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case i @ InsertIntoStatement(
-          UnresolvedRelation(CatalogObjectIdentifier(Some(tableCatalog), ident)), _, _, _, _)
-          if i.query.resolved =>
-        loadTable(tableCatalog, ident)
-            .map(DataSourceV2Relation.create)
-            .map(relation => {
-              // ifPartitionNotExists is append with validation, but validation is not supported
-              if (i.ifPartitionNotExists) {
-                throw new AnalysisException(
-                  s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
-              }
-
-              val partCols = partitionColumnNames(relation.table)
-              validatePartitionSpec(partCols, i.partitionSpec)
+      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
+        lookupV2Relation(u) match {
+          case scala.Right(Some(v2Table: Table)) =>
+            val relation = DataSourceV2Relation.create(v2Table)
+            // ifPartitionNotExists is append with validation, but validation is not supported
+            if (i.ifPartitionNotExists) {
+              throw new AnalysisException(
+                s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
+            }
 
-              val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
-              val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
-              val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
-                  conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+            val partCols = partitionColumnNames(relation.table)
+            validatePartitionSpec(partCols, i.partitionSpec)
 
-              if (!i.overwrite) {
-                AppendData.byPosition(relation, query)
-              } else if (dynamicPartitionOverwrite) {
-                OverwritePartitionsDynamic.byPosition(relation, query)
-              } else {
-                OverwriteByExpression.byPosition(
-                  relation, query, staticDeleteExpression(relation, staticPartitions))
-              }
-            })
-            .getOrElse(i)
+            val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+            val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
+            val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
+              conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
 
-      case i @ InsertIntoStatement(UnresolvedRelation(AsTableIdentifier(_)), _, _, _, _)
-          if i.query.resolved =>
-        InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, i.ifPartitionNotExists)
+            if (!i.overwrite) {
+              AppendData.byPosition(relation, query)
+            } else if (dynamicPartitionOverwrite) {
+              OverwritePartitionsDynamic.byPosition(relation, query)
+            } else {
+              OverwriteByExpression.byPosition(
+                relation, query, staticDeleteExpression(relation, staticPartitions))
+            }
+          case _ =>
+            InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, i.ifPartitionNotExists)
 
 Review comment:
   `lookupV2Relation` needs to return `Either` because we need to catch this case and create the v1 insert command. I have a simpler idea:
   1. `ResolveInsertInto` only handles the v2 insert command, and leaves `InsertIntoStatement(UnresolvedRelation, ...)` unchanged if we can't load a v2 table.
   2. `ResolveRelations` catches the `InsertIntoStatement(UnresolvedRelation, ...)` and converts it to a v1 insert command if we can load a v1 table.
   
   By doing this, `lookupV2Relation` doesn't need to distinguish between a normal v2 catalog and the v2 session catalog during table lookup, and can return `Option[Table]`.
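   For illustration, a rough sketch of what that could look like (not code from this PR; `buildV2Write` is a hypothetical helper standing in for the append/overwrite branch in the diff above, and the `Option`-returning `lookupV2Relation` is the assumption being proposed):

```scala
case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
  lookupV2Relation(u) match {
    case Some(v2Table) =>
      // A v2 table was found: plan the v2 write as in the diff above.
      buildV2Write(DataSourceV2Relation.create(v2Table), i)
    case None =>
      // Not a v2 table: leave the statement unchanged so ResolveRelations
      // can later turn it into a v1 insert command.
      i
  }
```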








[GitHub] [spark] cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] Support InsertInto through the V2SessionCatalog

2019-08-26 Thread GitBox
cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317519172
 
 

 ##
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##
 @@ -770,40 +760,35 @@ class Analyzer(
 
   object ResolveInsertInto extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case i @ InsertIntoStatement(
 
 Review comment:
   Since we are cleaning up the code here, how about we make the responsibility of each rule clearer? What I have in mind is:
   1. `ResolveInsertInto` should only resolve the `InsertIntoStatement`, not the relation itself. Ideally it should only match `InsertIntoStatement(DataSourceV2Relation, ...)` and let other rules resolve relations.
   2. `ResolveTables` should resolve the `UnresolvedRelation` inside `InsertIntoStatement`, similar to the existing `ResolveRelations` rule.
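   For illustration, a rough sketch of that split (not code from this PR; `lookupV2Relation` returning `Option[Table]` and the `buildV2Write` helper are assumptions):

```scala
object ResolveTables extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Only resolves the relation inside the statement, nothing else.
    case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
      lookupV2Relation(u)
        .map(table => i.copy(table = DataSourceV2Relation.create(table)))
        .getOrElse(i)
  }
}

object ResolveInsertInto extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Only plans the write; the relation was already resolved by ResolveTables.
    case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) if i.query.resolved =>
      buildV2Write(r, i)
  }
}
```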




