Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20448#discussion_r164955297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,16 +17,57 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.StructType +/** + * A logical plan representing a data source relation, which will eventually be planned to a data scan + * operator. + * + * @param output The output of this relation. + * @param source The instance of a data source v2 implementation. + * @param options The options specified for this scan, used to create the `DataSourceReader`. + * @param userSpecifiedSchema The user specified schema, used to create the `DataSourceReader`. + * @param filters The predicates which are pushed and handled by this data source. + * @param existingReader A mutable reader carrying some temporary stats during optimization and + * planning. It's always None before optimization, and does not take part in + * the equality of this plan, which means this plan is still immutable. 
+ */ case class DataSourceV2Relation( - fullOutput: Seq[AttributeReference], - reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder { + output: Seq[AttributeReference], + source: DataSourceV2, + options: DataSourceOptions, + userSpecifiedSchema: Option[StructType], + filters: Set[Expression], + existingReader: Option[DataSourceReader]) extends LeafNode with DataSourceV2QueryPlan { + + override def references: AttributeSet = AttributeSet.empty + + override def sourceClass: Class[_ <: DataSourceV2] = source.getClass override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + def reader: DataSourceReader = existingReader.getOrElse { + (source, userSpecifiedSchema) match { + case (ds: ReadSupportWithSchema, Some(schema)) => + ds.createReader(schema, options) + + case (ds: ReadSupport, None) => + ds.createReader(options) + + case (ds: ReadSupport, Some(schema)) => + val reader = ds.createReader(options) + // Sanity check, this should be guaranteed by `DataFrameReader.load` + assert(reader.readSchema() == schema) + reader + + case _ => throw new IllegalStateException() + } + } + --- End diff -- Do we need to override a `def doCanonicalize`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org