Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r167142433

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,17 +17,130 @@
     package org.apache.spark.sql.execution.datasources.v2
     
    +import java.util.UUID
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
    +
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +
    +  override lazy val schema: StructType = reader.readSchema()
    +
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to avoid assigning new ids. fields that are not projected
    +        // will be assigned new ids, which is okay because they are not projected.
    +        val attrMap = attrs.map(a => a.name -> a).toMap
    +        schema.map(f => attrMap.getOrElse(f.name,
    +          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    +      case _ =>
    +        schema.toAttributes
    +    }
    +  }
    +
    +  private lazy val v2Options: DataSourceOptions = {
    +    // ensure path and table options are set correctly
    +    val updatedOptions = new mutable.HashMap[String, String]
    +    updatedOptions ++= options
    +
    +    new DataSourceOptions(options.asJava)
    --- End diff --
    
    We all agree that duplicating the logic of creating `DataSourceOptions` in many places is a bad idea. Currently there are 2 proposals:
    
    1. Have a central place that takes care of the data source v2 resolution logic, including option creation. This is the approach of data source v1, i.e. the class `DataSource`.
    2. Similar to proposal 1, but make `DataSourceV2Relation` the central place.
    
    For now we don't know which one is better; it depends on how data source v2 evolves in the future. At this point, I think we should pick the simplest approach: passing the `DataSourceOptions` to `DataSourceV2Relation` directly.
    Then we just need a one-line change in `DataFrameReader`, and we don't need to add `v2Options` here.
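    As a minimal sketch of that simplest approach (hypothetical names -- `V2OptionsSketch` and `RelationStub` stand in for the real classes; only the `DataSourceOptions` constructor and its `get` method are the actual 2.3-era v2 API):
    
        import scala.collection.JavaConverters._
        
        import org.apache.spark.sql.sources.v2.DataSourceOptions
        
        object V2OptionsSketch {
          // Hypothetical stand-in for DataSourceV2Relation taking pre-built options.
          final case class RelationStub(options: DataSourceOptions)
        
          def main(args: Array[String]): Unit = {
            val userOptions = Map("path" -> "/tmp/events")
        
            // The single place that turns Map[String, String] into DataSourceOptions;
            // roughly the "one-line change in DataFrameReader" mentioned above.
            val relation = RelationStub(new DataSourceOptions(userOptions.asJava))
        
            // DataSourceOptions lookups are case-insensitive.
            assert(relation.options.get("PATH").get() == "/tmp/events")
          }
        }
    
    The point being that only the call site builds `DataSourceOptions`, so any path/table normalization lives in exactly one place.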