[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166394747 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I'm not saying that `DataSourceOptions` have to be handled in the relation. Just that the relation should use the same classes to pass data, like `TableIdentifier`, that are used by the rest of the planner. I agree with those benefits of doing this. Is there anything that needs to change in this PR? We can move where the options are created in a follow-up, but let me know if you think this would prevent this from getting merged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166386661 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I think that `TableIdentifier` and a string-to-string `Map` should be passed to `DataSourceV2Relation` and that either the relation or `DataSourceOptions` should be responsible for creating `DataSourceOptions` with well-defined properties to pass the table information to implementations. This minimizes the number of places that need to handle `DataSourceOptions` (which is specific to v2) and uses `TableIdentifier` and `Map` to match the rest of the planner nodes. For example, other read paths that can create `DataSourceV2Relation`, like resolution rules, use `TableIdentifier`. I'm not currently advocating a position on how to configure `DataFrameReader` or `DataFrameWriter` or how to handle schemas. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
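[Editor's note] A minimal sketch of the shape being argued for above: the relation keeps planner-friendly types (`TableIdentifier` and a string map) and is the single place that translates them into the v2-specific options object. The class name and the `database` key are assumptions for the sketch; only the `path` and `table` keys appear in the quoted diff.

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}

// Hypothetical relation sketch: planner code passes TableIdentifier/Map, and
// only this class decides how those become DataSourceOptions property names.
case class V2RelationSketch(
    source: DataSourceV2,
    options: Map[String, String],
    path: Option[String] = None,
    table: Option[TableIdentifier] = None) {

  lazy val v2Options: DataSourceOptions = {
    val updated = new mutable.HashMap[String, String]()
    updated ++= options
    path.foreach(p => updated.put("path", p))                    // "path" key as in the diff above
    table.foreach { ident =>
      updated.put("table", ident.table)                          // "table" key as in the diff above
      ident.database.foreach(db => updated.put("database", db))  // assumed key for the database part
    }
    new DataSourceOptions(updated.asJava)
  }
}
```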
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @cloud-fan, this is a single commit on purpose because predicate push-down makes plan changes. I think it's best to do these at once to avoid unnecessary work. That's why I started looking more closely at push-down in the first place: updating the other push-down code for immutable plans was a mess. I also think it is unlikely that we will need to revert the push-down changes here. If we end up redesigning push-down, then it is unlikely that the easiest starting point is to roll back this fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166381800 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- What do you have in mind to "introduce the concept"? I'm happy to add more docs. Do you want me to add them to this PR or in a follow-up? Are you targeting this for 2.3.0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166360278 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- The API is flexible. The problem is that it defaults to no coordination, which causes correctness bugs. The safe option is to coordinate commits by default. If an implementation doesn't change the default, then it at least won't duplicate task outputs in job commit. Worst case is that it takes a little longer for committers that don't need coordination. On the other hand, not making this the default will cause some writers to work most of the time, but duplicate data in some cases. What do you think is the downside to adding this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
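[Editor's note] To make the coordination concrete, a rough sketch of the pattern the diff above introduces, written as a standalone helper rather than the PR's actual code. It assumes the three-argument `canCommit` signature of this era and Spark-internal access to `SparkEnv.get.outputCommitCoordinator`; the real task would raise a dedicated commit-denied failure rather than a plain exception.

```scala
import org.apache.spark.{SparkEnv, TaskContext}

// Sketch only: `commit` and `abort` stand in for DataWriter#commit/abort.
def commitWithCoordination(
    context: TaskContext,
    useCommitCoordinator: Boolean)(commit: () => Unit, abort: () => Unit): Unit = {
  val stageId = context.stageId()
  val partId = context.partitionId()
  val attemptId = context.attemptNumber()
  if (useCommitCoordinator) {
    // Ask the driver-side coordinator whether this attempt may commit. Only one
    // attempt per partition is authorized, so a speculative or retried task
    // cannot commit a duplicate copy of the same partition's output.
    val coordinator = SparkEnv.get.outputCommitCoordinator
    if (coordinator.canCommit(stageId, partId, attemptId)) {
      commit()
    } else {
      // another attempt was authorized to commit; give up cleanly
      abort()
      throw new RuntimeException(
        s"Commit denied for partition $partId (stage $stageId, attempt $attemptId)")
    }
  } else {
    // sources that opt out keep the old behavior: every attempt commits
    commit()
  }
}
```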
[GitHub] spark pull request #20495: [SPARK-23327] [SQL] Update the description and te...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20495#discussion_r166358420 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -1655,15 +1655,17 @@ case class Left(str: Expression, len: Expression, child: Expression) extends Run */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(expr) - Returns the character length of `expr` or number of bytes in binary data.", + usage = "_FUNC_(expr) - Returns the character length of `expr` or number of bytes in binary data. " + --- End diff -- +1 for string data / binary data --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166165255 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +"DataSourceV2Relation(" + + s"source=$sourceName${path.orElse(table).map(loc => s"($loc)").getOrElse("")}, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}] options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +path match { + case Some(p) => +updatedOptions.put("path", p) + case None => +updatedOptions.remove("path") +} + +table.map { ident => + updatedOptions.put("table", ident.table) --- End diff -- Opened [SPARK-23341](https://issues.apache.org/jira/browse/SPARK-23341) for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166165005 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +"DataSourceV2Relation(" + + s"source=$sourceName${path.orElse(table).map(loc => s"($loc)").getOrElse("")}, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}] options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +path match { + case Some(p) => +updatedOptions.put("path", p) + case None => +updatedOptions.remove("path") +} + +table.map { ident => + updatedOptions.put("table", ident.table) --- End diff -- I think we agree here. I want to avoid doing this outside of either `DataSourceOptions` or `DataSourceV2Relation`. If we can create `DataSourceOptions` from a `Option[TableIdentifier]` and add the `getTable` accessor, then that works for me. My main motivation is to avoid having this piece of code copied throughout the SQL planner. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20490 I've updated this to no longer require #20387. It wasn't relying on those changes at all. @gatorsmile, @cloud-fan, what do you think about getting this into 2.3.0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166081367 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I should clarify this as well: I like the idea of standardizing how implementations access the table name from `DataSourceOptions`. However, I don't think that is sufficient to remove the table and path here in the definition of `DataSourceV2Relation` for the reasons above. I think we *should* follow this commit with a plan for how implementations access these options. For now, it is good to put the creation of those options in a single place. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387

> For safety, I wanna keep it unchanged, and start something new for data source v2 only.

I disagree.

* **#20476 addresses a bug caused by the new implementation that is not a problem if we reuse the current push-down code.** Using an entirely new implementation to push filters and projection is going to introduce bugs, and that problem demonstrates that it is a real risk.
* **Using unreliable push-down code is going to make it more difficult for anyone to use the v2 API.**
* **This approach throws away work that has accumulated over the past few years that give us confidence in the current push-down code.** The other code paths have push-down tests that will help us catch bugs in the new push-down logic. If we limit the scope of this change to v2, we will not be able to reuse those tests and will have to write entirely new ones that cover all cases.

Lastly, I think it is clear that we need a design for a new push-down mechanism. **Adding this to DataSourceV2 as feature creep is not a good way to redesign it.** I'd like to see a design document that addresses some of the open questions. I'd also prefer that this new implementation be removed from the v2 code path for 2.3.0.

@marmbrus, what do you think?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166030958 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- @cloud-fan, sorry if it was not clear: Yes, I have considered it and I think it is a bad idea. I sent a note to the dev list about this issue, as well if you want more context. There are two main reasons: 1. Your proposal creates more places that are responsible for creating a `DataSourceOptions` with the right property names. All of the places where we have a `TableIdentifier` and want to convert to a `DataSourceV2Relation` need to copy the same logic and worry about using the same properties. What you propose is hard to maintain and error prone: what happens if we decide not to pass the database if it is `None` in a `TableIdentifier`? We would have to validate every place that creates a v2 relation. On the other hand, if we pass `TableIdentifier` here, we have one code path that converts. It is also easier for us to pass `TableIdentifier` to the data sources if we choose to update the API. 2. There is no reason to use `DataSourceOptions` outside of v2 at this point. This PR doesn't expose the v2-specific options class to other places in the codebase. Instead, it uses a map for generic options and classes that can be used in pattern matching where possible. And again, this has fewer places that create v2 internal classes, which is easier for maintenance. If you want to add those methods to the options class so that implementations can easily access path and table name, then we can do that in a follow-up PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387

> Why pushdown is happening in logical optimization and not during query planning. My first instinct would be to have the optimizer get operators as close to the leaves as possible and then fuse (or push down) as we convert to physical plan. I'm probably missing something.

I think there are two reasons, but I'm not fully convinced by either one:

* [`computeStats`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L232) is defined on logical plans, so the result of filter push-down needs to be a logical plan if we want to be able to use accurate stats for a scan. I'm interested here to ensure that we correctly produce broadcast relations based on the actual scan stats, not the table-level stats. Maybe there's another way to do this?
* One of the tests for DSv2 ends up invoking the push-down rule twice, which made me think about whether or not that should be valid. I think it probably should be. For example, what if a plan has nodes that can all be pushed, but they aren't in the right order? Or what if a projection wasn't pushed through a filter because of a rule problem, but it can still be pushed down? Incremental fusing during optimization might be an extensible way to handle odd cases, or it may be useless. I'm not quite sure yet.

It would be great to hear your perspective on these.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
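[Editor's note] A small sketch of the first point above: if push-down produces a logical relation whose reader already reflects the pushed filters, the plan's statistics (and hence the broadcast-join decision) can come from the reader rather than from table-level metadata. This mirrors the `computeStats` override in the relation diff quoted earlier in this thread; the standalone-helper form is only for illustration.

```scala
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}

// The reader is created from the relation's pushed filters/projection, so a
// source that reports statistics can return a post-push-down size here.
def scanStats(reader: DataSourceReader, defaultSizeInBytes: Long): Statistics = reader match {
  case r: SupportsReportStatistics =>
    Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(defaultSizeInBytes))
  case _ =>
    Statistics(sizeInBytes = defaultSizeInBytes)
}
```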
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20490 @dongjoon-hyun, @cloud-fan, @gatorsmile. Once the immutable plan PR is in, this can be reviewed. @steveloughran, I think this is what you were asking for. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Add support for commit coordi...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/20490 [SPARK-23323][SQL]: Add support for commit coordinator for DataSourceV2 writes ## What changes were proposed in this pull request? DataSourceV2 batch writes should use the output commit coordinator if it is required by the data source. This adds a new method, `DataWriterFactory#useCommitCoordinator`, that determines whether the coordinator will be used. If the write factory returns true, `WriteToDataSourceV2` will use the coordinator for batch writes. This relies on the commits in #20387. Once that is committed, this will be rebased. Only the last commit is part of this PR. ## How was this patch tested? This relies on existing write tests, which now use the commit coordinator. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-23323-add-commit-coordinator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20490.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20490 commit 62c569672083c0fa633da1d6edaba40d0bb05819 Author: Ryan Blue <blue@...> Date: 2018-01-17T21:58:12Z SPARK-22386: DataSourceV2: Use immutable logical plans. commit f0bd45d3c931941b8092cdac738cb29954e0acdd Author: Ryan Blue <blue@...> Date: 2018-01-24T19:34:42Z SPARK-23203: Fix scala style check. commit 2fdeb4556cd22a092630b341a22a16a59e377183 Author: Ryan Blue <blue@...> Date: 2018-01-24T19:54:10Z SPARK-23203: Fix Kafka tests, use StreamingDataSourceV2Relation. This also removes unused imports. commit ab945a19efe666c41deae9c044002f3455220c1d Author: Ryan Blue <blue@...> Date: 2018-02-02T20:30:33Z SPARK-23204: DataFrameReader: Remove v2 table identifier parsing. commit f1d9872a2699cdbd5c87b02e702dc8103335131d Author: Ryan Blue <blue@...> Date: 2018-02-02T21:48:29Z SPARK-23203: Remove import changes from DataSourceV2Utils. commit 288af6a2729c769e0d4075a8f9190958ab5a211c Author: Ryan Blue <blue@...> Date: 2018-02-02T22:21:48Z SPARK-23323: DataSourceV2: support commit coordinator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20488: [SPARK-23321][SQL]: Validate datasource v2 writes
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/20488 [SPARK-23321][SQL]: Validate datasource v2 writes ## What changes were proposed in this pull request? DataSourceV2 does not currently apply any validation rules when writing. Other write paths attempt to validate that a data frame can be written to a target table or path and these changes add the same logic to v2. This updates the logical plan to use InsertIntoTable and applies the insert preprocess rules to writes. It also adds a conversion rule from InsertIntoTable to DataSourceV2Write because InsertIntoTable cannot be used in logical plans after analysis. InsertIntoTable is not necessarily the right logical plan. It assumes that the table exists and can report its schema. ## How was this patch tested? Added a test that fails analysis in the preprocess rule. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-23321-validate-datasource-v2-writes Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20488.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20488 commit 62c569672083c0fa633da1d6edaba40d0bb05819 Author: Ryan Blue <blue@...> Date: 2018-01-17T21:58:12Z SPARK-22386: DataSourceV2: Use immutable logical plans. commit f0bd45d3c931941b8092cdac738cb29954e0acdd Author: Ryan Blue <blue@...> Date: 2018-01-24T19:34:42Z SPARK-23203: Fix scala style check. commit 2fdeb4556cd22a092630b341a22a16a59e377183 Author: Ryan Blue <blue@...> Date: 2018-01-24T19:54:10Z SPARK-23203: Fix Kafka tests, use StreamingDataSourceV2Relation. This also removes unused imports. commit ab945a19efe666c41deae9c044002f3455220c1d Author: Ryan Blue <blue@...> Date: 2018-02-02T20:30:33Z SPARK-23204: DataFrameReader: Remove v2 table identifier parsing. commit 3580daf15497a1d49112a0eddd556f74b9b3e280 Author: Ryan Blue <blue@...> Date: 2018-02-02T19:04:23Z SPARK-23321: Apply preprocess insert rules to DataSourceV2. This updates the DataSourceV2 write path to use DataSourceV2Relation and InsertIntoTable to apply the insert preprocess rules. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @cloud-fan, @dongjoon-hyun, @gatorsmile, I've rebased this on master and removed the support for SPARK-23204 that parses table identifiers. If you need other changes to get this in, let me know. As far as I'm aware, this isn't targeting 2.3.0 so it makes sense to keep the `PhysicalOperation` push-down rules. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20477: [SPARK-23303][SQL] improve the explain result for...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20477#discussion_r165728696 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -36,11 +38,14 @@ import org.apache.spark.sql.types.StructType */ case class DataSourceV2ScanExec( fullOutput: Seq[AttributeReference], -@transient reader: DataSourceReader) +@transient reader: DataSourceReader, +@transient sourceClass: Class[_ <: DataSourceV2]) extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec] + override def simpleString: String = s"Scan $metadataString" --- End diff -- +1 for overriding nodeName. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20485: [SPARK-23315][SQL] failed to get output from canonicaliz...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20485 Sounds fine to me, then. My focus is on the long-term design issues. I still think that the changes to make plans immutable and to use the existing push-down code as much as possible are the best way to get a reliable 2.3.0, but it is fine if they don't make the release. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @cloud-fan, I'll update this PR and we can talk about passing configuration on the dev list. And as a reminder, please close #20445. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387

> I tried and can't figure out how to do it with PhysicalOperation, that's why I build something new for data source v2 pushdown.

The problem is that we should get DSv2 working independently of a redesign of the push-down rules. Throwing an untested push-down rule into changes for DSv2 makes the new API less reliable, and hurts people that want to try it out and start using it. There is no benefit to doing this for 2.3.0. I also think a redesign of push-down should be properly designed, thought out, and tested. I'm all for fixing this if you can make the case that we need to, but we shouldn't needlessly mix together major changes.

@cloud-fan, There's more discussion about this on #20476 that I encourage you to read.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20485: [SPARK-23315][SQL] failed to get output from canonicaliz...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20485 To be clear, the purpose of this commit, like #20476, is just to get something working for the 2.3.0 release? I just want to make sure since I think we should be approaching these problems with a better initial design for the integration. I'm fine getting this in to unblock a release, but if it isn't for that purpose then I think we should fix the design problems first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20476 Yeah, I did review it, but at the time I wasn't familiar with how the other code paths worked and assumed that it was necessary to introduce this. I wasn't very familiar with how it *should* work, so I didn't +1 it. There are a few telling comments though: > How do we know that there aren't more cases that need to be supported? > What are the guarantees made by the previous batches in the optimizer? The work done by FilterAndProject seems redundant to me because the optimizer should already push filters below projection. Is that not guaranteed by the time this runs? In any case, I now think that we should not introduce a new push-down design in conjunction with DSv2. Let's get DSv2 working properly and redesign push-down separately. In parallel is fine by me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20476

@gatorsmile, Do you mean this?

> Extensibility is not good and operator push-down capabilities are limited.

If so, that's very open to interpretation. I would assume it means that the V2 interfaces should support more than just projection and filter push-down, but not a redesign of how push-down happens in the optimizer. Even if it is called out as a goal, I now see it as a misguided choice.

But either way, you make a good point about changing things for a release. I'll defer to your judgement about what should be done for the release. But for the long term, I think this issue underscores my point about reusing code that already works. Let's separate DSv2 from a push-down redesign and get it working reliably without introducing more risk and unknown problems.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20476 @gatorsmile, thanks for the context. If we need to redesign push-down, then I think we should do that separately and with a design plan. **I don't think it's a good idea to bundle it into an unrelated API update.** For one thing, we want to be able to use the existing tests for the redesigned push-down strategy, not reimplement them in pieces. We also don't want to conflate the two changes for early adopters of the new API. V2 should be as reliable as possible by minimizing new behavior. This just isn't the right place to test out experimental designs for push-down operations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20466: [SPARK-23293][SQL] fix data source v2 self join
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20466 +1 Good to get this in before changes to the relation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20476: [SPARK-23301][SQL] data source column pruning should wor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20476

@cloud-fan, @gatorsmile, this PR demonstrates why we should use PhysicalOperation. I ported the tests from this PR over to our branch and they pass without modifying the push-down code. That's because it reuses code that we already trust. I see no benefit to using a brand new code path for push-down when we can use what is already well tested.

I know you want to push other operations, but I've already raised concerns about the design of this new code: it is brittle because it requires matching specific plan nodes. Push-down should work as it always has: by pushing nodes that are adjacent to relations in the logical plan and relying on the optimizer to push projections and filters down as far as possible. The separation of concerns into simple rules is fundamental to the design of the optimizer. I don't think there is a good argument for new code that breaks how the optimizer is intended to work.

cc @henryr, who might want to chime in.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
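[Editor's note] For readers following along, the `PhysicalOperation`-based approach being advocated looks roughly like the sketch below during physical planning: the extractor collects the projections and filters that the optimizer has already placed next to the relation, so no v2-specific plan walking is needed. The strategy name and the elided scan construction are placeholders, not this PR's code.

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object V2ScanStrategySketch extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // PhysicalOperation gathers adjacent Project/Filter nodes for us, the same
    // way the existing file-source and JDBC strategies find work to push down.
    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
      // push `filters` into the reader, prune columns to what `project` needs,
      // then build the physical scan node (construction elided in this sketch)
      Nil
    case _ => Nil
  }
}
```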
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @dongjoon-hyun, @gatorsmile, could you guys weigh in on some of this discussion? I'd like to get additional perspectives on the changes I'm proposing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387

> Let's keep it general and let the data source to interprete it.

I think this is the wrong approach. The reason why we are using a special `DataSourceOptions` object is to ensure that data sources consistently ignore case when reading **their own options**. Consistency across data sources matters and we should be pushing for more consistency, not less.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
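[Editor's note] A concrete illustration of the case-insensitivity point, assuming the 2.3-era `DataSourceOptions` constructor and `get` signatures:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Keys are normalized inside DataSourceOptions, so a source reading its own
// options gets case-insensitive lookups that a plain Map would not guarantee.
val opts = new DataSourceOptions(Map("PATH" -> "/data/events").asJava)
assert(opts.get("path").get() == "/data/events")
```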
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @cloud-fan, to your point about push-down order, I'm not saying that order doesn't matter at all, I'm saying that the push-down can run more than once and it should push the closest operators. That way, if you have a situation where operators can't be reordered but they can all be pushed, they all get pushed through multiple runs of the rule, each one further refining the relation. If we do it this way, then we don't need to traverse the logical plan to find out what to push down. We continue pushing projections until the plan stops changing. This is how the rest of the optimizer works, so I think it is a better approach from a design standpoint. My implementation also reuses more existing code that we have higher confidence in, which is a good thing. We can add things like limit pushdown later, by adding it properly to the existing code. I don't see a compelling reason to toss out the existing implementation, especially without the same level of testing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
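[Editor's note] To illustrate the "run until the plan stops changing" idea using the relation fields from the diffs quoted earlier, a hypothetical fixed-point rule might fold the filter sitting directly on the relation into the relation's pushed state on each pass. This is a sketch of the approach, not the rule in this PR; the name and the guard are invented.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object PushClosestFilterSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // Only handle the Filter immediately above the relation; repeated runs of
    // the batch pick up whatever the optimizer moves next to it later.
    case filter @ Filter(condition, relation: DataSourceV2Relation)
        if !relation.filters.exists(_.contains(condition)) =>
      val pushed = relation.filters.getOrElse(Seq.empty) :+ condition
      // Copying the relation (rather than mutating a reader) is what keeps the
      // plan immutable; the Filter stays on top in case the source only
      // evaluates the predicate partially.
      filter.copy(child = relation.copy(filters = Some(pushed)))
  }
}
```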
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387

> `spark.read.format("iceberg").table("db.table").load()`

I'm fine with this if you think it is confusing to parse the path as a table name in load. I think it is reasonable. I'd still like to keep the `Option[TableIdentifier]` parameter on the relation, so that we can support `table` or `insertInto` on the write path.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @felixcheung, yes, we do already have a `table` option. That creates an `UnresolvedRelation` with the parsed table name as a `TableIdentifier`, which is not currently compatible with `DataSourceV2` because there is no standard way to pass the identifier's db and table name. Part of the intent here is to add support in `DataSourceV2Relation` for cases where we have a `TableIdentifier`, so that we can add a resolver rule that replaces `UnresolvedRelation` with `DataSourceV2Relation`. This is what we do in our Spark branch. @cloud-fan, what is your objection to support like this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
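[Editor's note] The resolver-rule idea described here could look roughly like the following. `lookupSource` is an invented stand-in for whatever catalog lookup decides that an identifier maps to a v2 source; the point is that the rule hands the `TableIdentifier` to the relation directly instead of going through string options.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.DataSourceV2

class ResolveV2TablesSketch(lookupSource: TableIdentifier => Option[DataSourceV2])
    extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case u @ UnresolvedRelation(ident) =>
      // If the identifier belongs to a v2 source, build the relation from the
      // TableIdentifier; otherwise leave the node for other resolution rules.
      lookupSource(ident)
        .map(source => DataSourceV2Relation(source, options = Map.empty, table = Some(ident)))
        .getOrElse(u)
  }
}
```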
[GitHub] spark issue #20454: [SPARK-23202][SQL] Add new API in DataSourceWriter: onDa...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20454 +1 I'd rather not add features without a known use case, but this implementation looks good to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20454: [SPARK-23202][SQL] Add new API in DataSourceWrite...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20454#discussion_r165141464 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java --- @@ -62,6 +62,15 @@ */ DataWriterFactory createWriterFactory(); + /** + * Handles a commit message on receiving from a successful data writer. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination + * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it. --- End diff -- What does it mean that "the state of the destination is undefined"? I think it is sufficient to say that `abort` will be called and the contract for aborting commits applies. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r165138574 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch --- End diff -- If that's the case, then this interface should be clear about it instead of including wording about exactly-once. For this interface, there is no exactly-once guarantee. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20448: [SPARK-23203][SQL] make DataSourceV2Relation immutable
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20448 @cloud-fan, **please close this PR**. There is already a pull request for these changes, #20387, and ongoing discussion there. If you want the proposed implementation to change, please ask for changes in a review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20448: [SPARK-23203][SQL] make DataSourceV2Relation immu...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20448#discussion_r165132718 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,36 +17,84 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.StructType +/** + * A logical plan representing a data source relation, which will be planned to a data scan + * operator finally. + * + * @param output The output of this relation. + * @param source The instance of a data source v2 implementation. + * @param options The options specified for this scan, used to create the `DataSourceReader`. + * @param userSpecifiedSchema The user specified schema, used to create the `DataSourceReader`. + * @param filters The predicates which are pushed and handled by this data source. + * @param existingReader A mutable reader carrying some temporary stats during optimization and + * planning. It's always None before optimization, and does not take part in + * the equality of this plan, which means this plan is still immutable. + */ case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder { +output: Seq[AttributeReference], +source: DataSourceV2, +options: DataSourceOptions, +userSpecifiedSchema: Option[StructType], +filters: Set[Expression], +existingReader: Option[DataSourceReader]) extends LeafNode with DataSourceV2QueryPlan { + + override def references: AttributeSet = AttributeSet.empty + + override def sourceClass: Class[_ <: DataSourceV2] = source.getClass override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def reader: DataSourceReader = existingReader.getOrElse { +(source, userSpecifiedSchema) match { + case (ds: ReadSupportWithSchema, Some(schema)) => +ds.createReader(schema, options) + + case (ds: ReadSupport, None) => +ds.createReader(options) + + case (ds: ReadSupport, Some(schema)) => +val reader = ds.createReader(options) +// Sanity check, this should be guaranteed by `DataFrameReader.load` +assert(reader.readSchema() == schema) +reader + + case _ => throw new IllegalStateException() +} + } + override def computeStats(): Statistics = reader match { case r: SupportsReportStatistics => Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) } + + override def simpleString: String = s"Relation $metadataString" } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) { +case class StreamingDataSourceV2Relation( --- End diff -- Agreed. That was the plan here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r165131292 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch --- End diff -- For a commit interface, I expect the guarantee to be that data is committed exactly once. If commits are idempotent, data may be reprocessed, and commits may happen more than once, then that is not an exactly-once commit: that is an at-least-once commit. I'm not trying to split hairs. My point is that if there's no difference in behavior between exactly-once and at-least-once because the commit must be idempotent, then you don't actually have a exactly-once guarantee. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r165121965 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch --- End diff -- Thanks for this explanation, I think I see what you're saying. But I think your statement that refers to "true" exactly-once gives away the fact that this does not provide exactly-once semantics. Maybe this is a question for the dev list: why the weaker version? Shouldn't this API provide a check to see whether the data was already committed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
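[Editor's note] For what the requested "check whether the data was already committed" behavior could look like, here is a sketch with an invented `EpochLog` abstraction standing in for whatever transaction log a sink keeps; the point is that a replayed commit for an already-committed epoch becomes a no-op.

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Invented abstraction for the sink's own metadata/transaction log.
trait EpochLog {
  def isCommitted(epochId: Long): Boolean
  def commit(epochId: Long, messages: Seq[WriterCommitMessage]): Unit
}

// Idempotent epoch commit: calling this twice for the same epoch cannot
// duplicate data, because the second call sees the epoch as committed.
def commitEpoch(log: EpochLog, epochId: Long, messages: Seq[WriterCommitMessage]): Unit = {
  if (!log.isCommitted(epochId)) {
    log.commit(epochId, messages)
  }
}
```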
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r165119560 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java --- @@ -63,32 +68,42 @@ DataWriterFactory createWriterFactory(); /** - * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. + * Handles a commit message which is collected from a successful data writer. + * + * Note that, implementations might need to cache all commit messages before calling + * {@link #commit()} or {@link #abort()}. * * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination - * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. + */ + void add(WriterCommitMessage message); --- End diff -- +1 for separating and using another PR. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r165119427 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. --- End diff -- Passing the messages to commit and abort seems simpler and better to me, but that's for the batch side. And, we shouldn't move forward with this unless there's a use case. As for the docs here, what is an implementer intended to understand as a result of this? "The number of data partitions to write" is also misleading: weren't these already written and committed by tasks? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r165117779 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch * in some circumstances. */ - void commit(long epochId, WriterCommitMessage[] messages); + void commit(long epochId); /** - * Aborts this writing job because some data writers are failed and keep failing when retry, or - * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. + * Aborts this writing job because some data writers are failed and keep failing when retry, + * or the Spark job fails with some unknown reasons, + * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails * * If this method fails (by throwing an exception), the underlying data source may require manual * cleanup. * - * Unless the abort is triggered by the failure of commit, the given messages should have some - * null slots as there maybe only a few data writers that are committed before the abort - * happens, or some data writers were committed but their commit messages haven't reached the - * driver when the abort is triggered. So this is just a "best effort" for data sources to - * clean up the data left by data writers. + * Unless the abort is triggered by the failure of commit, the number of commit + * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number + * of input data partitions, as there may be only a few data writers that are committed + * before the abort happens, or some data writers were committed but their commit messages + * haven't reached the driver when the abort is triggered. So this is just a "best effort" --- End diff -- Best effort is not just how we describe the behavior, it is a requirement of the contract. Spark should not drop commit messages because it is convenient. Spark knows what tasks succeeded and failed and which ones were authorized to commit. That's enough information to provide the best-effort guarantee. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20386 > I assume this API is necessary . . . it sounds reasonable to provide a callback for task commit. I agree it sounds reasonable, but we shouldn't add methods to a new API blindly and without a use case. The point of a new API, at least in part, is to improve on the old one. If it is never used, then we are carrying support for something that is useless. On the other hand, if it is used we should know what it is needed for so we can design for the use case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20386 @gengliangwang, what is the use case supported by this? In other words, how is `onTaskCommit(taskCommit: TaskCommitMessage)` currently used that requires this change? In general, I'm more concerned with the batch side and I don't have a huge problem with this change. I do want to make sure it is in support of a valid use case. I'd also rather separate the batch and streaming committer APIs because they have so little in common. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164909970 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java --- @@ -63,32 +68,42 @@ DataWriterFactory createWriterFactory(); /** - * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. + * Handles a commit message which is collected from a successful data writer. + * + * Note that, implementations might need to cache all commit messages before calling + * {@link #commit()} or {@link #abort()}. * * If this method fails (by throwing an exception), this writing job is considered to to have been - * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination - * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. + */ + void add(WriterCommitMessage message); --- End diff -- This is the only method shared between the stream and batch writers. Why does the streaming interface extend this one? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164909653 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch --- End diff -- I realize this isn't part of this commit, but why would an exactly-once guarantee require idempotent commits? Processing the same data twice with an idempotent guarantee is not the same thing as exactly-once. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164909225 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch * in some circumstances. */ - void commit(long epochId, WriterCommitMessage[] messages); + void commit(long epochId); /** - * Aborts this writing job because some data writers are failed and keep failing when retry, or - * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails. + * Aborts this writing job because some data writers are failed and keep failing when retry, + * or the Spark job fails with some unknown reasons, + * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails * * If this method fails (by throwing an exception), the underlying data source may require manual * cleanup. * - * Unless the abort is triggered by the failure of commit, the given messages should have some - * null slots as there maybe only a few data writers that are committed before the abort - * happens, or some data writers were committed but their commit messages haven't reached the - * driver when the abort is triggered. So this is just a "best effort" for data sources to - * clean up the data left by data writers. + * Unless the abort is triggered by the failure of commit, the number of commit + * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number + * of input data partitions, as there may be only a few data writers that are committed + * before the abort happens, or some data writers were committed but their commit messages + * haven't reached the driver when the abort is triggered. So this is just a "best effort" --- End diff -- Commit messages in flight should be handled and aborted. Otherwise, this isn't a "best effort". Best effort means that Spark does everything that is feasible to ensure that commit messages are added before aborting, and that should include race conditions from RPC. The case where "best effort" might miss a message is if the message is created, but a node fails before it is sent to the driver. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164908529 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java --- @@ -63,32 +68,42 @@ DataWriterFactory createWriterFactory(); /** - * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. + * Handles a commit message which is collected from a successful data writer. + * + * Note that, implementations might need to cache all commit messages before calling + * {@link #commit()} or {@link #abort()}. --- End diff -- In what case would an implementation not cache and commit all at once? What is the point of a commit if not to make sure all of the data shows up at the same time? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
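For illustration of the cache-then-commit pattern this question is probing, below is a hedged sketch of a writer built against the proposed `add()`/`commit()` split. The message type and the publish/clean-up steps are stand-ins, not the real v2 interfaces.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of a writer shaped around the proposed add()/commit() split. The message
// type parameter and the publish/clean-up helpers are stand-ins for illustration.
class BufferingWriter[Msg] {
  // Messages must be buffered: nothing should become visible before commit().
  private val pending = ArrayBuffer.empty[Msg]

  def add(message: Msg): Unit = pending += message

  def commit(): Unit = {
    // All buffered task outputs are published together, so readers either see
    // the whole job's data or none of it.
    publishAll(pending.toSeq)
  }

  def abort(): Unit = {
    // Best effort: clean up whatever the buffered messages point at.
    cleanUp(pending.toSeq)
  }

  private def publishAll(messages: Seq[Msg]): Unit = () // sink-specific
  private def cleanUp(messages: Seq[Msg]): Unit = ()    // sink-specific
}
```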
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164907626 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. --- End diff -- What does this mean? It isn't clear to me what "the number of input partitions" means, or why it isn't obvious that it is equal to the number of pending `WriterCommitMessage` instances passed to add. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164907397 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. --- End diff -- Nit: javadoc typo. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20386 @cloud-fan, is the intent to get this into 2.3.0? If so, I'll make time to review it today. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20427: [SPARK-23260][SPARK-23262][SQL] several data sour...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20427#discussion_r164807449 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java --- @@ -25,7 +25,7 @@ * session. */ @InterfaceStability.Evolving -public interface SessionConfigSupport { +public interface SessionConfigSupport extends DataSourceV2 { --- End diff -- Ping me on the new PR. I'm happy to review it (though it is non-binding). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20427: [SPARK-23260][SPARK-23262][SQL] several data sour...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20427#discussion_r164806676 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java --- @@ -25,7 +25,7 @@ * session. */ @InterfaceStability.Evolving -public interface SessionConfigSupport { +public interface SessionConfigSupport extends DataSourceV2 { --- End diff -- Mixing large migration commits like this one with unrelated changes makes it harder to pick or revert changes without unintended side-effects. What happens if we realize that this rename was a bad idea? Reverting this commit would also revert the constraint that SessionConfigSupport extends DataSourceV2. Similarly, if we realize that these mix-ins don't need to extend DataSourceV2, then we would have to find and remove them all instead of reverting a commit. That might even sound okay, but when you're picking commits deliberately to patch branches, you need to make as few changes as possible and cherry-pick conflicts make that much harder. The fact that you're rushing to get commits into 2.3 is even more concerning and reason to be careful, not a reason to relax our standards. Please move this to its own PR and fix all of the interfaces at once. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20427: [SPARK-23260][SQL] remove V2 from the class name ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20427#discussion_r164621094 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java --- @@ -25,7 +25,7 @@ * session. */ @InterfaceStability.Evolving -public interface SessionConfigSupport { +public interface SessionConfigSupport extends DataSourceV2 { --- End diff -- It's best to keep commits clean and focused. I'd say create a new JIRA for it and do all of the mix-ins at once. +1 when this is separated to its own PR. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 > It's hard to improve PhysicalOperation to support more operators and specific push down orders, so I created the new one I'm concerned about the new one. The projection support seems really brittle because it calls out specific logical nodes and scans the entire plan. If we are doing push-down wrong on the current v1 and Hive code paths, then I'd like to see a proposal for fixing that without these drawbacks. I like that this PR pushes projections and filters just like the other paths. We should start there and add additional push-down as necessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 > [The push-down rule may be run more than once if filters are not pushed through projections] looks weird, do you have a query to reproduce this issue? One of the DataSourceV2 tests hit this. I thought it was a good thing to push a single node down at a time and not depend on order. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 > I'd suggest that we just propogate the paths parameter to options, and data source implementations are free to interprete the path option to whatever they want, e.g. table and database names. What about code paths that expect table names? In our branch, we've added support for converting Hive relations (which have a `TableIdentifier`, not a path) and using `insertInto`. Table names and paths are the two main ways to identify tables, and I think both should be supported. This is a new API, so it doesn't matter that `load` and `save` currently use paths. We can easily update that to support tables. If we don't, then there will be no common way to refer to tables: some implementations will use `table`, some will pass `db` separately, and some might use `database`. Standardizing this and adding support in Spark will produce more consistent behavior across data sources. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
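To sketch what "standardizing this" could look like, here is a minimal, hypothetical helper. The option keys and the local `TableIdentifier` stand-in are assumptions for illustration, not keys or classes defined by Spark.

```scala
// Illustrative only: one helper turns a table identifier into well-defined option
// keys, instead of each source inventing its own ("table", "db", "database", ...).
// The key names and this local TableIdentifier stand-in are assumptions.
case class TableIdentifier(table: String, database: Option[String] = None)

object TableOptions {
  def asOptions(ident: TableIdentifier): Map[String, String] =
    Map("table" -> ident.table) ++ ident.database.map("database" -> _)
}

// Usage: the same convention is applied wherever options are built.
// TableOptions.asOptions(TableIdentifier("events", Some("prod")))
//   == Map("table" -> "events", "database" -> "prod")
```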
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 > I'm ok to make it immutable if there is an significant benefit. Mutable nodes violate a basic assumption of catalyst, that trees are immutable. Here's a good quote from the SIGMOD paper (by @rxin, @yhuai, and @marmbrus et al.): > In our experience, functional transformations on immutable trees make the whole optimizer very easy to reason about and debug. They also enable parallelization in the optimizer, although we do not yet exploit this. Mixing mutable nodes into supposedly immutable trees is a bad idea. Other nodes in the tree assume that children do not change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
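A small sketch of the functional-transformation style the quote describes, using simplified stand-in nodes rather than catalyst's real classes: push-down returns a new node via `copy` instead of mutating state held inside an existing one.

```scala
// Simplified illustration of immutable plan nodes. The field names are stand-ins;
// the point is that push-down produces a new relation rather than mutating a
// reader held inside the node.
case class PushableFilter(condition: String)

case class SimpleRelation(
    options: Map[String, String],
    pushedFilters: Seq[PushableFilter] = Nil)

object PushDown {
  def pushFilters(relation: SimpleRelation, filters: Seq[PushableFilter]): SimpleRelation = {
    // The original node is untouched; any parent that already holds a reference
    // to it still sees the same tree, which is what the optimizer relies on.
    relation.copy(pushedFilters = relation.pushedFilters ++ filters)
  }
}
```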
[GitHub] spark pull request #20427: [SPARK-23260][SQL] remove V2 from the class name ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20427#discussion_r164544288 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java --- @@ -25,7 +25,7 @@ * session. */ @InterfaceStability.Evolving -public interface SessionConfigSupport { +public interface SessionConfigSupport extends DataSourceV2 { --- End diff -- Why does this need to extend DataSourceV2? Why add this in a commit that appears to be nothing more than a rename? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20427: [SPARK-23260][SQL] remove V2 from the class name ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20427#discussion_r164543216 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java --- @@ -18,23 +18,23 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; /** * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to * provide data reading ability and scan the data from the data source. */ @InterfaceStability.Evolving -public interface ReadSupport { +public interface ReadSupport extends DataSourceV2 { /** - * Creates a {@link DataSourceV2Reader} to scan the data from this data source. + * Creates a {@link DataSourceReader} to scan the data from this data source. * * If this method fails (by throwing an exception), the action would fail and no Spark job was * submitted. * * @param options the options for the returned data source reader, which is an immutable *case-insensitive string-to-string map. */ - DataSourceV2Reader createReader(DataSourceV2Options options); + DataSourceReader createReader(DataSourceV2Options options); --- End diff -- Why not rename options as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20397 This is more confusing, not less. Look at @jiangxb1987's comment above: "We shall create only one DataReaderFactory, and have that create multiple data readers." It is not clear why the API requires a list of factories, instead of using just one. If this is renamed to factory, is it a requirement that the factory can create more than one data reader for the same task? To the point about serializing and sending to executors, "factory" doesn't imply that any more than "task" does. The fact that these are serialized needs to be clear in documentation. The read and write side behave differently. They do not need to mirror one another's naming when that makes names less precise. This isn't forcing users to look at a subtle difference. It is just breaking the (wrong) assumption that both read and write sides have the same behavior. @rxin, any opinion here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20397 > I think the renaming is worth to remove future confusions. What future confusion? I understand that the difference isn't obvious, but making the names less accurate isn't a good fix. The read and write sides don't have to mirror one another. They behave differently and that's okay. Names should be based on what the classes actually do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20397 One last point: should significant changes to public APIs like this go in just before or just after a release? 2.3.0 candidates have used ReadTask up to now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFactory i...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20397 @cloud-fan, thanks for pinging me on this. -1: I don't think there's a compelling benefit to justify this change, and I think it makes the API more confusing. I think we should revert this. This class doesn't actually behave as a factory and is used more like an Iterable: it is only used to instantiate one DataReader and carries no explicit guarantee that it can be reused. In addition, the piece of work that each one represents is a task, which becomes an actual task when the stage runs. I would much rather keep the ReadTask name to make that connection clear. The write side does behave like a factory, so the name is appropriate there. There is little value to uniform names if the names actually make the API more confusing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
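For readers following the naming debate, here is a simplified sketch of how each planned piece of work is consumed on an executor. The interfaces below are stand-ins, not the exact v2 signatures; the point is that each task object yields exactly one reader and is drained once, which is closer to a task (or Iterable) than to a reusable factory.

```scala
// Simplified stand-ins for the interfaces under discussion.
trait SimpleDataReader[T] {
  def next(): Boolean
  def get(): T
  def close(): Unit
}

trait SimpleReadTask[T] {
  def preferredLocations: Seq[String]
  def createReader(): SimpleDataReader[T]
}

object TaskRunner {
  def runOnce[T](task: SimpleReadTask[T])(process: T => Unit): Unit = {
    val reader = task.createReader() // one reader per task
    try {
      while (reader.next()) {
        process(reader.get())
      }
    } finally {
      reader.close()
    }
  }
}
```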
[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r164169060 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,15 +17,149 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- That's why these are options. Passing either path or table name is the most common case, which we should have good support for. If tables are identified in other ways, that's supported. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r163913161 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,15 +17,149 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I guess another way to say this is that it's better to set reliable path, database, and table parameters after passing them explicitly, than to require that all the places where DataSourceV2Relations are created do the same thing. Better to standardize passing these options in `v2Options`, and it would be even better to pass these directly to the readers and writers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r163909751 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,15 +17,149 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, --- End diff -- I'm not sure I understand what you mean. When something is pushed, it creates a new immutable relation, so I think it has to be added to the relation. But I'm not sure that many things will be pushed besides the projection and filters. What are you thinking that we would need to add? Fragments of logical plan? Assuming we add the ability to push parts of the logical plan, then this would need to have a reference to the part that was pushed down. I'm not sure that would be this relation class, a subclass, or something else, but I would be fine adding a third push-down option here. The number of things to push down isn't very large, is it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Us...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r163908263 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,15 +17,149 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- We could keep these in options, but because they are the main two ways to identify tables, they should be easier to work with. I'd even suggest adding them to the DataSourceV2 read and write APIs. Another benefit of adding these is that it is easier to use DataSourceV2Relation elsewhere. In our Spark build, I've added a rule to convert Hive relations to DataSourceV2Relation based on a table property. That's cleaner because we can pass the TableIdentifier instead of adding options to the map. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
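Below is a self-contained sketch of the kind of conversion rule described here, with stand-in plan nodes rather than Spark's real ones. It is only meant to show why passing the identifier through directly is cleaner than re-encoding it as string options at every place that builds the relation.

```scala
// Stand-in plan nodes, not Spark's real classes; names are illustrative.
case class TableIdentifier(table: String, database: Option[String] = None)

sealed trait Plan
case class CatalogRelation(ident: TableIdentifier, provider: String) extends Plan
case class V2Relation(
    source: String,
    options: Map[String, String],
    table: Option[TableIdentifier] = None) extends Plan

object ConvertToV2 {
  // A resolution-style rewrite: relations whose provider is a v2 source are
  // replaced by a V2Relation that carries the identifier as-is, instead of
  // flattening it into the options map at this call site.
  def apply(plan: Plan): Plan = plan match {
    case CatalogRelation(ident, provider) if provider == "my-v2-source" =>
      V2Relation(provider, options = Map.empty, table = Some(ident))
    case other => other
  }
}
```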
[GitHub] spark issue #20387: [SPARK-23203][SPARK-23204][SQL]: DataSourceV2: Use immut...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20387 @cloud-fan, please have a look at these changes. This will require follow-up for the Streaming side. I have yet to review the streaming interfaces for `DataSourceV2`, so I haven't made any changes there. In our Spark build, I've also moved the write path to use DataSourceV2Relation, which I intend to do in a follow-up to this issue. @rxin FYI. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: SPARK-22386: DataSourceV2: Use immutable logical ...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/20387 SPARK-22386: DataSourceV2: Use immutable logical plans. ## What changes were proposed in this pull request? DataSourceV2 should use immutable catalyst trees instead of wrapping a mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and consolidates much of the DataSourceV2 API requirements for the read path in it. Instead of wrapping a reader that changes, the relation lazily produces a reader from its configuration. This commit also updates the predicate and projection push-down. Instead of the implementation from SPARK-22197, this reuses the rule matching from the Hive and DataSource read paths (using `PhysicalOperation`) and copies most of the implementation of `SparkPlanner.pruneFilterProject`, with updates for DataSourceV2. By reusing the implementation from other read paths, this should have fewer regressions from other read paths and is less code to maintain. The new push-down rules also support the following edge cases: * The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection * The requested projection passed to the DataSourceV2Reader should include filter columns * The push-down rule may be run more than once if filters are not pushed through projections ## How was this patch tested? Existing push-down and read tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-22386-push-down-immutable-trees Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20387.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20387 commit d3233e1a8b1d4d153146b1a536dee34246920b0d Author: Ryan Blue <blue@...> Date: 2018-01-17T21:58:12Z SPAKR-22386: DataSourceV2: Use immutable logical plans. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
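The "lazily produces a reader from its configuration" idea from the PR description can be outlined roughly as below. This is a sketch under assumed names, not the code in the PR.

```scala
// Simplified outline of a relation that derives its reader on demand from
// immutable configuration. ReaderLike and SourceLike are stand-ins for the
// v2 interfaces; field names are assumptions.
trait ReaderLike { def readSchema(): Seq[String] }
trait SourceLike { def createReader(options: Map[String, String]): ReaderLike }

case class ImmutableRelation(
    source: SourceLike,
    options: Map[String, String],
    projection: Option[Seq[String]] = None,
    filters: Option[Seq[String]] = None) {

  // The reader is not mutable state shared across the plan: it is (re)created
  // from the relation's immutable fields whenever it is needed.
  lazy val reader: ReaderLike = source.createReader(options)

  // The relation reports whatever the reader can actually produce, in case the
  // reader only partially satisfies the requested projection.
  def output: Seq[String] = reader.readSchema()
}
```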
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19861 Thanks for the example, @cloud-fan. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19861 @jiangxb1987, I understand what this does. I just wanted an example use case where it was necessary. What was the motivating *use case*? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19861 @jiangxb1987, @cloud-fan, what was the use case you needed to add this for? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20201: [SPARK-22389][SQL] data source v2 partitioning reporting...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20201 @cloud-fan, please ping me to review PRs for DataSourceV2. Our new table format uses it and we're preparing some changes, so I want to make sure we're heading in the same direction for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/19568 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
GitHub user rdblue reopened a pull request: https://github.com/apache/spark/pull/19568 SPARK-22345: Fix sort-merge joins with conditions and codegen. ## What changes were proposed in this pull request? This adds a joined row to sort-merge join codegen. That joined row is used to generate code for filter expressions, which may fall back to using the result row. Previously, the right side of the join was used, which is incorrect (the non-codegen implementations use a joined row). ## How was this patch tested? Current tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-22345-fix-sort-merge-codegen Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19568.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19568 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19568: SPARK-22345: Fix sort-merge joins with conditions and co...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19568 @gatorsmile, I think it would be better to fix codegen than to prevent it from happening with an assertion. If `CodegenFallback` can produce fallback code, why not allow it to when necessary? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] incubator-toree pull request #147: Support for --spark-context-initializatio...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/147#discussion_r152119598 --- Diff: kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala --- @@ -193,6 +199,19 @@ class CommandLineOptions(args: Seq[String]) { Some(userDefined.asJava) } + private def onlyPositiveOtherwiseDefault(spec: OptionSpec[Int]): Option[Int] = { --- End diff -- I don't think it is a good idea to silently ignore user input. If the value passed is -1, for example, then the user gets 100ms. If what the user requests isn't possible, then this should fail and print the help message. ---
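A hedged sketch of the fail-fast alternative being suggested: reject non-positive values loudly instead of quietly substituting a default. The option name and message below are invented for illustration.

```scala
// Illustrative only: validate a numeric CLI option and fail loudly on bad input
// rather than silently falling back to a default the user did not ask for.
object CliValidation {
  def requirePositive(name: String, value: Int): Int = {
    if (value <= 0) {
      // Surfacing the problem (ideally alongside the usage/help text) beats
      // quietly handing back 100ms when the user typed -1.
      throw new IllegalArgumentException(
        s"Invalid value for --$name: $value (must be a positive integer)")
    }
    value
  }
}

// CliValidation.requirePositive("some-timeout-option", -1)  // fails fast
```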
[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19623#discussion_r148875607 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -50,28 +53,34 @@ /** * Creates a writer factory which will be serialized and sent to executors. + * + * If this method fails (by throwing an exception), the action would fail and no Spark job was + * submitted. */ DataWriterFactory createWriterFactory(); /** * Commits this writing job with a list of commit messages. The commit messages are collected from - * successful data writers and are produced by {@link DataWriter#commit()}. If this method - * fails(throw exception), this writing job is considered to be failed, and - * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible - * to data source readers if this method succeeds. + * successful data writers and are produced by {@link DataWriter#commit()}. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination + * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. * * Note that, one partition may have multiple committed data writers because of speculative tasks. * Spark will pick the first successful one and get its commit message. Implementations should be --- End diff -- I haven't read Steve's points here entirely, but I agree that Spark should be primarily responsible for task commit coordination. Most implementations would be fine using the current [output commit coordinator](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala), which does a good job balancing the trade-offs that you've been discussing. It ensures that only one task is authorized to commit and has well-defined failure cases (when a network partition prevents the authorized committer from responding before its commit authorization times out). I think that Spark should use the current commit coordinator unless an implementation opts out of using it (and I'm not sure that opting out is a use case we care to support at this point). It's fine if Spark documents how its coordinator works and there are some drawbacks, but expecting implementations to handle their own commit coordination (which requires RPC for Spark) is, I think, unreasonable. Let's use the one we have by default, however imperfect. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19623#discussion_r148849054 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java --- @@ -36,14 +36,24 @@ /** * The preferred locations where this read task can run faster, but Spark does not guarantee that * this task will always run on these locations. The implementations should make sure that it can - * be run on any location. The location is a string representing the host name of an executor. + * be run on any location. The location is a string representing the host name. + * + * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in + * the returned locations. By default this method returns empty string, which means this task --- End diff -- This isn't the empty string, it is a 0-length array. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19623#discussion_r148848790 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java --- @@ -34,11 +35,17 @@ /** * Proceed to next record, returns false if there is no more records. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times. */ - boolean next(); + boolean next() throws IOException; --- End diff -- Should clarify when it is okay to throw IOException with `@throws`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19623: [SPARK-22078][SQL] clarify exception behaviors fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19623#discussion_r148848619 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java --- @@ -30,6 +30,9 @@ /** * Creates a {@link DataSourceV2Reader} to scan the data from this data source. * + * If this method fails (by throwing an exception), the action would fail and no Spark job was --- End diff -- Are there recommended exceptions to throw? It would be great if this specified what exception classes implementations should use for problems with `@throws` javadoc entries. For example, `@throws AnalysisException If ...`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
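As a concrete illustration of the `@throws` suggestion in the last two comments, a documented method might look like the sketch below. It is written in Scala/Scaladoc to match the other sketches even though the actual interfaces are Java, and the exception choices here are examples, not a documented contract of the v2 API.

```scala
import java.io.IOException

// Illustrative interface showing the style of @throws documentation being asked for.
trait ExampleReadSupport {
  /**
   * Creates a reader to scan data from this source.
   *
   * @param options an immutable, case-insensitive string-to-string map
   * @return a configured reader
   * @throws IllegalArgumentException if a required option is missing or invalid
   * @throws IOException if the source's metadata cannot be reached
   */
  def createReader(options: Map[String, String]): ExampleReader
}

trait ExampleReader
```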
[GitHub] spark issue #19568: SPARK-22345: Fix sort-merge joins with conditions and co...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19568 @DonnyZone, I don't know of any cases that use codegen after the fix for `CodegenFallback`, but I think this is still a good idea. If Spark is going to generate code, it should generate correct code. That means either we remove the codegen implementation from `CodegenFallback`, or we fix the row passed in by sort-merge join. I'd rather fix sort-merge join because we may want to change the behavior to codegen as much of the condition as possible later on. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13206: [SPARK-15420] [SQL] Add repartition and sort to prepare ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/13206 We still maintain a version of this for our Spark builds to avoid an extra sort in Hive. If someone is willing to review it, I can probably find the time to rebase it on master. I think the year this sat initially was just because the 2.0 release was happening at the same time and there wasn't much bandwidth for reviews. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r147471530 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala --- @@ -124,7 +125,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { rightPlan: SparkPlan) = { val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan) - EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin) + EnsureRequirements(spark.sessionState.conf) + .apply(ProjectExec(sortMergeJoin.output, sortMergeJoin)) --- End diff -- In 2.1.1, an extra project causes `WholeStageCodegenExec` to not detect that the expression contains `CodegenFallback`. This is no longer the case. Like I said, there is no longer a good way to test what happens when `CodegenFallback` generates code. If there were, I'd use that here to test the case. I guess I could add a testing case to `WholeStageCodegenExec` to make sure the code is generated correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r147471054 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala --- @@ -228,6 +230,27 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { ) ) + testInnerJoin( --- End diff -- This test fails in 2.1.1 and versions before https://github.com/apache/spark/commit/6b6dd682e84d3b03d0b15fbd81a0d16729e521d2. I'm not sure how to exercise the code generated by CodegenFallback with that fix, but this test is valid for the 2.1.1 branch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r14733 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -585,21 +585,26 @@ case class SortMergeJoinExec( val iterator = ctx.freshName("iterator") val numOutput = metricTerm(ctx, "numOutputRows") +val joinedRow = ctx.freshName("joined") val (beforeLoop, condCheck) = if (condition.isDefined) { // Split the code of creating variables based on whether it's used by condition or not. val loaded = ctx.freshName("loaded") val (leftBefore, leftAfter) = splitVarsByCondition(left.output, leftVars) val (rightBefore, rightAfter) = splitVarsByCondition(right.output, rightVars) + // Generate code for condition + ctx.INPUT_ROW = joinedRow --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r146975643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -585,21 +585,26 @@ case class SortMergeJoinExec( val iterator = ctx.freshName("iterator") val numOutput = metricTerm(ctx, "numOutputRows") +val joinedRow = ctx.freshName("joined") --- End diff -- The second problem was fixed in this commit: https://github.com/apache/spark/commit/6b6dd682e84d3b03d0b15fbd81a0d16729e521d2 I still think that the codegen problem should be fixed. Detection of `CodegenFallback` is imperfect, and Spark will still generate that code and run it. I think we should either remove codegen from `CodegenFallback` or add this fix to ensure that code works, even if we don't expect to run it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r146974005 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -585,21 +585,26 @@ case class SortMergeJoinExec( val iterator = ctx.freshName("iterator") val numOutput = metricTerm(ctx, "numOutputRows") +val joinedRow = ctx.freshName("joined") --- End diff -- It ended up being a bit more complicated. There are two problems. The first is what this fixes: the INPUT_ROW in the codegen context points to the wrong row. This is fixed and now has a test that fails if you uncomment the line that sets INPUT_ROW. The second problem is that the check for `CodegenFallback` fails to consider whether the condition supports codegen in some plans. To get the test to fail, I had to add a projection to exercise the [path where this happens](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L524). I'll add a second commit for this problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
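For illustration, a minimal sketch of the fix pattern for the first problem, simplified from the diff above; the surrounding names (`ctx`, `condition`, `left`, `right`) are assumed from the enclosing `SortMergeJoinExec` code-generation method and are not reproduced exactly from the patch:

```scala
import org.apache.spark.sql.catalyst.expressions.BindReferences

// Sketch only: point the codegen context's INPUT_ROW at a joined row *before*
// generating code for the join condition, so that any CodegenFallback expression
// in the condition evaluates against (left ++ right) rather than whichever row
// INPUT_ROW happened to reference previously.
val joinedRow = ctx.freshName("joined")   // name of the JoinedRow variable in generated code
ctx.INPUT_ROW = joinedRow                 // fallback eval(...) calls will read from this row
val boundCondition = BindReferences.bindReference(condition.get, left.output ++ right.output)
val conditionCode = boundCondition.genCode(ctx)  // generated code now uses the joined row
```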
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r146970055 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -615,6 +620,7 @@ case class SortMergeJoinExec( } s""" + |$joinedRow = new JoinedRow(); --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17813: [SPARK-20540][CORE] Fix unstable executor request...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/17813#discussion_r146939350 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -589,8 +605,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // take into account executors that are pending to be added or removed. val adjustTotalExecutors = if (!replace) { - doRequestTotalExecutors( -numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size) + requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0) + if (requestedTotalExecutors != --- End diff -- This is just informational. The problem is that the state of the allocation manager isn't synced with the scheduler. Instead, the allocator sends messages to try to control the scheduler backend to get the same state. For example, instead of telling the scheduler backend that the desired number of executors is 10, the allocator sends a message to add 2 executors. When this gets out of sync because of failures or network delay, you end up with these messages. When you see these, make sure you're just out of sync (and will eventually get back in sync), and not in a state where the scheduler and allocator can't reconcile the required number of executors. That's what this PR tried to fix. The long-term solution is to update the communication so that the allocator requests its ideal state, always telling the scheduler backend how many executors it currently needs, instead of killing or requesting more. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
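To illustrate the difference between the two communication styles described above, here is a small sketch; the trait and method names are illustrative stand-ins, not the actual Spark interfaces:

```scala
// Declarative vs. delta-based executor requests (illustrative sketch).
trait SchedulerBackendClient {
  def requestTotalExecutors(total: Int): Boolean // declarative: "I need N in total"
  def requestExecutors(additional: Int): Boolean // delta-based: "add N more"
}

class AllocatorSketch(client: SchedulerBackendClient) {
  // Declarative: recompute the target and resend it on every sync. A lost or
  // delayed message is harmless because the next sync converges on the target.
  def syncTarget(pendingTasks: Int, tasksPerExecutor: Int): Unit = {
    val target = math.ceil(pendingTasks.toDouble / tasksPerExecutor).toInt
    client.requestTotalExecutors(target)
  }

  // Delta-based: correctness depends on both sides agreeing on the current
  // count, so failures or reordering leave the allocator and the scheduler
  // backend out of sync, which produces the warnings discussed above.
  def addExecutors(delta: Int): Unit = client.requestExecutors(delta)
}
```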
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r146928894 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -585,21 +585,26 @@ case class SortMergeJoinExec( val iterator = ctx.freshName("iterator") val numOutput = metricTerm(ctx, "numOutputRows") +val joinedRow = ctx.freshName("joined") --- End diff -- The joined row should always be used for correctness. We don't know what code the expression will generate, so we should plan on always passing the correct input row. Setting left and right on a joined row is a cheap operation, so I'd rather do it correctly than rely on something brittle like `isInstanceOf[CodegenFallback]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
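As a rough illustration of why reusing one joined row is cheap (sketch only; `condition` here is a stand-in for whatever predicate implementation evaluates the join condition, interpreted or generated):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow

// One JoinedRow per task: withLeft/withRight only swap references, so the
// condition always sees both sides of the join no matter how it is evaluated.
val joined = new JoinedRow()

def evalCondition(left: InternalRow, right: InternalRow,
                  condition: InternalRow => Boolean): Boolean = {
  condition(joined.withLeft(left).withRight(right))
}
```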
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19568#discussion_r146928318 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -615,6 +620,7 @@ case class SortMergeJoinExec( } s""" + |$joinedRow = new JoinedRow(); --- End diff -- Yeah, that's causing the test failures. This is a typo from some restructuring I did to get this upstream. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19568: SPARK-22345: Fix sort-merge joins with conditions and co...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19568 @dongjoon-hyun, yes, I'm currently working on it. I just wanted to get the rest up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19568: SPARK-22345: Fix sort-merge joins with conditions...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/19568 SPARK-22345: Fix sort-merge joins with conditions and codegen. ## What changes were proposed in this pull request? This adds a joined row to sort-merge join codegen. That joined row is used to generate code for filter expressions, which may fall back to using the result row. Previously, the right side of the join was used, which is incorrect (the non-codegen implementations use a joined row). ## How was this patch tested? Current tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-22345-fix-sort-merge-codegen Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19568.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19568 commit 4afb088a4fa2127cab7467cc56f58cd77bd8c251 Author: Ryan Blue <b...@apache.org> Date: 2017-10-24T20:21:50Z SPARK-22345: Fix sort-merge joins with conditions and codegen. Code for the condition was generated to depend on the right row instead of the joined row. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19448 I have a lot of sympathy for the argument that infrastructure software shouldn't have too many backports and that those should generally be bug fixes. But, if I were working on a Spark distribution at a vendor, this is something I would definitely include because it's such a useful feature. I think that by not backporting this, we're just pushing that work downstream. Plus, the risk of adding this is low: the main behavior change is that users can specify a previously-banned committer for Parquet writes. Is it a bug fix? Probably not. But it fixes a big blocker. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144388430 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -138,6 +138,10 @@ class ParquetFileFormat conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) } +require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) --- End diff -- I think once per write operation is fine. It's not like it is once per file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19448 Still +1 from me as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144331909 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -138,6 +138,10 @@ class ParquetFileFormat conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) } +require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) --- End diff -- I think I'd prefer the warn & continue option. It does little good to fail so late in a job, when the caller has already indicated that they want to use a different committer. Let them write the data out since this isn't a correctness issue, and they can add a summary file later if they want. Basically, there's less annoyance and interruption by not writing a summary file than by failing a job and forcing the user to re-run near the end. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
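A minimal sketch of the warn-and-continue alternative described above; the object, method, and parameter names are assumptions for illustration, not the actual patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import org.slf4j.LoggerFactory

object SummaryFileCheck {
  private val log = LoggerFactory.getLogger(getClass)

  // If summary files were requested but the configured committer cannot write
  // them, log a warning and keep writing instead of failing the job late.
  def warnIfSummariesUnsupported(conf: Configuration, committerClass: Class[_]): Unit = {
    val summaryRequested = conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
    if (summaryRequested && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
      log.warn(s"Committer $committerClass is not a ParquetOutputCommitter; " +
        "no summary (_metadata) files will be written for this job.")
    }
  }
}
```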
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144097061 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link DataWriter}. + * + * The writing procedure is: + * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create the data writer, and write the data of the partition with this + * writer. If all the data are written successfully, call {@link DataWriter#commit()}. If + * exception happens during the writing, call {@link DataWriter#abort()}. + * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job failed with an unknown reason, call + * {@link #abort(WriterCommitMessage[])}. + * + * Spark won't retry failed writing jobs, users should do it manually in their Spark applications if + * they want to retry. + * + * Please refer to the document of commit/abort methods for detailed specifications. + * + * Note that, this interface provides a protocol between Spark and data sources for transactional + * data writing, but the transaction here is Spark-level transaction, which may not be the + * underlying storage transaction. For example, Spark successfully writes data to a Cassandra data + * source, but Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a writer factory which will be serialized and sent to executors. + */ + DataWriterFactory createWriterFactory(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * successful data writers and are produced by {@link DataWriter#commit()}. If this method + * fails(throw exception), this writing job is considered to be failed, and + * {@link #abort(WriterCommitMessage[])} will be called. 
The written data should only be visible + * to data source readers if this method successes. + * + * Note that, one partition may have multiple committed data writers because of speculative tasks. + * Spark will pick the first successful one and get its commit message. Implementations should be + * aware of this and handle it correctly, e.g., have a mechanism to make sure only one data writer + * can commit successfully, or have a way to clean up the data of already-committed writers. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted, + * or the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} + * fails. If this method fails(throw exception), the underlying data source may have garbage that + * need to be cleaned manually, but these garbage should not be visible to data source readers. + * + * Unless the abortion is triggered by the failure of commit, the given messages should have s