Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20387#discussion_r164633577
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -17,15 +17,149 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.types.StructType
case class DataSourceV2Relation(
- fullOutput: Seq[AttributeReference],
- reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder {
+ source: DataSourceV2,
+ options: Map[String, String],
+ path: Option[String] = None,
+ table: Option[TableIdentifier] = None,
+ projection: Option[Seq[AttributeReference]] = None,
+ filters: Option[Seq[Expression]] = None,
+ userSchema: Option[StructType] = None) extends LeafNode {
+
+ override def simpleString: String = {
+ "DataSourceV2Relation(" +
+ s"source=$sourceName, schema=${schema.simpleString}, " +
+ s"filters=$pushedFilters options=$options)"
+ }
+
+ override lazy val schema: StructType = reader.readSchema()
+
+ override lazy val output: Seq[AttributeReference] = {
+ projection match {
+ case Some(attrs) =>
+ // use the projection attributes to avoid assigning new ids. fields that are not projected
+ // will be assigned new ids, which is okay because they are not projected.
+ val attrMap = attrs.map(a => a.name -> a).toMap
+ schema.map(f => attrMap.getOrElse(f.name,
+ AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
+ case _ =>
+ schema.toAttributes
+ }
+ }
+
+ private lazy val v2Options: DataSourceV2Options = {
+ // ensure path and table options are set correctly
+ val updatedOptions = new mutable.HashMap[String, String]
+ updatedOptions ++= options
- override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
+ path match {
+ case Some(p) =>
+ updatedOptions.put("path", p)
+ case None =>
+ updatedOptions.remove("path")
+ }
+
+ table.map { ident =>
+ updatedOptions.put("table", ident.table)
+ ident.database match {
+ case Some(db) =>
+ updatedOptions.put("database", db)
+ case None =>
+ updatedOptions.remove("database")
+ }
+ }
+
+ new DataSourceV2Options(updatedOptions.asJava)
+ }
+
+ private val sourceName: String = {
+ source match {
+ case registered: DataSourceRegister =>
+ registered.shortName()
+ case _ =>
+ source.getClass.getSimpleName
+ }
+ }
+
+ lazy val (
+ reader: DataSourceV2Reader,
+ unsupportedFilters: Seq[Expression],
+ pushedFilters: Seq[Expression]) = {
+ val newReader = userSchema match {
+ case Some(s) =>
+ asReadSupportWithSchema.createReader(s, v2Options)
--- End diff ---
I like this idea. Although `DataSourceReader` is mutable, we can simply create a new one every time we want to apply operator pushdown.
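
As a rough sketch of what that could look like (not the actual code in this PR: the `newReader` helper and `asReadSupport` are assumed here by analogy with the `asReadSupportWithSchema` call above, while `SupportsPushDownFilters`, `pushFilters`, and `DataSourceStrategy.translateFilter` are the existing v2 APIs):

```scala
// Sketch only: build a fresh reader for every pushdown pass, so mutable
// state from a previous pass never leaks into the next one.
private def newReader(): DataSourceV2Reader = userSchema match {
  case Some(s) => asReadSupportWithSchema.createReader(s, v2Options)
  case None => asReadSupport.createReader(v2Options)
}

private def pushdown(
    candidates: Seq[Expression]): (DataSourceV2Reader, Seq[Expression]) = {
  val reader = newReader()
  reader match {
    case r: SupportsPushDownFilters =>
      // translateFilter returns None for predicates that have no
      // source-level Filter equivalent; those must stay post-scan.
      val translated = candidates.map(e => e -> DataSourceStrategy.translateFilter(e))
      val (pushable, nonPushable) = translated.partition(_._2.isDefined)
      // pushFilters returns the subset the source could not handle.
      val rejected = r.pushFilters(pushable.flatMap(_._2).toArray).toSet
      val postScan = nonPushable.map(_._1) ++
        pushable.collect { case (e, Some(f)) if rejected(f) => e }
      (reader, postScan)
    case _ =>
      // this reader cannot push filters; evaluate everything after the scan
      (reader, candidates)
  }
}
```

Rebuilding the reader like this also keeps the rule idempotent: re-running pushdown on the same plan reconstructs the same reader instead of compounding state on a cached one.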