[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20387
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r169239000 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,80 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Seq[AttributeReference], +filters: Option[Seq[Expression]] = None, +userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + import DataSourceV2Relation._ + + override def simpleString: String = { +s"DataSourceV2Relation(source=${source.name}, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = projection.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + } + + private lazy val v2Options: DataSourceOptions = makeV2Options(options) + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSpecifiedSchema match { + case Some(s) => +source.asReadSupportWithSchema.createReader(s, v2Options) + case _ => +source.asReadSupport.createReader(v2Options) +} + +DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType) - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } + + override def doCanonicalize(): LogicalPlan = { +val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation] + +// override output with canonicalized output to avoid attempting to configure a reader +val canonicalOutput: Seq[AttributeReference] = this.output +.map(a => QueryPlan.normalizeExprId(a, projection)) + +new DataSourceV2Relation(c.source, c.options, c.projection) { --- End diff -- This is hacky but I don't have a better idea now, let's revisit it later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
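The hunk above is cut off right where the anonymous subclass begins. For readers following along, this is one plausible completion of the canonicalization trick being discussed, written as it would sit inside `DataSourceV2Relation` and assuming Spark's internal Catalyst helpers (`QueryPlan.normalizeExprId`); it is a sketch of the idea, not necessarily the exact code that was merged:

```scala
override def doCanonicalize(): LogicalPlan = {
  val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]

  // Normalize expression IDs against the projection so that two plans differing
  // only in attribute IDs canonicalize to the same relation.
  val canonicalOutput: Seq[AttributeReference] =
    this.output.map(a => QueryPlan.normalizeExprId(a, projection))

  // Pin `output` on the canonical copy so canonicalization never has to create
  // a reader just to recompute the attributes -- the "hacky" part noted above.
  new DataSourceV2Relation(c.source, c.options, c.projection) {
    override lazy val output: Seq[AttributeReference] = canonicalOutput
  }
}
```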
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r168933227 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -17,130 +17,55 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.v2.reader._ -/** - * Pushes down various operators to the underlying data source for better performance. Operators are - * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you - * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the - * data source should execute FILTER before LIMIT. And required columns are calculated at the end, - * because when more operators are pushed down, we may need less columns at Spark side. - */ -object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { - override def apply(plan: LogicalPlan): LogicalPlan = { -// Note that, we need to collect the target operator along with PROJECT node, as PROJECT may -// appear in many places for column pruning. -// TODO: Ideally column pruning should be implemented via a plan property that is propagated -// top-down, then we can simplify the logic here and only collect target operators. -val filterPushed = plan transformUp { - case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) => -val (candidates, nonDeterministic) = - splitConjunctivePredicates(condition).partition(_.deterministic) - -val stayUpFilters: Seq[Expression] = reader match { - case r: SupportsPushDownCatalystFilters => -r.pushCatalystFilters(candidates.toArray) - - case r: SupportsPushDownFilters => -// A map from original Catalyst expressions to corresponding translated data source -// filters. If a predicate is not in this map, it means it cannot be pushed down. -val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => - DataSourceStrategy.translateFilter(p).map(f => p -> f) -}.toMap - -// Catalyst predicate expressions that cannot be converted to data source filters. -val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) - -// Data source filters that cannot be pushed down. An unhandled filter means -// the data source cannot guarantee the rows returned can pass the filter. -// As a result we must return it so Spark can plan an extra filter operator. 
-val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet -val unhandledPredicates = translatedMap.filter { case (_, f) => - unhandledFilters.contains(f) -}.keys - -nonConvertiblePredicates ++ unhandledPredicates - - case _ => candidates -} - -val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And) -val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r) -if (withFilter.output == fields) { - withFilter -} else { - Project(fields, withFilter) -} -} - -// TODO: add more push down rules. - -val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet) -// After column pruning, we may have redundant PROJECT nodes in the query plan, remove them. -RemoveRedundantProject(columnPruned) - } - - // TODO: nested fields pruning - private def pushDownRequiredColumns( - plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = { -plan match { - case p @ Project(projectList, child) => -val required = projectList.flatMap(_.references) -p.copy(child = pushDownRequiredColumns(child, AttributeSet(required))) - - case f @ Filter(condition, child) => -val required = requiredByParent
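The removed rule quoted above is where predicates were translated and split into pushed versus Spark-side filters. Here is a minimal standalone sketch of that translate-and-split step, assuming Spark's internal `DataSourceStrategy.translateFilter` and the `SupportsPushDownFilters` mix-in; the helper name `pushAndSplit` is made up for illustration and is not part of the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters

// Returns (predicates that must stay in a Spark-side Filter, predicates pushed to the source).
def pushAndSplit(
    reader: SupportsPushDownFilters,
    candidates: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
  // Catalyst predicate -> translated data source filter; untranslatable predicates are dropped.
  val translatedMap: Map[Expression, Filter] = candidates.flatMap { p =>
    DataSourceStrategy.translateFilter(p).map(f => p -> f)
  }.toMap

  // Predicates with no data source equivalent can never be pushed down.
  val nonConvertible = candidates.filterNot(translatedMap.contains)

  // pushFilters returns the filters the source cannot guarantee; keep their Catalyst originals
  // so Spark can plan an extra Filter operator on top.
  val unhandled = reader.pushFilters(translatedMap.values.toArray).toSet
  val unhandledPredicates = translatedMap.collect {
    case (p, f) if unhandled.contains(f) => p
  }.toSeq

  val pushed = translatedMap.keys.toSeq.diff(unhandledPredicates)
  (nonConvertible ++ unhandledPredicates, pushed)
}
```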
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r168910531 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -17,130 +17,55 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.v2.reader._ -/** - * Pushes down various operators to the underlying data source for better performance. Operators are - * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you - * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the - * data source should execute FILTER before LIMIT. And required columns are calculated at the end, - * because when more operators are pushed down, we may need less columns at Spark side. - */ -object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { - override def apply(plan: LogicalPlan): LogicalPlan = { -// Note that, we need to collect the target operator along with PROJECT node, as PROJECT may -// appear in many places for column pruning. -// TODO: Ideally column pruning should be implemented via a plan property that is propagated -// top-down, then we can simplify the logic here and only collect target operators. -val filterPushed = plan transformUp { - case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) => -val (candidates, nonDeterministic) = - splitConjunctivePredicates(condition).partition(_.deterministic) - -val stayUpFilters: Seq[Expression] = reader match { - case r: SupportsPushDownCatalystFilters => -r.pushCatalystFilters(candidates.toArray) - - case r: SupportsPushDownFilters => -// A map from original Catalyst expressions to corresponding translated data source -// filters. If a predicate is not in this map, it means it cannot be pushed down. -val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => - DataSourceStrategy.translateFilter(p).map(f => p -> f) -}.toMap - -// Catalyst predicate expressions that cannot be converted to data source filters. -val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) - -// Data source filters that cannot be pushed down. An unhandled filter means -// the data source cannot guarantee the rows returned can pass the filter. -// As a result we must return it so Spark can plan an extra filter operator. 
-val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet -val unhandledPredicates = translatedMap.filter { case (_, f) => - unhandledFilters.contains(f) -}.keys - -nonConvertiblePredicates ++ unhandledPredicates - - case _ => candidates -} - -val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And) -val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r) -if (withFilter.output == fields) { - withFilter -} else { - Project(fields, withFilter) -} -} - -// TODO: add more push down rules. - -val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet) -// After column pruning, we may have redundant PROJECT nodes in the query plan, remove them. -RemoveRedundantProject(columnPruned) - } - - // TODO: nested fields pruning - private def pushDownRequiredColumns( - plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = { -plan match { - case p @ Project(projectList, child) => -val required = projectList.flatMap(_.references) -p.copy(child = pushDownRequiredColumns(child, AttributeSet(required))) - - case f @ Filter(condition, child) => -val required = requiredByPar
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r168910415 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,147 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) + + override def computeStats(): Statistics = reader match { +case r: SupportsReportStatistics => + Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) +case _ => + Statistics(sizeInBytes = conf.defaultSizeInBytes) + } } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => + throw new AnalysisException( +s"Data source does not support user-supplied schema: $name") --- End diff -- here we should compare the schema before throwing an exception. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
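A hedged sketch of the comparison being suggested here: rather than rejecting a user-supplied schema outright when a source only implements `ReadSupport`, create the reader first and fail only if the reader's schema actually differs. This is one way the check could look, reusing the names from the diff above; it is not the code that was committed:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.types.StructType

def createReader(
    source: DataSourceV2,
    options: DataSourceOptions,
    userSpecifiedSchema: Option[StructType]): DataSourceReader =
  (source, userSpecifiedSchema) match {
    case (s: ReadSupportWithSchema, Some(schema)) =>
      s.createReader(schema, options)
    case (s: ReadSupport, Some(schema)) =>
      // Tolerate a redundant user-supplied schema; fail only on a real mismatch.
      val reader = s.createReader(options)
      if (reader.readSchema() != schema) {
        throw new AnalysisException(
          s"${source.getClass.getName} does not support a user-supplied schema that " +
            "differs from its own schema")
      }
      reader
    case (s: ReadSupport, None) =>
      s.createReader(options)
    case (_: ReadSupportWithSchema, None) =>
      throw new AnalysisException(
        s"Data source requires a user-supplied schema: ${source.getClass.getName}")
    case _ =>
      throw new AnalysisException(s"Data source is not readable: ${source.getClass.getName}")
  }
```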
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167754111 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,129 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => --- End diff -- There was a historical reason we do this: https://github.com/apache/spark/pull/15046 I agree it's more clear to not allow this since data source v2 is brand new. But this change worths a JIRA ticket and an individual PR, do you mind to create one? Or I can do that for you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167753210 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,80 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Seq[AttributeReference], +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { --- End diff -- because we call it `userSpecifiedSchema` in `DataFrameReader` and `DataSource`, I think it's more clear to make the name consistent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167639047 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,80 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Seq[AttributeReference], +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { --- End diff -- I think this is clear. In what way do you think this will be misinterpreted? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167638689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,129 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => --- End diff -- For your second concern about checking ASAP: this will be done when the relation is first created because projection is required and is always based on the schema returned by a reader. To be more clear about when this should happen, I think that the requirement is for this to happen during job planning and, ideally, before filter push-down. For the case where the user supplies a schema that is identical to the source's schema: I think this might be a bad idea because it will cause confusion when source schemas change. Plus, I can't think of a situation where it is a good idea to pass a schema that is ignored. Here's an example of how this will be confusing: think of a job that supplies a schema identical to the table's schema and runs fine, so it goes into production. What happens when the table's schema changes? If someone adds a column to the table, then the job will start failing and report that the source doesn't support user-supplied schemas, even though it had previously worked just fine with a user-supplied schema. In addition, the change to the table is actually compatible with the old job because the new column will be removed by a projection. To fix this situation, it may be tempting to use the user-supplied schema as an initial projection. But that doesn't make sense because we don't need two projection mechanisms. 
If we used this as a second way to project, it would be confusing that you can't actually leave out columns (at least for CSV) and it would be odd that using this path you can coerce types, which should usually be done by Spark. I think it is best not to allow a user-supplied schema when it isn't supported by a source.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167385858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,129 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => --- End diff -- another concern is: this check should be done ASAP so that we can fail earlier. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
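To make the "fail earlier" point concrete, a capability check like the following could run when `DataFrameReader` first builds the relation, instead of surfacing only when the lazy reader is created during planning. The helper name `validateReadCapability` is hypothetical and purely for illustration, and it sets aside the separate question, discussed above, of whether a matching user schema should be tolerated:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.StructType

// Hypothetical eager check: does this source support the requested read mode at all?
def validateReadCapability(
    source: DataSourceV2,
    userSpecifiedSchema: Option[StructType]): Unit = source match {
  case _: ReadSupportWithSchema if userSpecifiedSchema.isDefined => // ok
  case _: ReadSupport if userSpecifiedSchema.isEmpty => // ok
  case _: ReadSupport | _: ReadSupportWithSchema =>
    throw new AnalysisException(
      s"${source.getClass.getName} cannot be used with the given user-specified schema option")
  case _ =>
    throw new AnalysisException(s"Data source is not readable: ${source.getClass.getName}")
}
```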
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167385829 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,129 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => --- End diff -- This is different from before: see https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L214 Even if `userSpecifiedSchema` is not none, it's still allowed to have `ReadSupport`, as long as its reader's schema is same as the user specified schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167385722 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,80 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Seq[AttributeReference], +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { --- End diff -- nit: may be more clear to call it `userSpecifiedSchema` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167374952 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, --- End diff -- I mentioned this below, but I should point it out on this thread, too: it is not correct to pass output to this relation. There's no guarantee that output will match the requested projection exactly, so in addition to the problem of leaking v2 details in the planner, this would make it easy to build a relation that doesn't correctly report its output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167374648 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly --- End diff -- Removing it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
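For context, the hunk above builds a mutable `updatedOptions` map that is never used, which is presumably part of what is being removed along with the stale comment. With the dead code gone, the wrapping collapses to something along these lines (a sketch, assuming `scala.collection.JavaConverters._` is in scope as in the imports above):

```scala
// Wrap the relation's immutable Scala option map in the Java-facing DataSourceOptions.
private lazy val v2Options: DataSourceOptions = new DataSourceOptions(options.asJava)
```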
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167374579 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes --- End diff -- I don't think it is correct to make `output` a constructor parameter here. The v2 read API allows implementations to return rows with a different schema than the one requested, so you don't know whether the projection and the output will actually match until you push the projection and look at the schema the reader returns. If `output` were a constructor parameter, then it would be easy accidentally create instances where the `output` doesn't match the actual rows returned by the source. That's why the current code uses `projection` to pass the requested columns, and always sets `output` correctly. To make the guarantee that the column ids don't change, we don't strictly need `output` to be a constructor param. In fact, right now the only time this matters is when the projection isn't set. Otherwise, the ids are taken from the projection. 
I've considered a couple of options, like caching the conversion from schema to attributes, but I think the easiest option is to make sure that `projection` is always set.
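The id-stability argument above is easier to see as a small standalone function: derive `output` from the reader's schema, but reuse the attribute (and its expression ID) from `projection` whenever the names match, so re-resolving the relation never invents new IDs for columns the query already references. A sketch using the same names as the diff:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StructType

def outputFor(
    readerSchema: StructType,
    projection: Seq[AttributeReference]): Seq[AttributeReference] = {
  val byName = projection.map(a => a.name -> a).toMap
  readerSchema.map { f =>
    // Columns the source returns but the query did not ask for get fresh IDs,
    // which is fine because nothing upstream references them.
    byName.getOrElse(f.name,
      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
  }
}
```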
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167369570 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } + + private lazy val asReadSupport: ReadSupport = { --- End diff -- I think the implementation is pretty clear: ```java if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) { ... } else { loadV1Source(paths: _*) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167347513 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } + + private lazy val asReadSupport: ReadSupport = { --- End diff -- e.g. I can't find where is the old fallback logic (https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L198) in the new code. It takes more time for reviewers to make sure the new code is corrected. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167346995 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -17,130 +17,55 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.v2.reader._ -/** - * Pushes down various operators to the underlying data source for better performance. Operators are - * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you - * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the - * data source should execute FILTER before LIMIT. And required columns are calculated at the end, - * because when more operators are pushed down, we may need less columns at Spark side. - */ -object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { - override def apply(plan: LogicalPlan): LogicalPlan = { -// Note that, we need to collect the target operator along with PROJECT node, as PROJECT may -// appear in many places for column pruning. -// TODO: Ideally column pruning should be implemented via a plan property that is propagated -// top-down, then we can simplify the logic here and only collect target operators. -val filterPushed = plan transformUp { - case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) => -val (candidates, nonDeterministic) = - splitConjunctivePredicates(condition).partition(_.deterministic) - -val stayUpFilters: Seq[Expression] = reader match { - case r: SupportsPushDownCatalystFilters => -r.pushCatalystFilters(candidates.toArray) - - case r: SupportsPushDownFilters => -// A map from original Catalyst expressions to corresponding translated data source -// filters. If a predicate is not in this map, it means it cannot be pushed down. -val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => - DataSourceStrategy.translateFilter(p).map(f => p -> f) -}.toMap - -// Catalyst predicate expressions that cannot be converted to data source filters. -val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) - -// Data source filters that cannot be pushed down. An unhandled filter means -// the data source cannot guarantee the rows returned can pass the filter. -// As a result we must return it so Spark can plan an extra filter operator. 
-val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet -val unhandledPredicates = translatedMap.filter { case (_, f) => - unhandledFilters.contains(f) -}.keys - -nonConvertiblePredicates ++ unhandledPredicates - - case _ => candidates -} - -val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And) -val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r) -if (withFilter.output == fields) { - withFilter -} else { - Project(fields, withFilter) -} -} - -// TODO: add more push down rules. - -val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet) -// After column pruning, we may have redundant PROJECT nodes in the query plan, remove them. -RemoveRedundantProject(columnPruned) - } - - // TODO: nested fields pruning - private def pushDownRequiredColumns( - plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = { -plan match { - case p @ Project(projectList, child) => -val required = projectList.flatMap(_.references) -p.copy(child = pushDownRequiredColumns(child, AttributeSet(required))) - - case f @ Filter(condition, child) => -val required = requiredByPar
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167346297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } + + private lazy val asReadSupport: ReadSupport = { --- End diff -- Is there a problem? I think this is a clear way to to handle the cases without mixing concerns. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
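[Editor's note] For readers skimming the thread, the `asReadSupport` helper being discussed follows a capability-cast pattern. A minimal sketch, assuming the 2.3-era `ReadSupport`/`ReadSupportWithSchema` interfaces already imported in the quoted diff; the wrapper object, method signature, and error messages are illustrative, not the PR's exact code:

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}

// Hypothetical helper object; the PR defines equivalent logic as private lazy
// vals on the relation itself.
object ReadSupportSketch {
  // Resolve the read capability of a source, failing with a clear analysis
  // error if the source cannot be read. `name` stands in for the source name.
  def asReadSupport(source: DataSourceV2, name: String): ReadSupport = source match {
    case support: ReadSupport =>
      support
    case _: ReadSupportWithSchema =>
      // this source can only be read when the user supplies a schema
      throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
    case _ =>
      throw new AnalysisException(s"Data source is not readable: $name")
  }
}
```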
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167346043 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -17,130 +17,55 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.v2.reader._ -/** - * Pushes down various operators to the underlying data source for better performance. Operators are - * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you - * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the - * data source should execute FILTER before LIMIT. And required columns are calculated at the end, - * because when more operators are pushed down, we may need less columns at Spark side. - */ -object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { - override def apply(plan: LogicalPlan): LogicalPlan = { -// Note that, we need to collect the target operator along with PROJECT node, as PROJECT may -// appear in many places for column pruning. -// TODO: Ideally column pruning should be implemented via a plan property that is propagated -// top-down, then we can simplify the logic here and only collect target operators. -val filterPushed = plan transformUp { - case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) => -val (candidates, nonDeterministic) = - splitConjunctivePredicates(condition).partition(_.deterministic) - -val stayUpFilters: Seq[Expression] = reader match { - case r: SupportsPushDownCatalystFilters => -r.pushCatalystFilters(candidates.toArray) - - case r: SupportsPushDownFilters => -// A map from original Catalyst expressions to corresponding translated data source -// filters. If a predicate is not in this map, it means it cannot be pushed down. -val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => - DataSourceStrategy.translateFilter(p).map(f => p -> f) -}.toMap - -// Catalyst predicate expressions that cannot be converted to data source filters. -val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) - -// Data source filters that cannot be pushed down. An unhandled filter means -// the data source cannot guarantee the rows returned can pass the filter. -// As a result we must return it so Spark can plan an extra filter operator. 
-val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet -val unhandledPredicates = translatedMap.filter { case (_, f) => - unhandledFilters.contains(f) -}.keys - -nonConvertiblePredicates ++ unhandledPredicates - - case _ => candidates -} - -val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And) -val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r) -if (withFilter.output == fields) { - withFilter -} else { - Project(fields, withFilter) -} -} - -// TODO: add more push down rules. - -val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet) -// After column pruning, we may have redundant PROJECT nodes in the query plan, remove them. -RemoveRedundantProject(columnPruned) - } - - // TODO: nested fields pruning - private def pushDownRequiredColumns( - plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = { -plan match { - case p @ Project(projectList, child) => -val required = projectList.flatMap(_.references) -p.copy(child = pushDownRequiredColumns(child, AttributeSet(required))) - - case f @ Filter(condition, child) => -val required = requiredByParent
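[Editor's note] The quoted diff removes this translate-and-push logic from the rule; per the new code referenced elsewhere in the thread, a `DataSourceV2Relation.pushFilters` helper takes it over. A minimal sketch of that flow against the same `SupportsPushDownFilters` and `DataSourceStrategy.translateFilter` APIs; the wrapper object is hypothetical and the exact split of pushed vs. post-scan filters may differ from the PR:

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters}

// Hypothetical helper object, for illustration only.
object FilterPushDownSketch {
  // Returns (filters Spark must still evaluate, filters handed to the source).
  def pushFilters(
      reader: DataSourceReader,
      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = reader match {
    case r: SupportsPushDownFilters =>
      // catalyst predicates that have a data source Filter translation
      val translated: Map[Expression, Filter] = filters.flatMap { p =>
        DataSourceStrategy.translateFilter(p).map(f => p -> f)
      }.toMap

      // predicates with no translation can never be pushed down
      val nonConvertible = filters.filterNot(translated.contains)

      // the source returns the filters it cannot guarantee; Spark must re-check those
      val unhandled: Set[Filter] = r.pushFilters(translated.values.toArray).toSet
      val unhandledPredicates = translated.collect {
        case (p, f) if unhandled.contains(f) => p
      }.toSeq

      val postScanFilters = nonConvertible ++ unhandledPredicates
      val pushed = translated.keys.toSeq.filterNot(unhandledPredicates.toSet)
      (postScanFilters, pushed)

    case _ =>
      // reader does not support filter push-down; Spark evaluates everything
      (filters, Nil)
  }
}
```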
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167345858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes --- End diff -- This is currently handled by setting the projection to always get the same ids, but it may not cover all copy cases. I'll look into how we can reliably move output to the constructor params, but this needs to avoid forcing callers to configure a reader. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167344956 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -17,130 +17,55 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.v2.reader._ -/** - * Pushes down various operators to the underlying data source for better performance. Operators are - * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you - * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the - * data source should execute FILTER before LIMIT. And required columns are calculated at the end, - * because when more operators are pushed down, we may need less columns at Spark side. - */ -object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { - override def apply(plan: LogicalPlan): LogicalPlan = { -// Note that, we need to collect the target operator along with PROJECT node, as PROJECT may -// appear in many places for column pruning. -// TODO: Ideally column pruning should be implemented via a plan property that is propagated -// top-down, then we can simplify the logic here and only collect target operators. -val filterPushed = plan transformUp { - case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) => -val (candidates, nonDeterministic) = - splitConjunctivePredicates(condition).partition(_.deterministic) - -val stayUpFilters: Seq[Expression] = reader match { - case r: SupportsPushDownCatalystFilters => -r.pushCatalystFilters(candidates.toArray) - - case r: SupportsPushDownFilters => -// A map from original Catalyst expressions to corresponding translated data source -// filters. If a predicate is not in this map, it means it cannot be pushed down. -val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => - DataSourceStrategy.translateFilter(p).map(f => p -> f) -}.toMap - -// Catalyst predicate expressions that cannot be converted to data source filters. -val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) - -// Data source filters that cannot be pushed down. An unhandled filter means -// the data source cannot guarantee the rows returned can pass the filter. -// As a result we must return it so Spark can plan an extra filter operator. 
-val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet -val unhandledPredicates = translatedMap.filter { case (_, f) => - unhandledFilters.contains(f) -}.keys - -nonConvertiblePredicates ++ unhandledPredicates - - case _ => candidates -} - -val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And) -val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r) -if (withFilter.output == fields) { - withFilter -} else { - Project(fields, withFilter) -} -} - -// TODO: add more push down rules. - -val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet) -// After column pruning, we may have redundant PROJECT nodes in the query plan, remove them. -RemoveRedundantProject(columnPruned) - } - - // TODO: nested fields pruning - private def pushDownRequiredColumns( - plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = { -plan match { - case p @ Project(projectList, child) => -val required = projectList.flatMap(_.references) -p.copy(child = pushDownRequiredColumns(child, AttributeSet(required))) - - case f @ Filter(condition, child) => -val required = requiredByPar
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167344060 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes --- End diff -- Sorry I just recalled, there is a rule(undocumented) that leaf node should make `output` as a constructor parameter. The reason is that, if the plan gets transformed and copied, the `output` can not change, otherwise parent node's expressions may refer to non-exist attributes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
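[Editor's note] The id-stability concern behind this rule is visible with `AttributeReference` alone; a small spark-shell-style illustration (not from the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// ExprIds are assigned when an AttributeReference is constructed, so
// re-deriving output from a schema produces *new* ids...
val a = AttributeReference("i", IntegerType)()
val rederived = AttributeReference("i", IntegerType)()
assert(a.exprId != rederived.exprId)

// ...while carrying the same attribute through a constructor parameter (or a
// copy) keeps the id stable, so parent expressions still resolve against it.
assert(a.withNullability(false).exprId == a.exprId)
```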
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167343401 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } + + private lazy val asReadSupport: ReadSupport = { --- End diff -- It's a little hard to map the new logic to what we have in https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L201 . Can we keep the previous code style? i.e. `val reader = (ds, userSpecifiedSchema) match ...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
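[Editor's note] For context, a hedged sketch of the earlier tuple-match style cloud-fan is asking to keep, using the same v2 interfaces as the quoted diff; the wrapper object, schema-mismatch check, and messages are simplified placeholders rather than the exact Spark code:

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.types.StructType

// Hypothetical helper object, for illustration only.
object ReaderFactorySketch {
  def createReader(
      ds: DataSourceV2,
      userSpecifiedSchema: Option[StructType],
      options: DataSourceOptions): DataSourceReader = {
    (ds, userSpecifiedSchema) match {
      case (r: ReadSupportWithSchema, Some(schema)) =>
        r.createReader(schema, options)

      case (r: ReadSupport, None) =>
        r.createReader(options)

      case (r: ReadSupport, Some(schema)) =>
        // the source infers its own schema, which must agree with the user's
        val reader = r.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case (_: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema must be specified when using $ds.")

      case _ =>
        throw new AnalysisException(s"$ds is not readable.")
    }
  }
}
```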
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167343364 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, --- End diff -- This is a bad idea because it means that any place that creates this relation needs to know to create a reader and get its schema to pass in. That's confusing because some code will pass a user schema and some code will pass a projection. Keeping this logic inside of the relation ensures that it is done correctly and without leaking details from the v2 API into the rest of the planner. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167342730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly --- End diff -- the comment is out-dated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
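[Editor's note] As the comment says, the quoted helper's comment is stale and its mutable `updatedOptions` map is never read; once both are dropped, the helper reduces to wrapping the immutable options map for the v2 API. A sketch (the helper name is illustrative):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Wrap the relation's immutable options map in the v2 API's options class.
def makeV2Options(options: Map[String, String]): DataSourceOptions =
  new DataSourceOptions(options.asJava)
```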
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167342554 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, --- End diff -- Since we need to keep the "pushed columns" anyway, can we just use the `output` field? The only difference between `output` and `projection` is the default value, this can be solved by creating a temp reader and get its schema in `DataFrameReader`. We already did this in `DataStreamReader.load`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
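[Editor's note] A sketch of the alternative described here, where the caller (e.g. `DataFrameReader`) builds a throwaway reader once just to learn the output attributes and then passes them to the relation; the function name and error handling are illustrative:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}

// Create a temporary reader only to learn the source's schema, then discard it.
// The mapping below mirrors what schema.toAttributes does inside Spark.
def outputViaTempReader(
    source: DataSourceV2,
    options: DataSourceOptions): Seq[AttributeReference] = source match {
  case r: ReadSupport =>
    r.createReader(options).readSchema().map { f =>
      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
    }
  case _ =>
    throw new IllegalArgumentException(s"$source is not readable")
}
// The resulting attributes would then be passed to the relation as its `output`.
```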
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167320953 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { --- End diff -- I agree that it's a bad idea to run push-down here. I fixed this by implementing `doCanonicalize` and returning a node that overrides the `output` val. I think that is cleaner than pulling logic outside of the relation. There's no need for every place that creates a relation to need to get the output of a reader. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167295506 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) --- End diff -- I think you're missing that it's a bad idea to pass `DataSourceOptions` to the relation in the first place. As I've said, this class is specific to v2 readers and writers, and doesn't need to be leaked into logical plans. To make this a bit more clear: If we pass the the options to the relation, then every time a new code path instantiates the relation, someone has to know where that central place to create options is. You're trading the problem for one only slightly better. What if every relation required its own specific classes to pass the same information? That would be a mess. 
Rather than knowing which custom object to instantiate and where its helper lives, it makes more sense to pass plain data to the relation. Then we can also use that information in match expressions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
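[Editor's note] One way to read the "match expressions" point: with plain constructor data (a source plus an options map), planner rules can pattern-match on the relation's fields directly. The rule below is a placeholder, not code from the PR:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Does any v2 relation in the plan carry an explicit "path" option?
def readsFromPath(plan: LogicalPlan): Boolean =
  plan.collectFirst {
    case r: DataSourceV2Relation if r.options.contains("path") => r
  }.isDefined
```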
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167288341 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = { --- End diff -- There's a key difference here that you're missing. The streaming API was rushed and committed without adequate review input. The comment indicated that the commit was probably unfinished and, if so, the remaining changes could be put into a completely different commit. In contrast, this contained a few additional things that make `DataSourceV2Relation` complete and more useful for the next PRs. Thinking about how this relat
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167147910 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { --- End diff -- I pulled your code and played with it. So your PR does fix the bug, but in a hacky way. Let's me explain what happened. 1. `QueryPlan.canonicalized` is called, every expression in `DataSourceV2Relation` is canonicalized, including `DataSourceV2Relation.projection`. This means, the attributes in `projection` are all renamed to "none". 2. `DataSourceV2Relation.output` is called, which triggers the creation of the reader, and applies filter push down and column pruning. Note that because all attributes are renamed to "none", we are actually pushing invalid filters and columns to data sources. 3. line up `reader.schema` and `projection`, to get the actual output. Because all names are "none", it works. However step 2 is pretty dangerous, Spark doesn't define the behavior of pushing invalid filters and columns, especially what `reader.schema` should return after invalid columns are pushed down. I prefer my original fix, which put `output` in `DataSourceV2Relation`'s constructor parameters, and update it when doing column pruning in `PushDownOperatorsToDataSource`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
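[Editor's note] For reference, a sketch of the column-pruning call at the center of step 2, based on the `pushRequiredColumns` / `SupportsPushDownRequiredColumns` names in the quoted diff; if `struct` is built from canonicalized attributes, the source is asked to prune to placeholder column names:

```scala
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Ask the reader to prune its output to the given columns, if it supports pruning.
def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = reader match {
  case projector: SupportsPushDownRequiredColumns =>
    // hazard described above: a struct derived from canonicalized attributes
    // carries placeholder names, so the source is asked for bogus columns
    projector.pruneColumns(struct)
  case _ =>
    // reader does not support column pruning; nothing to do
}
```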
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167142433 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) --- End diff -- We all agree that duplicating the logic of creating `DataSourceOptions` in many places is a bad idea. Currently there are 2 proposals: 1. Have a central place to take care the data source v2 resolution logic, including option creating. This is the approach of data source v1, i.e. the class `DataSource`. 2. Similar to proposal 1, but make `DataSourceV2Relation` the central place. For now we don't know which one is better, it depends on how data source v2 evolves in the future. At this point of time, I think we should pick the simplest approach, which is passing the `DataSourceOptions` to `DataSourceV2Relation`. 
Then we just need a one-line change in `DataFrameReader`, and we don't need to add `v2Options` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167141001 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = { --- End diff -- I think we should avoid adding unused code that is needed in the future. The streaming data source v2 was a bad example and you already pointed it out. Hope we don't make the same mistake in the future. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167005107 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = { --- End diff -- If you want this removed to get this commit in, please say so. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167004846 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { --- End diff -- That commit was not reverted when I rebased. The test is still present and passing: https://github.com/apache/spark/blob/181946d1f1c5889661544830a77bd23c4b4f685a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L320-L336 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167001183 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) --- End diff -- Also, keep in mind that this is a lazy val. It is only referenced when creating a reader or writer --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
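To make the laziness argument concrete: a `lazy val` on a case class is computed only when something reads it, so transforming or copying the plan does not by itself rebuild the options. A minimal sketch with stand-in types (not the Spark classes):

```scala
case class Relation(options: Map[String, String]) {
  lazy val v2Options: Map[String, String] = {
    println(s"building options from $options") // visible side effect to show when this runs
    options
  }
}

object LazyOptionsDemo {
  def main(args: Array[String]): Unit = {
    val r1 = Relation(Map("path" -> "/tmp/t"))
    val r2 = r1.copy(options = r1.options + ("k" -> "v")) // nothing printed yet
    r2.v2Options // prints exactly once, only for the copy that is actually read
  }
}
```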
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166998163 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { --- End diff -- I'll have a look. I didn't realize you'd committed that one already. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166997697 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = { --- End diff -- No, it isn't. But a relation should be able to return a writer. This is going to be needed as we improve the logical plans used by v2. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
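For context, the `writer` method under discussion would look roughly like the sketch below: match the source against `WriteSupport` and adapt the result to a Scala `Option`. This is an approximation rather than the PR's exact code; the `createWriter(jobId, schema, mode, options)` signature returning a `java.util.Optional` is assumed from the 2.3-era v2 write API.

```scala
import java.util.UUID

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType

object V2WriterSketch {
  // Standalone stand-in for the relation's writer(dfSchema, mode) method.
  def writer(
      source: DataSourceV2,
      options: DataSourceOptions,
      dfSchema: StructType,
      mode: SaveMode): Option[DataSourceWriter] = source match {
    case ws: WriteSupport =>
      val maybeWriter = ws.createWriter(UUID.randomUUID().toString, dfSchema, mode, options)
      if (maybeWriter.isPresent) Some(maybeWriter.get) else None
    case _ =>
      None // the source does not support writes
  }
}
```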
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166997241 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) --- End diff -- As we've already discussed at length, I think it is a bad idea to create `DataSourceOptions` and pass it to the relation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166827975 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { --- End diff -- This reverts https://github.com/apache/spark/pull/20485 , can we still pass the test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166827720 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) --- End diff -- This makes us create a new `DataSourceOptions` every time `DataSourceV2Relation` gets transformed and copied. Can we just create `DataSourceOptions` at `DataFrameReader` and pass it to `DataSourceV2Relation`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
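As a sketch of the alternative proposed here (not what the PR ultimately did), the options would be wrapped exactly once by the caller, e.g. `DataFrameReader`, and the relation would store the `DataSourceOptions` instead of the raw map; the names below are hypothetical:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}

// Hypothetical shape only: the real relation also carries projection, filters and schema.
case class EagerOptionsRelation(source: DataSourceV2, options: DataSourceOptions)

object EagerOptionsRelation {
  // Factory a caller such as DataFrameReader could use, wrapping the map exactly once.
  def fromMap(source: DataSourceV2, options: Map[String, String]): EagerOptionsRelation =
    EagerOptionsRelation(source, new DataSourceOptions(options.asJava))
}
```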
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166827284 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,130 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. 
+val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = { --- End diff -- This method is not used anywhere. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166691646 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,139 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, --- End diff -- You've missed the idea that I was trying to make with that point. The idea isn't to make "surgical" commits that hardly make any changes and must exactly match the JIRA description. If we did that, we would end up making only tiny commits, spending far too much time filling out JIRA issues, and buried under an unmanageable review backlog. The point is to think about whether the content of a commit would confuse someone managing a branch or cause avoidable conflicts when reverting or picking commits. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166526321 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,139 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, --- End diff -- Sorry, I might not have made it clear. What I proposed is to only include changes that are necessary for this PR. To make the plan immutable, what we need is: 1. keep required columns and pushed filters in the plan. So here we add 2 parameters: `projection` and `filters`. 2. keep things that are needed to create a `DataSourceReader`, i.e. `DataSourceV2`, `DataSourceOptions` and `userSpecifiedSchema`. For 1 we are good. For 2, I think the most intuitive and surgical way is to create the `DataSourceOptions` in `DataFrameReader` (the current behavior) and keep the `DataSourceOptions` in `DataSourceV2Relation`. There may be an argument for putting this common information in the `simpleString`; I'm +1 on doing this, but it should be done after https://issues.apache.org/jira/browse/SPARK-23341, once the community has agreed on an initial list of common information (which may be more than just table and path). I really like your proposal about "don't mix partial, unrelated changes into a commit", and I hope the whole open source community can follow it and make high-quality PRs. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
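Put as code, the minimal immutable plan described in points 1 and 2 above would carry roughly the fields below. This is a sketch of the proposal, not the code that was merged:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.types.StructType

// Sketch only: the real node also extends LeafNode with MultiInstanceRelation.
case class ImmutableV2Relation(
    source: DataSourceV2,                           // needed to create a DataSourceReader
    options: DataSourceOptions,                     // created once in DataFrameReader, per the proposal
    projection: Seq[AttributeReference],            // 1. required columns kept in the plan
    filters: Option[Seq[Expression]] = None,        // 1. pushed filters kept in the plan
    userSpecifiedSchema: Option[StructType] = None) // 2. user-specified schema, if any
```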
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166394747 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I'm not saying that `DataSourceOptions` have to be handled in the relation. Just that the relation should use the same classes to pass data, like `TableIdentifier`, that are used by the rest of the planner. I agree with those benefits of doing this. Is there anything that needs to change in this PR? We can move where the options are created in a follow-up, but let me know if you think this would prevent this from getting merged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166392277 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I agree that `DataFrameReader` should not be the only place that creates `DataSourceV2Relation`, so handling `DataSourceOptions` in `DataFrameReader` is a bad idea. But handling `DataSourceOptions` in `DataSourceV2Relation` is not the only option. Like data source v1(see `DataSource`), we can have a central place to deal with the data source stuff, which can also minimize the number of places that need to handle `DataSourceOptions`. The actual benefits I see are: 1. It's easier to write catalyst rules to match a `DataSourceV2Relation` with a specific table identifier. 2. It's easier to reuse `DataSourceV2Relation` if we have data source v3 later, because we don't have any v2 specific stuff in the constructor. For 1, I think we can solve it by defining a `DataSourceV2Relation.unapply` to match the table identifier. For 2, I think it may not worth to consider it at this point. There might be more benefits I was missing, please correct me if I was wrong, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
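The extractor mentioned for point 1 could look like the sketch below: a dedicated `unapply` that matches a `DataSourceV2Relation` by table identifier without the identifier being a constructor field. It assumes the identifier is recoverable from hypothetical "table"/"database" option keys, which is itself part of the open question. A resolution rule could then match `case V2RelationWithTable(rel, ident) if ident.table == "events" => ...`.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Sketch of a dedicated extractor, kept separate from the case class's generated unapply.
object V2RelationWithTable {
  def unapply(plan: LogicalPlan): Option[(DataSourceV2Relation, TableIdentifier)] = plan match {
    case r: DataSourceV2Relation =>
      // Assumes the identifier was stored under these option keys when the relation was built.
      r.options.get("table").map(t => (r, TableIdentifier(t, r.options.get("database"))))
    case _ => None
  }
}
```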
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166386661 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I think that `TableIdentifier` and a string-to-string `Map` should be passed to `DataSourceV2Relation` and that either the relation or `DataSourceOptions` should be responsible for creating `DataSourceOptions` with well-defined properties to pass the table information to implementations. This minimizes the number of places that need to handle `DataSourceOptions` (which is specific to v2) and uses `TableIdentifier` and `Map` to match the rest of the planner nodes. For example, other read paths that can create `DataSourceV2Relation`, like resolution rules, use `TableIdentifier`. I'm not currently advocating a position on how to configure `DataFrameReader` or `DataFrameWriter` or how to handle schemas. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166168357 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- Let me confirm your expectation for the data source v2 framework. First we define some standard information, like schema, path, table. We provide dedicated methods in `DataFrameReader` and `DataFrameWriter` for end users to specify them, e.g. `DataFrameReader.schema/path/table`. Then this information is passed to the read/write logical plan directly, i.e. `DataSourceV2Relation` and `WriteToDataSourceV2`. At the end, the read/write logical plan constructs the `DataSourceOptions`, putting this standard information into the options under standard option keys. Do I understand it correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166165255 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +"DataSourceV2Relation(" + + s"source=$sourceName${path.orElse(table).map(loc => s"($loc)").getOrElse("")}, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}] options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +path match { + case Some(p) => +updatedOptions.put("path", p) + case None => +updatedOptions.remove("path") +} + +table.map { ident => + updatedOptions.put("table", ident.table) --- End diff -- Opened [SPARK-23341](https://issues.apache.org/jira/browse/SPARK-23341) for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166165005 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +"DataSourceV2Relation(" + + s"source=$sourceName${path.orElse(table).map(loc => s"($loc)").getOrElse("")}, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}] options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +path match { + case Some(p) => +updatedOptions.put("path", p) + case None => +updatedOptions.remove("path") +} + +table.map { ident => + updatedOptions.put("table", ident.table) --- End diff -- I think we agree here. I want to avoid doing this outside of either `DataSourceOptions` or `DataSourceV2Relation`. If we can create `DataSourceOptions` from a `Option[TableIdentifier]` and add the `getTable` accessor, then that works for me. My main motivation is to avoid having this piece of code copied throughout the SQL planner. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
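A sketch of what that could look like, with standard keys, a single conversion from `TableIdentifier`, and a `getTable` read-side accessor grouped in one place. The key names and every method below are hypothetical; settling on the real ones is what SPARK-23341 is about:

```scala
import java.util.Optional

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical helpers; none of these exist in the current API.
object V2StandardOptions {
  val PathKey = "path"
  val TableKey = "table"
  val DatabaseKey = "database"

  // Write side: fold an optional table identifier into the option map in exactly one place.
  def withTable(options: Map[String, String], table: Option[TableIdentifier]): Map[String, String] =
    table.fold(options) { ident =>
      options ++ Map(TableKey -> ident.table) ++ ident.database.map(DatabaseKey -> _)
    }

  // Read side: an implementation asks for the identifier instead of hard-coding key names.
  def getTable(options: DataSourceOptions): Option[TableIdentifier] = {
    val table: Optional[String] = options.get(TableKey)
    if (table.isPresent) {
      val db = options.get(DatabaseKey)
      Some(TableIdentifier(table.get, if (db.isPresent) Some(db.get) else None))
    } else {
      None
    }
  }
}
```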
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166163929 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +"DataSourceV2Relation(" + + s"source=$sourceName${path.orElse(table).map(loc => s"($loc)").getOrElse("")}, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}] options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +path match { + case Some(p) => +updatedOptions.put("path", p) + case None => +updatedOptions.remove("path") +} + +table.map { ident => + updatedOptions.put("table", ident.table) --- End diff -- this is something I wanna avoid. At the end we still need to define some standard option keys, like `path` and `table`. It would be good if we can provide some methods for end users and data source developers to easily write/read these standard options. I agree this can be done in a different PR. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166081367 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- I should clarify this as well: I like the idea of standardizing how implementations access the table name from `DataSourceOptions`. However, I don't think that is sufficient to remove the table and path here in the definition of `DataSourceV2Relation` for the reasons above. I think we *should* follow this commit with a plan for how implementations access these options. For now, it is good to put the creation of those options in a single place. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r166030958 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- @cloud-fan, sorry if it was not clear: Yes, I have considered it and I think it is a bad idea. I sent a note to the dev list about this issue, as well if you want more context. There are two main reasons: 1. Your proposal creates more places that are responsible for creating a `DataSourceOptions` with the right property names. All of the places where we have a `TableIdentifier` and want to convert to a `DataSourceV2Relation` need to copy the same logic and worry about using the same properties. What you propose is hard to maintain and error prone: what happens if we decide not to pass the database if it is `None` in a `TableIdentifier`? We would have to validate every place that creates a v2 relation. On the other hand, if we pass `TableIdentifier` here, we have one code path that converts. It is also easier for us to pass `TableIdentifier` to the data sources if we choose to update the API. 2. There is no reason to use `DataSourceOptions` outside of v2 at this point. This PR doesn't expose the v2-specific options class to other places in the codebase. Instead, it uses a map for generic options and classes that can be used in pattern matching where possible. And again, this has fewer places that create v2 internal classes, which is easier for maintenance. If you want to add those methods to the options class so that implementations can easily access path and table name, then we can do that in a follow-up PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r165981421 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,151 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, --- End diff -- Have you considered https://github.com/apache/spark/pull/20387#issuecomment-362148217? I feel it's better to define these common options in `DataSourceOptions`, so that data source implementations can also get these common options easily. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r165751660 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,15 +17,149 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -fullOutput: Seq[AttributeReference], -reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +path: Option[String] = None, +table: Option[TableIdentifier] = None, +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, --- End diff -- I like this pattern. I think it is important that the arguments to a query plan node are comprehensive so that it is easy to understand what is going on in the output of `explain()`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
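To make the `explain()` point concrete: with source, schema, filters, and options all in the constructor, the `simpleString` above renders the whole state of the node on one line. The snippet below only reproduces that format with made-up values to show what a reader of the plan output would see:

```scala
object SimpleStringDemo {
  def main(args: Array[String]): Unit = {
    val source = "my-source"                       // illustrative source name
    val schema = Seq("id#1 int", "ts#2 timestamp") // attribute plus data type, as in simpleString
    val filters = Seq("isnotnull(id#1)")
    val options = Map("path" -> "/data/events")
    println(s"DataSourceV2Relation(source=$source(/data/events), " +
      s"schema=[${schema.mkString(", ")}], " +
      s"filters=[${filters.mkString(", ")}] options=$options)")
  }
}
```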