Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20448#discussion_r165049675
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
---
@@ -17,36 +17,84 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2,
ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.StructType
+/**
+ * A logical plan representing a data source relation, which will eventually be
planned as a data
+ * scan operator.
+ *
+ * @param output The output of this relation.
+ * @param source The instance of a data source v2 implementation.
+ * @param options The options specified for this scan, used to create the
`DataSourceReader`.
+ * @param userSpecifiedSchema The user specified schema, used to create
the `DataSourceReader`.
+ * @param filters The predicates which are pushed and handled by this data
source.
+ * @param existingReader A mutable reader carrying some temporary stats
during optimization and
+ * planning. It's always None before optimization,
and does not take part in
+ * the equality of this plan, which means this plan
is still immutable.
+ */
case class DataSourceV2Relation(
- fullOutput: Seq[AttributeReference],
- reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder
{
+ output: Seq[AttributeReference],
+ source: DataSourceV2,
+ options: DataSourceOptions,
+ userSpecifiedSchema: Option[StructType],
+ filters: Set[Expression],
+ existingReader: Option[DataSourceReader]) extends LeafNode with
DataSourceV2QueryPlan {
+
+ override def references: AttributeSet = AttributeSet.empty
+
+ override def sourceClass: Class[_ <: DataSourceV2] = source.getClass
override def canEqual(other: Any): Boolean =
other.isInstanceOf[DataSourceV2Relation]
+ def reader: DataSourceReader = existingReader.getOrElse {
+ (source, userSpecifiedSchema) match {
+ case (ds: ReadSupportWithSchema, Some(schema)) =>
+ ds.createReader(schema, options)
+
+ case (ds: ReadSupport, None) =>
+ ds.createReader(options)
+
+ case (ds: ReadSupport, Some(schema)) =>
+ val reader = ds.createReader(options)
+ // Sanity check, this should be guaranteed by
`DataFrameReader.load`
+ assert(reader.readSchema() == schema)
+ reader
+
+ case _ => throw new IllegalStateException()
+ }
+ }
+
override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes =
r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
+
+ override def simpleString: String = s"Relation $metadataString"
}
/**
* A specialization of DataSourceV2Relation with the streaming bit set to
true. Otherwise identical
* to the non-streaming relation.
*/
-class StreamingDataSourceV2Relation(
- fullOutput: Seq[AttributeReference],
- reader: DataSourceReader) extends DataSourceV2Relation(fullOutput,
reader) {
+case class StreamingDataSourceV2Relation(
--- End diff --
Similar to `LogicalRelation`, I think we can simply add an `isStream`
parameter to `DataSourceV2Relation`. This can be addressed in a follow-up PR.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]