brkyvz commented on a change in pull request #26957: [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation
URL: https://github.com/apache/spark/pull/26957#discussion_r366593674
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
 ##########
 @@ -196,11 +199,81 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     assert(e.getMessage.contains("not support user specified schema"))
   }
 
+  test("DataFrameReader creates v2Relation with identifiers") {
+    sql(s"create table $catalogName.t1 (id bigint) using $format")
+    val df = load("t1", Some(catalogName))
+    checkV2Identifiers(df.logicalPlan)
+  }
+
+  test("DataFrameWriter creates v2Relation with identifiers") {
+    sql(s"create table $catalogName.t1 (id bigint) using $format")
+
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+    }
+
+    spark.listenerManager.register(listener)
+
+    try {
+      // Test append
+      save("t1", SaveMode.Append, Some(catalogName))
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(plan.isInstanceOf[AppendData])
+      val appendRelation = plan.asInstanceOf[AppendData].table
+      checkV2Identifiers(appendRelation)
+
+      // Test overwrite
+      save("t1", SaveMode.Overwrite, Some(catalogName))
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(plan.isInstanceOf[OverwriteByExpression])
+      val overwriteRelation = plan.asInstanceOf[OverwriteByExpression].table
+      checkV2Identifiers(overwriteRelation)
+
+      // Test insert
+      spark.range(10).write.format(format).insertInto(s"$catalogName.t1")
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(plan.isInstanceOf[AppendData])
+      val insertRelation = plan.asInstanceOf[AppendData].table
+      checkV2Identifiers(insertRelation)
+
+      // Test saveAsTable append
+      spark.range(10).write.format(format).mode(SaveMode.Append).saveAsTable(s"$catalogName.t1")
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(plan.isInstanceOf[AppendData])
+      val saveAsTableRelation = plan.asInstanceOf[AppendData].table
+      checkV2Identifiers(saveAsTableRelation)
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  private def checkV2Identifiers(
+      plan: LogicalPlan,
+      identifiers: Seq[String] = Seq("t1"),
+      catalogName: String = catalogName): Unit = {
+    assert(plan.isInstanceOf[DataSourceV2Relation])
+    val v2 = plan.asInstanceOf[DataSourceV2Relation]
+    assert(v2.identifiers.length == identifiers.length)
+    assert(identifiers.forall(t => v2.identifiers.exists(_.name() == t)))
+    assert(v2.catalogIdentifier.exists(_ == catalogName))
+  }
+
   private def load(name: String, catalogOpt: Option[String]): DataFrame = {
-    val dfr = spark.read.format(format).option("name", "t1")
+    val dfr = spark.read.format(format).option("name", name)
     catalogOpt.foreach(cName => dfr.option("catalog", cName))
     dfr.load()
   }
+
+  private def save(name: String, mode: SaveMode, catalogOpt: Option[String]): Unit = {
+    val df = spark.range(10)
+    df.write.format(format).option("name", name).option("catalog", catalogName)
 
 Review comment:
   As in `load` (`catalogOpt.foreach(cName => dfr.option("catalog", cName))`), this should thread `catalogOpt` through — you're accidentally hard-coding `catalogName` here instead of using `catalogOpt`.
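
   A minimal sketch of what I mean, mirroring how `load` handles the option (hypothetical: it assumes the elided tail of this method applies `mode` and calls `save()`):

   ```scala
   private def save(name: String, mode: SaveMode, catalogOpt: Option[String]): Unit = {
     val df = spark.range(10)
     val dfw = df.write.format(format).option("name", name)
     // Set the catalog option only when a catalog was requested, as load() does.
     catalogOpt.foreach(cName => dfw.option("catalog", cName))
     dfw.mode(mode).save()  // assumed continuation of the truncated body
   }
   ```

   That way a `save("t1", SaveMode.Append, None)` call exercises the no-catalog path instead of silently falling back to `catalogName`.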
