Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22009#discussion_r208440273
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -93,21 +81,17 @@ case class DataSourceV2ScanExec(
         sparkContext,
         sqlContext.conf.continuousStreamingExecutorQueueSize,
         sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        partitions).asInstanceOf[RDD[InternalRow]]
-
-      case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-        new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
+        partitions,
+        schema,
+        partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory])
--- End diff --
`DataSourceV2ScanExec` is shared between batch and streaming, so the
`partitionReaderFactory` here has the general type rather than the concrete
`ContinuousPartitionReaderFactory`. I think we can avoid this cast in a
future refactoring, once we have a dedicated scan plan for continuous streaming.
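
To illustrate the shape of the problem, here is a minimal, self-contained sketch
(the names `SharedScanExec`, `createReader`, and `createContinuousReader` are
made up for illustration, not the actual Spark classes): the shared plan node
only declares the general factory type, so the continuous path has to downcast
before it can use the streaming-specific capability.

```scala
// General factory type that the shared scan plan stores.
trait PartitionReaderFactory {
  def createReader(partitionIndex: Int): Iterator[String]
}

// Continuous streaming adds capabilities on top of the general factory.
trait ContinuousPartitionReaderFactory extends PartitionReaderFactory {
  def createContinuousReader(partitionIndex: Int): Iterator[String]
}

// The plan node is shared between batch and streaming, so its field can
// only have the general type; the continuous code path must downcast.
case class SharedScanExec(partitionReaderFactory: PartitionReaderFactory) {
  def continuousInput(partitionIndex: Int): Iterator[String] = {
    val factory =
      partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory]
    factory.createContinuousReader(partitionIndex)
  }
}
```

With a dedicated continuous scan plan, the field could be declared as
`ContinuousPartitionReaderFactory` directly and the `asInstanceOf` would go away.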
---