brkyvz commented on a change in pull request #26957: [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation URL: https://github.com/apache/spark/pull/26957#discussion_r366593674
########## File path: sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala ########## @@ -196,11 +199,81 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with assert(e.getMessage.contains("not support user specified schema")) } + test("DataFrameReader creates v2Relation with identifiers") { + sql(s"create table $catalogName.t1 (id bigint) using $format") + val df = load("t1", Some(catalogName)) + checkV2Identifiers(df.logicalPlan) + } + + test("DataFrameWriter creates v2Relation with identifiers") { + sql(s"create table $catalogName.t1 (id bigint) using $format") + + var plan: LogicalPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.analyzed + + } + override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + } + + spark.listenerManager.register(listener) + + try { + // Test append + save("t1", SaveMode.Append, Some(catalogName)) + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[AppendData]) + val appendRelation = plan.asInstanceOf[AppendData].table + checkV2Identifiers(appendRelation) + + // Test overwrite + save("t1", SaveMode.Overwrite, Some(catalogName)) + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[OverwriteByExpression]) + val overwriteRelation = plan.asInstanceOf[OverwriteByExpression].table + checkV2Identifiers(overwriteRelation) + + // Test insert + spark.range(10).write.format(format).insertInto(s"$catalogName.t1") + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[AppendData]) + val insertRelation = plan.asInstanceOf[AppendData].table + checkV2Identifiers(insertRelation) + + // Test saveAsTable append + spark.range(10).write.format(format).mode(SaveMode.Append).saveAsTable(s"$catalogName.t1") + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[AppendData]) + val 
saveAsTableRelation = plan.asInstanceOf[AppendData].table + checkV2Identifiers(saveAsTableRelation) + } finally { + spark.listenerManager.unregister(listener) + } + } + + private def checkV2Identifiers( + plan: LogicalPlan, + identifiers: Seq[String] = Seq("t1"), + catalogName: String = catalogName): Unit = { + assert(plan.isInstanceOf[DataSourceV2Relation]) + val v2 = plan.asInstanceOf[DataSourceV2Relation] + assert(v2.identifiers.length == identifiers.length) + assert(identifiers.forall(t => v2.identifiers.exists(_.name() == t))) + assert(v2.catalogIdentifier.exists(_ == catalogName)) + } + private def load(name: String, catalogOpt: Option[String]): DataFrame = { - val dfr = spark.read.format(format).option("name", "t1") + val dfr = spark.read.format(format).option("name", name) catalogOpt.foreach(cName => dfr.option("catalog", cName)) dfr.load() } + + private def save(name: String, mode: SaveMode, catalogOpt: Option[String]): Unit = { + val df = spark.range(10) + df.write.format(format).option("name", name).option("catalog", catalogName) Review comment: You're accidentally using `catalogName` here instead of the `catalogOpt` parameter; apply it the same way `load` does, i.e. `catalogOpt.foreach(cName => dfr.option("catalog", cName))`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org