Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/22009#discussion_r208638252
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
    @transient source: DataSourceV2,
    @transient options: Map[String, String],
    @transient pushedFilters: Seq[Expression],
-    @transient reader: DataSourceReader)
+    @transient readSupport: ReadSupport,
+    @transient scanConfig: ScanConfig)
  extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {

  override def simpleString: String = "ScanV2 " + metadataString

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: DataSourceV2ScanExec =>
-      output == other.output && reader.getClass == other.reader.getClass && options == other.options
+      output == other.output && readSupport.getClass == other.readSupport.getClass &&
+        options == other.options
    case _ => false
  }

  override def hashCode(): Int = {
    Seq(output, source, options).hashCode()
  }

-  override def outputPartitioning: physical.Partitioning = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
-      SinglePartition
-
-    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
-      SinglePartition
-
-    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
+  override def outputPartitioning: physical.Partitioning = readSupport match {
+    case _ if partitions.length == 1 =>
      SinglePartition

    case s: SupportsReportPartitioning =>
      new DataSourcePartitioning(
-        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))

    case _ => super.outputPartitioning
  }

-  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
-    reader.planInputPartitions().asScala
-  }
+  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)

-  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      assert(!reader.isInstanceOf[ContinuousReader],
-        "continuous stream reader does not support columnar read yet.")
-      r.planBatchInputPartitions().asScala
-  }
+  private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig)

-  private lazy val inputRDD: RDD[InternalRow] = reader match {
-    case _: ContinuousReader =>
+  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
+    case _: ContinuousReadSupport =>
+      assert(!partitionReaderFactory.supportColumnarReads(),
--- End diff --
This is a slightly different case. Can Spark choose not to use columnar
reads if the source returns true for `supportsColumnarReads`? If so, then this
isn't a problem.
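
To make the question concrete, here is a minimal, hypothetical sketch (none of these names are the real API: `ReaderFactorySketch`, `ScanSketch`, and `useColumnarReads` are stand-ins) of how the scan node could fall back to row-based reads for continuous streaming instead of asserting that the source never reports columnar support:

```scala
// Hypothetical sketch only, not the actual DataSourceV2ScanExec code.

// Stand-in for a partition reader factory; `supportsColumnarReads` mirrors the
// capability checked in the diff, though the real signature may differ.
trait ReaderFactorySketch {
  def supportsColumnarReads: Boolean
}

// Stand-in for the scan node deciding which read path to use.
final class ScanSketch(factory: ReaderFactorySketch, isContinuousStream: Boolean) {
  // If Spark is allowed to ignore a source's columnar capability, the
  // continuous-streaming case can simply force the row-based path instead of
  // requiring that the factory return false.
  def useColumnarReads: Boolean =
    factory.supportsColumnarReads && !isContinuousStream
}

object ScanSketchExample extends App {
  val columnarCapable = new ReaderFactorySketch { def supportsColumnarReads = true }

  // Batch scan: the columnar path is chosen.
  println(new ScanSketch(columnarCapable, isContinuousStream = false).useColumnarReads) // true
  // Continuous scan: Spark quietly falls back to rows, so no assert is needed.
  println(new ScanSketch(columnarCapable, isContinuousStream = true).useColumnarReads)  // false
}
```

If that kind of fallback is allowed, the assert in the diff can go away; if columnar support must be honored whenever the source reports it, then the assert (or a clearer error) is the right call.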
---