Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20153#discussion_r160880016
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
---
@@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
*/
case class DataSourceV2ScanExec(
fullOutput: Seq[AttributeReference],
- @transient reader: DataSourceV2Reader) extends LeafExecNode with
DataSourceReaderHolder {
+ @transient reader: DataSourceV2Reader)
+ extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
override def canEqual(other: Any): Boolean =
other.isInstanceOf[DataSourceV2ScanExec]
- override def references: AttributeSet = AttributeSet.empty
-
- override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"))
-
- override protected def doExecute(): RDD[InternalRow] = {
- val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
- case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
- case _ =>
- reader.createReadTasks().asScala.map {
- new RowToUnsafeRowReadTask(_, reader.readSchema()):
ReadTask[UnsafeRow]
- }.asJava
- }
+ override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
+
+ private lazy val inputRDD: RDD[InternalRow] = reader match {
+ case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+ assert(!reader.isInstanceOf[ContinuousReader],
+ "continuous stream reader does not support columnar read yet.")
+ new DataSourceRDD(sparkContext,
r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
+
+ case _ =>
+ val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
+ case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
+ case _ =>
+ reader.createReadTasks().asScala.map {
+ new RowToUnsafeRowReadTask(_, reader.readSchema()):
ReadTask[UnsafeRow]
+ }.asJava
+ }
+
+ reader match {
--- End diff --
This looks a bit messy. Can we move `readTasks` out as a lazy val? Then we
may have:
```
private lazy val readTasks = ......
private lazy val inputRDD: RDD[InternalRow] = reader match {
case r: SupportsScanColumnarBatch if r.enableBatchRead() => ......
case _: ContinuousReader => ......
case _ => ......
}
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]