Github user rdblue commented on a diff in the pull request:
    --- Diff: 
    @@ -17,17 +17,131 @@
     package org.apache.spark.sql.execution.datasources.v2
    +import java.util.UUID
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId}
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType
     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${ => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +  override lazy val schema: StructType = reader.readSchema()
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to avoid assigning new ids. fields that are not projected
    +        // will be assigned new ids, which is okay because they are not 
    +        val attrMap = => -> a).toMap
    + => attrMap.getOrElse(,
    +          AttributeReference(, f.dataType, f.nullable, 
    +      case _ =>
    +        schema.toAttributes
    --- End diff --
    I don't think it is correct to make `output` a constructor parameter here. 
The v2 read API allows implementations to return rows with a different schema 
than the one requested, so you don't know whether the projection and the output 
will actually match until you push the projection and look at the schema the 
reader returns.
    If `output` were a constructor parameter, then it would be easy to 
accidentally create instances where the `output` doesn't match the actual rows 
returned by the source. That's why the current code uses `projection` to pass 
the requested columns, and always sets `output` correctly.
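    To illustrate the point above, here is a minimal, self-contained sketch of deriving `output` from the schema the reader actually reports instead of taking it as a constructor parameter. It does not use Spark's classes: `Field`, `Attr`, `Ids`, `Reader`, `StubbornReader`, and `Relation` are hypothetical stand-ins for `StructField`, `AttributeReference`, expression id allocation, `DataSourceReader`, and `DataSourceV2Relation`.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-ins for StructField and AttributeReference.
final case class Field(name: String, dataType: String)
final case class Attr(name: String, dataType: String, id: Long)

// Hypothetical id allocator: every call hands out a new, unique id.
object Ids {
  private val next = new AtomicLong(0L)
  def fresh(f: Field): Attr = Attr(f.name, f.dataType, next.getAndIncrement())
}

// Hypothetical reader: the schema it reports may differ from what was requested.
trait Reader {
  def readSchema(requested: Option[Seq[String]]): Seq[Field]
}

// A reader that cannot prune the "ts" column, so it always returns it.
object StubbornReader extends Reader {
  private val full = Seq(
    Field("id", "bigint"), Field("ts", "timestamp"), Field("data", "string"))

  def readSchema(requested: Option[Seq[String]]): Seq[Field] = requested match {
    case Some(names) => full.filter(f => names.contains(f.name) || f.name == "ts")
    case None        => full
  }
}

final case class Relation(reader: Reader, projection: Option[Seq[Attr]] = None) {
  // The schema is whatever the reader reports after the projection is pushed.
  lazy val schema: Seq[Field] = reader.readSchema(projection.map(_.map(_.name)))

  // output always follows the reported schema; ids come from the projection
  // where a field was requested, and are newly assigned otherwise.
  lazy val output: Seq[Attr] = projection match {
    case Some(attrs) =>
      val byName = attrs.map(a => a.name -> a).toMap
      schema.map(f => byName.getOrElse(f.name, Ids.fresh(f)))
    case None =>
      schema.map(Ids.fresh)
  }
}
```

    In this sketch, requesting only `id` from `StubbornReader` still yields an `output` of `id` and `ts`: the requested attribute keeps its id and the extra column gets a new one. A constructor-supplied `output` of just `id` would not have matched the rows.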
    To make the guarantee that the column ids don't change, we don't strictly 
need `output` to be a constructor param. In fact, right now the only time this 
matters is when the projection isn't set. Otherwise, the ids are taken from the 
projection. I've considered a couple of options, like caching the conversion 
from schema to attributes, but I think the easiest option is to make sure that 
`projection` is always set.
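    As a rough sketch of the "always set `projection`" option, assuming simplified types rather than Spark's: `Field`, `Attr`, `Ids`, `Relation`, and `RelationFactory.create` below are hypothetical names, and `schema` is passed in directly where the real relation would call `reader.readSchema()`.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-ins for StructField and AttributeReference.
final case class Field(name: String, dataType: String)
final case class Attr(name: String, dataType: String, id: Long)

// Hypothetical id allocator: every call hands out a new, unique id.
object Ids {
  private val next = new AtomicLong(0L)
  def fresh(f: Field): Attr = Attr(f.name, f.dataType, next.getAndIncrement())
}

final case class Relation(
    schema: Seq[Field],                    // stands in for reader.readSchema()
    projection: Option[Seq[Attr]] = None,
    filters: Seq[String] = Nil) {

  // Each instance computes its own output. Without a projection, every
  // instance (including copies) assigns new ids.
  lazy val output: Seq[Attr] = projection match {
    case Some(attrs) =>
      val byName = attrs.map(a => a.name -> a).toMap
      schema.map(f => byName.getOrElse(f.name, Ids.fresh(f)))
    case None =>
      schema.map(Ids.fresh)
  }
}

// Hypothetical factory: assign ids once, up front, and carry them in the
// projection so that later copies reuse them.
object RelationFactory {
  def create(schema: Seq[Field]): Relation =
    Relation(schema, projection = Some(schema.map(Ids.fresh)))
}
```

    With the projection unset, `relation.copy(filters = ...)` produces a relation whose `output` has different ids from the original. A relation built through `RelationFactory.create` keeps the same ids across copies, because they travel with `projection`.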

