Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r167345858
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,17 +17,131 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    +import java.util.UUID
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId}
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
    +
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +
    +  override lazy val schema: StructType = reader.readSchema()
    +
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to avoid assigning new ids. fields that are
    +        // not projected will be assigned new ids, which is okay because they are not
    +        // projected.
    +        val attrMap = attrs.map(a => a.name -> a).toMap
    +        schema.map(f => attrMap.getOrElse(f.name,
    +          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    +      case _ =>
    +        schema.toAttributes
    --- End diff ---
    
    This is currently handled by setting the projection so that copies always get
    the same ids, but it may not cover all copy cases. I'll look into how we can
    reliably move output to the constructor params, but that needs to avoid
    forcing callers to configure a reader first.
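
    To make the id concern concrete, a minimal sketch (illustrative only, not
    code from this PR): every AttributeReference built from scratch gets a fresh
    ExprId, so a copy that re-derives its output from the schema would no longer
    match expressions bound to the original attributes. Reusing the projection's
    instances, as the relation's output does above, keeps the ids stable.

        import org.apache.spark.sql.catalyst.expressions.AttributeReference
        import org.apache.spark.sql.types.IntegerType

        // Hypothetical illustration: two attributes built for the same field
        // get distinct ExprIds, because a fresh id is assigned by default.
        val a1 = AttributeReference("id", IntegerType)()
        val a2 = AttributeReference("id", IntegerType)()
        assert(a1.exprId != a2.exprId)

        // Reusing the projection's instances (as output does via attrMap)
        // keeps the ids stable across copies of the relation.
        val projection = Seq(a1)
        val attrMap = projection.map(a => a.name -> a).toMap
        assert(attrMap("id").exprId == a1.exprId)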

