Github user rdblue commented on a diff in the pull request:
    --- Diff: 
    @@ -37,22 +100,129 @@ case class DataSourceV2Relation(
       override def newInstance(): DataSourceV2Relation = {
    -    copy(output = output.map(_.newInstance()))
    +    // projection is used to maintain id assignment.
    +    // if projection is not set, use output so the copy is not equal to the original
    +    copy(projection = Some(output.map(_.newInstance())))
      * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
      * to the non-streaming relation.
    -class StreamingDataSourceV2Relation(
    +case class StreamingDataSourceV2Relation(
         output: Seq[AttributeReference],
    -    reader: DataSourceReader) extends DataSourceV2Relation(output, reader)
    +    reader: DataSourceReader)
    +    extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
       override def isStreaming: Boolean = true
    +  override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
    +  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
     object DataSourceV2Relation {
    -  def apply(reader: DataSourceReader): DataSourceV2Relation = {
    -    new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
    +  private implicit class SourceHelpers(source: DataSourceV2) {
    +    def asReadSupport: ReadSupport = {
    +      source match {
    +        case support: ReadSupport =>
    +          support
    +        case _: ReadSupportWithSchema =>
    +          // this method is only called if there is no user-supplied schema. if there is no
    +          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
    +          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
    +        case _ =>
    +          throw new AnalysisException(s"Data source is not readable: $name")
    +      }
    +    }
    +    def asReadSupportWithSchema: ReadSupportWithSchema = {
    +      source match {
    +        case support: ReadSupportWithSchema =>
    +          support
    +        case _: ReadSupport =>
    --- End diff ---
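    To make the dispatch above concrete, here is a minimal sketch of how these implicit conversions could be used when creating a reader. It assumes the Spark 2.3-era DataSourceV2 interfaces (ReadSupport.createReader(options) and ReadSupportWithSchema.createReader(schema, options)); the createReader helper and its userSchema parameter are illustrative, not part of this diff:

        import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
        import org.apache.spark.sql.sources.v2.reader.DataSourceReader
        import org.apache.spark.sql.types.StructType

        // hypothetical helper: pick the reader based on the source's capabilities,
        // using the SourceHelpers conversions from the diff above
        def createReader(
            source: DataSourceV2,
            options: DataSourceOptions,
            userSchema: Option[StructType]): DataSourceReader = {
          userSchema match {
            case Some(schema) =>
              // fails with a helpful AnalysisException if the source only implements ReadSupport
              source.asReadSupportWithSchema.createReader(schema, options)
            case None =>
              // fails if the source requires a schema or is not readable at all
              source.asReadSupport.createReader(options)
          }
        }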
    For your second concern about checking ASAP: this will be done when the relation is first created, because projection is required and is always based on the schema returned by a reader. To be clearer about when this should happen: I think the requirement is for it to happen during job planning and, ideally, before filter push-down.
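    In other words, the projection exists from the moment the relation is built, because it can always be derived from the reader. A sketch of that default (the helper name is mine, not from the PR):

        import org.apache.spark.sql.catalyst.expressions.AttributeReference
        import org.apache.spark.sql.sources.v2.reader.DataSourceReader

        // if no projection has been pushed down yet, default to the full schema
        // reported by the reader, converted to output attributes
        def defaultProjection(reader: DataSourceReader): Seq[AttributeReference] =
          reader.readSchema().toAttributes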
    For the case where the user supplies a schema that is identical to the 
source's schema: I think this might be a bad idea because it will cause 
confusion when source schemas change. Plus, I can't think of a situation where 
it is a good idea to pass a schema that is ignored.
    Here's an example of how this will be confusing: think of a job that 
supplies a schema identical to the table's schema and runs fine, so it goes 
into production. What happens when the table's schema changes? If someone adds 
a column to the table, then the job will start failing and report that the 
source doesn't support user-supplied schemas, even though it had previously 
worked just fine with a user-supplied schema. In addition, the change to the 
table is actually compatible with the old job because the new column will be 
removed by a projection.
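    In code, the scenario looks roughly like this (assuming a SparkSession named spark; the source name is hypothetical and stands in for a source that only implements ReadSupport):

        import org.apache.spark.sql.types._

        // the job pins the schema as it was when the job was written: (id, data)
        val pinnedSchema = StructType(Seq(
          StructField("id", LongType),
          StructField("data", StringType)))

        // runs fine while the table's schema is exactly (id, data) ...
        val df = spark.read
          .format("com.example.source")
          .schema(pinnedSchema)
          .load()

        // ... but after a column is added to the table, the schemas no longer
        // match and the read fails, even though selecting (id, data) from the
        // wider table would still be a valid projection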
    To fix this situation, it may be tempting to use the user-supplied schema as an initial projection. But that doesn't make sense, because we don't need two projection mechanisms. If we used this as a second way to project, it would be confusing that you can't actually leave out columns (at least for CSV), and it would be odd that this path lets you coerce types, which should usually be done by Spark.
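    For contrast, the one projection mechanism we do want already covers both cases, with type coercion done explicitly through Spark (again assuming a SparkSession named spark and a hypothetical source):

        // prune columns with an ordinary projection, which the optimizer can
        // push down to the source; no user-supplied schema is involved
        val pruned = spark.read
          .format("com.example.source")
          .load()
          .select("id", "data")

        // coerce types with an explicit cast instead of a schema
        val coerced = pruned.withColumn("id", pruned("id").cast("int"))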
    I think it is best not to allow a user-supplied schema when it isn't 
supported by a source.

