Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r164633577
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,15 +17,149 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    +import java.util.UUID
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport}
     import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
    +import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
    -    fullOutput: Seq[AttributeReference],
    -    reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    path: Option[String] = None,
    +    table: Option[TableIdentifier] = None,
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode {
    +
    +  override def simpleString: String = {
    +    "DataSourceV2Relation(" +
    +      s"source=$sourceName, schema=${schema.simpleString}, " +
    +      s"filters=$pushedFilters, options=$options)"
    +  }
    +
    +  override lazy val schema: StructType = reader.readSchema()
    +
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to keep the expression ids that were already
    +        // assigned. Fields that are not projected get fresh ids, which is okay because
    +        // nothing in the query references them.
    +        val attrMap = attrs.map(a => a.name -> a).toMap
    +        schema.map(f => attrMap.getOrElse(f.name,
    +          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    +      case _ =>
    +        schema.toAttributes
    +    }
    +  }
    +
    +  private lazy val v2Options: DataSourceV2Options = {
    +    // ensure path and table options are set correctly
    +    val updatedOptions = new mutable.HashMap[String, String]
    +    updatedOptions ++= options
     
    -  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
    +    path match {
    +      case Some(p) =>
    +        updatedOptions.put("path", p)
    +      case None =>
    +        updatedOptions.remove("path")
    +    }
    +
    +    table.foreach { ident =>
    +      updatedOptions.put("table", ident.table)
    +      ident.database match {
    +        case Some(db) =>
    +          updatedOptions.put("database", db)
    +        case None =>
    +          updatedOptions.remove("database")
    +      }
    +    }
    +
    +    new DataSourceV2Options(updatedOptions.asJava)
    +  }
    +
    +  private val sourceName: String = {
    +    source match {
    +      case registered: DataSourceRegister =>
    +        registered.shortName()
    +      case _ =>
    +        source.getClass.getSimpleName
    +    }
    +  }
    +
    +  lazy val (
    +      reader: DataSourceV2Reader,
    +      unsupportedFilters: Seq[Expression],
    +      pushedFilters: Seq[Expression]) = {
    +    val newReader = userSchema match {
    +      case Some(s) =>
    +        asReadSupportWithSchema.createReader(s, v2Options)
    --- End diff ---
    
    I like this idea. Although `DataSourceReader` is mutable, we can create a new one every time we want to apply the operator pushdowns.
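    
    Roughly what I have in mind, as a sketch only (assuming it lives inside the `DataSourceV2Relation` from this diff; `newReader`/`pushdown` are hypothetical names, and I'm assuming an `asReadSupport` helper analogous to the `asReadSupportWithSchema` one above):
    
    ```scala
    // Sketch: build a brand-new reader for every pushdown pass so the mutation
    // stays local to that pass. (Also needs StructField imported next to StructType.)
    private def newReader(): DataSourceV2Reader = userSchema match {
      case Some(s) => asReadSupportWithSchema.createReader(s, v2Options)
      case None => asReadSupport.createReader(v2Options)
    }
    
    // Hypothetical helper: apply filter and column pushdown to the fresh reader
    // and return it along with the expressions the source cannot handle.
    private def pushdown(
        filterExprs: Seq[Expression],
        requiredColumns: Seq[AttributeReference]): (DataSourceV2Reader, Seq[Expression]) = {
      val reader = newReader()
    
      val unhandled = reader match {
        case r: SupportsPushDownFilters =>
          // translate catalyst expressions to source Filters; keep the ones we can't translate
          val translated = filterExprs.map(e => e -> DataSourceStrategy.translateFilter(e))
          val (supported, unsupported) = translated.partition(_._2.isDefined)
          // pushFilters returns the Filters that still need post-scan evaluation;
          // those would have to be mapped back to expressions as well
          r.pushFilters(supported.flatMap(_._2).toArray)
          unsupported.map(_._1)
        case _ =>
          filterExprs
      }
    
      reader match {
        case r: SupportsPushDownRequiredColumns =>
          r.pruneColumns(StructType(requiredColumns.map(a =>
            StructField(a.name, a.dataType, a.nullable, a.metadata))))
        case _ =>
      }
    
      (reader, unhandled)
    }
    ```
    
    Then the optimizer rule that does the pushdown can call this and swap in the fresh reader, instead of sharing one mutable reader across the whole plan.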

