Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r167385829
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
    @@ -37,22 +100,129 @@ case class DataSourceV2Relation(
       }
     
       override def newInstance(): DataSourceV2Relation = {
    -    copy(output = output.map(_.newInstance()))
    +    // projection is used to maintain id assignment.
    +    // if projection is not set, use output so the copy is not equal to 
the original
    +    copy(projection = projection.map(_.newInstance()))
       }
     }
     
     /**
      * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
      * to the non-streaming relation.
      */
    -class StreamingDataSourceV2Relation(
    +case class StreamingDataSourceV2Relation(
         output: Seq[AttributeReference],
    -    reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
    +    reader: DataSourceReader)
    +    extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
       override def isStreaming: Boolean = true
    +
    +  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
    +
    +  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
     }
     
     object DataSourceV2Relation {
    -  def apply(reader: DataSourceReader): DataSourceV2Relation = {
    -    new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
    +  private implicit class SourceHelpers(source: DataSourceV2) {
    +    def asReadSupport: ReadSupport = {
    +      source match {
    +        case support: ReadSupport =>
    +          support
    +        case _: ReadSupportWithSchema =>
    +          // this method is only called if there is no user-supplied 
schema. if there is no
    +          // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
    +          throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
    +        case _ =>
    +          throw new AnalysisException(s"Data source is not readable: 
$name")
    +      }
    +    }
    +
    +    def asReadSupportWithSchema: ReadSupportWithSchema = {
    +      source match {
    +        case support: ReadSupportWithSchema =>
    +          support
    +        case _: ReadSupport =>
    --- End diff --
    
    This is different from before: see 
https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L214
    
    Even if `userSpecifiedSchema` is not `None`, it's still allowed to have 
`ReadSupport`, as long as its reader's schema is the same as the user-specified 
schema.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to