Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r167001183
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
    @@ -17,17 +17,130 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    +import java.util.UUID
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
    +
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +
    +  override lazy val schema: StructType = reader.readSchema()
    +
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to avoid assigning new ids. 
fields that are not projected
    +        // will be assigned new ids, which is okay because they are not 
projected.
    +        val attrMap = attrs.map(a => a.name -> a).toMap
    +        schema.map(f => attrMap.getOrElse(f.name,
    +          AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
    +      case _ =>
    +        schema.toAttributes
    +    }
    +  }
    +
    +  private lazy val v2Options: DataSourceOptions = {
    +    // ensure path and table options are set correctly
    +    val updatedOptions = new mutable.HashMap[String, String]
    +    updatedOptions ++= options
    +
    +    new DataSourceOptions(options.asJava)
    --- End diff --
    
    Also, keep in mind that this is a lazy val. It is only referenced when 
creating a reader or writer.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to