Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/20397#discussion_r163920956
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
---
@@ -39,14 +39,14 @@ import org.apache.spark.util.{SystemClock, ThreadUtils}
class ContinuousDataSourceRDD(
sc: SparkContext,
sqlContext: SQLContext,
- @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ @transient private val readerFactories:
java.util.List[DataReaderFactory[UnsafeRow]])
extends RDD[UnsafeRow](sc, Nil) {
private val dataQueueSize =
sqlContext.conf.continuousStreamingExecutorQueueSize
private val epochPollIntervalMs =
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
override protected def getPartitions: Array[Partition] = {
- readTasks.asScala.zipWithIndex.map {
+ readerFactories.asScala.zipWithIndex.map {
case (readTask, index) => new DataSourceRDDPartition(index, readTask)
--- End diff --
readTask -> readerFactory
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]