Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23086#discussion_r237113069
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
    @@ -23,85 +23,55 @@ import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.physical
     import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
     import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
    -import org.apache.spark.sql.execution.streaming.continuous._
     import org.apache.spark.sql.sources.v2.DataSourceV2
     import org.apache.spark.sql.sources.v2.reader._
    -import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
     
     /**
    - * Physical plan node for scanning data from a data source.
    + * Physical plan node for scanning a batch of data from a data source.
      */
     case class DataSourceV2ScanExec(
         output: Seq[AttributeReference],
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient readSupport: ReadSupport,
    -    @transient scanConfig: ScanConfig)
    +    @transient batch: Batch)
       extends LeafExecNode with DataSourceV2StringFormat with 
ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
       override def equals(other: Any): Boolean = other match {
    -    case other: DataSourceV2ScanExec =>
    -      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
    -        options == other.options
    +    case other: DataSourceV2ScanExec => this.batch == other.batch
         case _ => false
       }
     
    -  override def hashCode(): Int = {
    -    Seq(output, source, options).hashCode()
    -  }
    +  override def hashCode(): Int = batch.hashCode()
    +
    +  private lazy val partitions = batch.planInputPartitions()
    +
    +  private lazy val readerFactory = batch.createReaderFactory()
     
    -  override def outputPartitioning: physical.Partitioning = readSupport 
match {
    +  override def outputPartitioning: physical.Partitioning = batch match {
    --- End diff --
    
    add back https://github.com/apache/spark/pull/23086#discussion_r236858449


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to