JiaqiWang18 commented on code in PR #52119: URL: https://github.com/apache/spark/pull/52119#discussion_r2314903396
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala:
##########
@@ -434,6 +434,34 @@ class PythonPipelineSuite
         .map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
   }
+  test("MV/ST with partition columns works") {
+    val graph = buildGraph("""
+        |from pyspark.sql.functions import col
+        |
+        |@dp.materialized_view(partition_cols = ["id_mod"])
+        |def mv():
+        |    return spark.range(5).withColumn("id_mod", col("id") % 2)
+        |
+        |@dp.table(partition_cols = ["id_mod"])
+        |def st():
+        |    return spark.readStream.table("mv")
+        |""".stripMargin)
+
+    val updateContext = new PipelineUpdateContextImpl(graph, eventCallback = _ => ())
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    // check table is created with correct partitioning
+    Seq("mv", "st").foreach { tableName =>
+      val table = spark.sessionState.catalog.getTableMetadata(graphIdentifier(tableName))
+      assert(table.partitionColumnNames == Seq("id_mod"))
+
+      val rows = spark.table(tableName).collect().map(r => (r.getLong(0), r.getLong(1))).toSet
+      val expected = (0 until 5).map(id => (id.toLong, (id % 2).toLong)).toSet
+      assert(rows == expected)

Review Comment:
`checkAnswer` is also provided by `PipelineTest`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org