Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23086#discussion_r237113069

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -23,85 +23,55 @@ import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.physical
     import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
     import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
    -import org.apache.spark.sql.execution.streaming.continuous._
     import org.apache.spark.sql.sources.v2.DataSourceV2
     import org.apache.spark.sql.sources.v2.reader._
    -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport}
     
     /**
    - * Physical plan node for scanning data from a data source.
    + * Physical plan node for scanning a batch of data from a data source.
      */
     case class DataSourceV2ScanExec(
         output: Seq[AttributeReference],
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient readSupport: ReadSupport,
    -    @transient scanConfig: ScanConfig)
    +    @transient batch: Batch)
       extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
       override def equals(other: Any): Boolean = other match {
    -    case other: DataSourceV2ScanExec =>
    -      output == other.output && readSupport.getClass == other.readSupport.getClass &&
    -        options == other.options
    +    case other: DataSourceV2ScanExec => this.batch == other.batch
         case _ => false
       }
     
    -  override def hashCode(): Int = {
    -    Seq(output, source, options).hashCode()
    -  }
    +  override def hashCode(): Int = batch.hashCode()
    +
    +  private lazy val partitions = batch.planInputPartitions()
    +
    +  private lazy val readerFactory = batch.createReaderFactory()
     
    -  override def outputPartitioning: physical.Partitioning = readSupport match {
    +  override def outputPartitioning: physical.Partitioning = batch match {
    --- End diff --
    
    add back https://github.com/apache/spark/pull/23086#discussion_r236858449
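
    [Editor's note] The hunk above cuts off at the start of the `outputPartitioning` override, and the
    earlier comment being "added back" is only referenced by link, so the actual continuation of the new
    `batch match` body is not visible here. As a rough, self-contained sketch only (every name below is an
    illustrative stand-in, not Spark's real `Batch`, `InputPartition`, or `Partitioning` API), this shows
    the general shape the diff moves toward: equality and hashCode delegate entirely to the `batch`,
    partitions are planned lazily once, and `outputPartitioning` pattern-matches the batch for an optional
    capability mix-in before falling back to a default. Whether the new code keeps the single-partition
    shortcut from the pre-change `readSupport` version is an assumption here, not something the hunk shows.

    // Illustrative stand-ins only; not the real Spark classes.
    trait Batch {
      def planInputPartitions(): Array[String] // stand-in for Array[InputPartition]
    }

    trait SupportsReportPartitioning extends Batch {
      def outputPartitioning(): String // stand-in for physical.Partitioning
    }

    // Mirrors the diff's shape: the scan node keys equals/hashCode on its Batch
    // (the single-field case class gives that for free) and plans partitions lazily.
    final case class ScanNodeSketch(batch: Batch) {
      lazy val partitions: Array[String] = batch.planInputPartitions()

      def outputPartitioning: String = batch match {
        case _ if partitions.length == 1   => "SinglePartition"
        case s: SupportsReportPartitioning => s.outputPartitioning()
        case _                             => "UnknownPartitioning"
      }
    }

    object ScanSketchDemo extends App {
      val reporting = new SupportsReportPartitioning {
        def planInputPartitions(): Array[String] = Array("p0", "p1")
        def outputPartitioning(): String = "HashPartitioning(2)"
      }
      // prints HashPartitioning(2)
      println(ScanNodeSketch(reporting).outputPartitioning)
    }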