This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4237251  [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times
4237251 is described below

commit 4237251861c79f3176de7cf5232f0388ec5d946e
Author: Burak Yavuz <brk...@gmail.com>
AuthorDate: Wed Mar 18 18:07:24 2020 -0700

    [SPARK-31178][SQL] Prevent V2 exec nodes from executing multiple times

    ### What changes were proposed in this pull request?

    This PR prevents V2 DataSource exec nodes from being executed multiple times when `collect()`
    is called on them. For V1 DataSources, commands are executed as a `RunnableCommand`, which
    caches the result as part of the `ExecutedCommandExec` node. We extend `V2CommandExec` for all
    the data-writing commands so that they are also executed only once.

    ### Why are the changes needed?

    Without this change, a SQL command that inserts data or creates a table is executed multiple
    times when `collect()` is called on its result.

    ### Does this PR introduce any user-facing change?

    Fixes a bug.

    ### How was this patch tested?

    Unit tests.

    Closes #27941 from brkyvz/doubleInsert.

    Authored-by: Burak Yavuz <brk...@gmail.com>
    Signed-off-by: Burak Yavuz <brk...@gmail.com>

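For readers skimming the patch below: the fix hinges on the result-caching behaviour of
`V2CommandExec`. Subclasses implement `run()`, the node memoizes the returned rows, and the
execute/collect paths serve those cached rows, so repeated `collect()` or `take()` calls cannot
re-trigger the write. The snippet that follows is a minimal, standalone sketch of that pattern
with a hypothetical class name (`CachingCommandExec`); it is not the Spark source itself.

    // Minimal sketch of the memoization pattern (hypothetical CachingCommandExec, not Spark's class).
    abstract class CachingCommandExec[Row] {
      // Concrete commands put their side effect (create table, insert data, ...) here.
      protected def run(): Seq[Row]

      // Evaluated at most once; every later access reuses the cached rows.
      private lazy val result: Seq[Row] = run()

      // Consumers read the cached result, so calling them repeatedly never re-runs the command.
      def executeCollect(): Seq[Row] = result
      def executeTake(n: Int): Seq[Row] = result.take(n)
    }

In the patch, the V2 write nodes switch from overriding `doExecute(): RDD[InternalRow]` to
overriding `run(): Seq[InternalRow]`, so they inherit exactly this memoization from `V2CommandExec`.
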
---
 .../datasources/v2/ShowNamespacesExec.scala        |  4 +-
 .../execution/datasources/v2/ShowTablesExec.scala  |  4 +-
 .../datasources/v2/V1FallbackWriters.scala         | 10 ++--
 .../execution/datasources/v2/V2CommandExec.scala   |  6 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 28 ++++-----
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 68 ++++++++++++++++++++++
 .../spark/sql/connector/InsertIntoTests.scala      | 17 ++++++
 7 files changed, 112 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
index fe3ab80..6f96848 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchem
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.execution.LeafExecNode
 
 /**
  * Physical plan node for showing namespaces.
@@ -33,8 +34,7 @@ case class ShowNamespacesExec(
     output: Seq[Attribute],
     catalog: SupportsNamespaces,
     namespace: Seq[String],
-    pattern: Option[String])
-  extends V2CommandExec {
+    pattern: Option[String]) extends V2CommandExec with LeafExecNode {
 
   override protected def run(): Seq[InternalRow] = {
     val namespaces = if (namespace.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
index 995b008..c740e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchem
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.LeafExecNode
 
 /**
  * Physical plan node for showing tables.
@@ -33,8 +34,7 @@ case class ShowTablesExec(
     output: Seq[Attribute],
     catalog: TableCatalog,
     namespace: Seq[String],
-    pattern: Option[String])
-  extends V2CommandExec {
+    pattern: Option[String]) extends V2CommandExec with LeafExecNode {
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
     val encoder = RowEncoder(schema).resolveAndBind()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index f973000..7502a87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -41,7 +41,7 @@ case class AppendDataExecV1(
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan) extends V1FallbackWriters {
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     writeWithV1(newWriteBuilder().buildForV1Write())
   }
 }
@@ -67,7 +67,7 @@ case class OverwriteByExpressionExecV1(
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
   }
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     newWriteBuilder() match {
       case builder: SupportsTruncate if isTruncate(deleteWhere) =>
         writeWithV1(builder.truncate().asV1Builder.buildForV1Write())
@@ -82,7 +82,7 @@ case class OverwriteByExpressionExecV1(
 }
 
 /** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
-sealed trait V1FallbackWriters extends SupportsV1Write {
+sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
   override def output: Seq[Attribute] = Nil
   override final def children: Seq[SparkPlan] = Nil
 
@@ -115,8 +115,8 @@ trait SupportsV1Write extends SparkPlan {
   // TODO: We should be able to work on SparkPlans at this point.
   def plan: LogicalPlan
 
-  protected def writeWithV1(relation: InsertableRelation): RDD[InternalRow] = {
+  protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
     relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
-    sparkContext.emptyRDD
+    Nil
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
index a1f685d..4be4a6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2CommandExec.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.SparkPlan
 
 /**
  * A physical operator that executes run() and saves the result to prevent multiple executions.
  * Any V2 commands that do not require triggering a spark job should extend this class.
  */
-abstract class V2CommandExec extends LeafExecNode {
+abstract class V2CommandExec extends SparkPlan {
 
   /**
    * Abstract method that each concrete command needs to implement to compute the result.
@@ -53,4 +53,6 @@ abstract class V2CommandExec extends LeafExecNode {
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(result, 1)
   }
+
+  override def children: Seq[SparkPlan] = Nil
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index e360a9e..616e18e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -70,10 +70,10 @@ case class CreateTableAsSelectExec(
 
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       if (ifNotExists) {
-        return sparkContext.parallelize(Seq.empty, 1)
+        return Nil
       }
 
       throw new TableAlreadyExistsException(ident)
@@ -125,10 +125,10 @@ case class AtomicCreateTableAsSelectExec(
     writeOptions: CaseInsensitiveStringMap,
     ifNotExists: Boolean) extends AtomicTableWriteExec {
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       if (ifNotExists) {
-        return sparkContext.parallelize(Seq.empty, 1)
+        return Nil
       }
 
       throw new TableAlreadyExistsException(ident)
@@ -161,7 +161,7 @@ case class ReplaceTableAsSelectExec(
 
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     // Note that this operation is potentially unsafe, but these are the strict semantics of
     // RTAS if the catalog does not support atomic operations.
     //
@@ -225,7 +225,7 @@ case class AtomicReplaceTableAsSelectExec(
     writeOptions: CaseInsensitiveStringMap,
     orCreate: Boolean) extends AtomicTableWriteExec {
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     val schema = query.schema.asNullable
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
@@ -255,7 +255,7 @@ case class AppendDataExec(
     writeOptions: CaseInsensitiveStringMap,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     writeWithV2(newWriteBuilder().buildForBatch())
   }
 }
@@ -280,7 +280,7 @@ case class OverwriteByExpressionExec(
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
   }
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     newWriteBuilder() match {
       case builder: SupportsTruncate if isTruncate(deleteWhere) =>
         writeWithV2(builder.truncate().buildForBatch())
@@ -308,7 +308,7 @@ case class OverwritePartitionsDynamicExec(
     writeOptions: CaseInsensitiveStringMap,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     newWriteBuilder() match {
       case builder: SupportsDynamicOverwrite =>
         writeWithV2(builder.overwriteDynamicPartitions().buildForBatch())
@@ -325,7 +325,7 @@ case class WriteToDataSourceV2Exec(
 
   def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  override protected def run(): Seq[InternalRow] = {
     writeWithV2(batchWrite)
   }
 }
@@ -350,7 +350,7 @@ trait BatchWriteHelper {
 /**
  * The base physical plan for writing data into data source v2.
  */
-trait V2TableWriteExec extends UnaryExecNode {
+trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
   def query: SparkPlan
 
   var commitProgress: Option[StreamWriterCommitProgress] = None
@@ -358,7 +358,7 @@ trait V2TableWriteExec extends UnaryExecNode {
   override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
-  protected def writeWithV2(batchWrite: BatchWrite): RDD[InternalRow] = {
+  protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = {
     val rdd: RDD[InternalRow] = {
       val tempRdd = query.execute()
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
@@ -415,7 +415,7 @@ trait V2TableWriteExec extends UnaryExecNode {
       }
     }
 
-    sparkContext.emptyRDD
+    Nil
   }
 }
@@ -485,7 +485,7 @@ private[v2] trait AtomicTableWriteExec extends V2TableWriteExec with SupportsV1W
   protected def writeToStagedTable(
       stagedTable: StagedTable,
       writeOptions: CaseInsensitiveStringMap,
-      ident: Identifier): RDD[InternalRow] = {
+      ident: Identifier): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       stagedTable match {
         case table: SupportsWrite =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ba4200d..07e0959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -319,6 +319,37 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("CreateTableAsSelect: do not double execute on collect(), take() and other queries") {
+    val basicCatalog = catalog("testcat").asTableCatalog
+    val atomicCatalog = catalog("testcat_atomic").asTableCatalog
+    val basicIdentifier = "testcat.table_name"
+    val atomicIdentifier = "testcat_atomic.table_name"
+
+    Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
+      case (catalog, identifier) =>
+        val df = spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
+
+        df.collect()
+        df.take(5)
+        df.tail(5)
+        df.where("true").collect()
+        df.where("true").take(5)
+        df.where("true").tail(5)
+
+        val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+        assert(table.name == identifier)
+        assert(table.partitioning.isEmpty)
+        assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
+        assert(table.schema == new StructType()
+          .add("id", LongType)
+          .add("data", StringType))
+
+        val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
+        checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
+    }
+  }
+
   test("ReplaceTableAsSelect: basic v2 implementation.") {
     val basicCatalog = catalog("testcat").asTableCatalog
     val atomicCatalog = catalog("testcat_atomic").asTableCatalog
@@ -346,6 +377,43 @@ class DataSourceV2SQLSuite
     }
   }
 
+  Seq("REPLACE", "CREATE OR REPLACE").foreach { cmd =>
+    test(s"ReplaceTableAsSelect: do not double execute $cmd on collect()") {
+      val basicCatalog = catalog("testcat").asTableCatalog
+      val atomicCatalog = catalog("testcat_atomic").asTableCatalog
+      val basicIdentifier = "testcat.table_name"
+      val atomicIdentifier = "testcat_atomic.table_name"
+
+      Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
+        case (catalog, identifier) =>
+          spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
+          val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
"table_name")) + + val df = spark.sql(s"$cmd TABLE $identifier USING foo AS SELECT id FROM source") + + df.collect() + df.take(5) + df.tail(5) + df.where("true").collect() + df.where("true").take(5) + df.where("true").tail(5) + + val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(replacedTable != originalTable, "Table should have been replaced.") + assert(replacedTable.name == identifier) + assert(replacedTable.partitioning.isEmpty) + assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava) + assert(replacedTable.schema == new StructType().add("id", LongType)) + + val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows) + checkAnswer( + spark.internalCreateDataFrame(rdd, replacedTable.schema), + spark.table("source").select("id")) + } + } + } + test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write fails.") { spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") val testCatalog = catalog("testcat").asTableCatalog diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 0fd6cf1..b88ad52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -463,5 +463,22 @@ trait InsertIntoSQLOnlyTests } } } + + test("do not double insert on INSERT INTO collect()") { + val t1 = s"${catalogAndNamespace}tbl" + withTableAndData(t1) { view => + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + val df = sql(s"INSERT INTO TABLE $t1 SELECT * FROM $view") + + df.collect() + df.take(5) + df.tail(5) + df.where("true").collect() + df.where("true").take(5) + df.where("true").tail(5) + + verifyTable(t1, spark.table(view)) + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org