Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20153#discussion_r159678434
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     fullOutput: Seq[AttributeReference],
-    @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder {
+    @transient reader: DataSourceV2Reader)
+  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
-  override def references: AttributeSet = AttributeSet.empty
-
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
-      case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
-      case _ =>
-        reader.createReadTasks().asScala.map {
-          new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
-        }.asJava
-    }
+  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
+
+  private lazy val inputRDD: RDD[InternalRow] = reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+      assert(!reader.isInstanceOf[ContinuousReader],
+        "continuous stream reader does not support columnar read yet.")
+      new DataSourceRDD(sparkContext, r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
--- End diff --
cc @zsxwing can streaming technically support a columnar batch reader?
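
For reference, the batch path guarded by the assert above is only taken when a reader mixes in `SupportsScanColumnarBatch` and returns true from `enableBatchRead()`. Below is a minimal sketch of such a reader, assuming the interface shape implied by this diff; the class name, schema, `ReadTask[ColumnarBatch]` element type, and import locations are illustrative assumptions, not code from this PR:

```scala
import java.util.{Collections, List => JList}

// Package locations are assumed for the 2.3-era DataSourceV2 API.
import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, ReadTask, SupportsScanColumnarBatch}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical reader that opts into the columnar batch scan path.
// createReadTasks() (the row-based path) is assumed to be defaulted by the
// mixin and is unused once enableBatchRead() returns true.
class ExampleColumnarReader extends DataSourceV2Reader with SupportsScanColumnarBatch {

  override def readSchema(): StructType = new StructType().add("i", "int")

  // DataSourceV2ScanExec only builds the batch-based DataSourceRDD when this returns true.
  override def enableBatchRead(): Boolean = true

  // Assumed to return one task per input partition, each producing ColumnarBatch
  // instances; left empty here to keep the sketch self-contained.
  override def createBatchReadTasks(): JList[ReadTask[ColumnarBatch]] =
    Collections.emptyList()
}
```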
---