Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20448#discussion_r164958102
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
---
@@ -17,16 +17,57 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2,
ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.StructType
+/**
+ * A logical plan representing a data source relation, which will be
planned to a data scan
+ * operator finally.
+ *
+ * @param output The output of this relation.
+ * @param source The instance of a data source v2 implementation.
+ * @param options The options specified for this scan, used to create the
`DataSourceReader`.
+ * @param userSpecifiedSchema The user specified schema, used to create
the `DataSourceReader`.
+ * @param filters The predicates which are pushed and handled by this data
source.
+ * @param existingReader An mutable reader carrying some temporary stats
during optimization and
+ * planning. It's always None before optimization,
and does not take part in
+ * the equality of this plan, which means this plan
is still immutable.
+ */
case class DataSourceV2Relation(
- fullOutput: Seq[AttributeReference],
- reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder
{
+ output: Seq[AttributeReference],
+ source: DataSourceV2,
+ options: DataSourceOptions,
+ userSpecifiedSchema: Option[StructType],
+ filters: Set[Expression],
+ existingReader: Option[DataSourceReader]) extends LeafNode with
DataSourceV2QueryPlan {
+
+ override def references: AttributeSet = AttributeSet.empty
+
+ override def sourceClass: Class[_ <: DataSourceV2] = source.getClass
override def canEqual(other: Any): Boolean =
other.isInstanceOf[DataSourceV2Relation]
+ def reader: DataSourceReader = existingReader.getOrElse {
+ (source, userSpecifiedSchema) match {
+ case (ds: ReadSupportWithSchema, Some(schema)) =>
+ ds.createReader(schema, options)
+
+ case (ds: ReadSupport, None) =>
+ ds.createReader(options)
+
+ case (ds: ReadSupport, Some(schema)) =>
+ val reader = ds.createReader(options)
+ // Sanity check, this should be guaranteed by
`DataFrameReader.load`
+ assert(reader.readSchema() == schema)
+ reader
+
+ case _ => throw new IllegalStateException()
+ }
+ }
+
--- End diff --
Data source v2 doesn't support tables yet, so we don't have this problem now.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]