GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/21029

    remove type parameter in DataReaderFactory

    ## What changes were proposed in this pull request?
    
    This API change is inspired by the problems we hit while migrating the 
streaming and file-based data sources.
    
    For the streaming side, we need a variant of the 
`DataReader/WriterFactory` (see [an 
example](https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#)).
 This brings a lot of trouble for scanning/writing optimized data formats like 
`InternalRow`, `ColumnarBatch`, etc.
    
    These special scanning/writing interfaces are defined like
    ```
    interface SupportsScanUnsafeRow {
      List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
    }
    ```
    This can't work with `ContinuousDataReaderFactory` at all, unless we do a 
runtime type cast and make the variant extend `DataReader/WriterFactory`, as 
sketched below. We have the same problem on the write path too.
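
    To make the clash concrete, here is a minimal sketch of the runtime cast the continuous engine is forced into. The traits are simplified stand-ins for the real Spark interfaces, and `toContinuous` is a hypothetical helper, not actual Spark code:
    ```
    // Simplified stand-ins for the real interfaces in
    // org.apache.spark.sql.sources.v2.reader, just to show the shape of the problem.
    trait DataReader[T]
    trait DataReaderFactory[T] { def createDataReader(): DataReader[T] }
    trait PartitionOffset

    // The continuous variant can only extend the generic factory...
    trait ContinuousDataReaderFactory[T] extends DataReaderFactory[T] {
      def createDataReaderWithOffset(offset: PartitionOffset): DataReader[T]
    }

    // ...so an engine that receives plain DataReaderFactory[UnsafeRow] instances
    // (e.g. from SupportsScanUnsafeRow) must downcast each one at runtime:
    def toContinuous[T](f: DataReaderFactory[T]): ContinuousDataReaderFactory[T] =
      f match {
        case c: ContinuousDataReaderFactory[T] => c
        case other =>
          throw new IllegalStateException(s"$other does not support continuous reads")
      }
    ```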
    
    For the file-based data source side, we have a problem with code 
duplication. Let's take the ORC data source as an example. To support both 
unsafe row and columnar batch scans, we need something like
    ```
    class OrcUnsafeRowDataReader extends DataReader[UnsafeRow] {
      ...
    }
    
    class OrcColumnarBatchDataReader extends DataReader[ColumnarBatch] {
      ...
    }
    
    class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
      def createDataReader ...
    }
    
    class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
      def createDataReader ...
    }
    
    class OrcDataSourceReader extends DataSourceReader {
      def createUnsafeRowFactories = ... // logic to prepare the parameters and create factories
    
      def createColumnarBatchFactories = ... // logic to prepare the parameters and create factories
    }
    ```
    You can see that we have duplicated logic for preparing the parameters and 
defining the factories. After this change, we can simplify the code to
    ```
    class OrcReaderFactory(...) extends DataReaderFactory {
      def createUnsafeRowReader ...
    
      def createColumnarBatchReader ...
    }
    
    class OrcDataSourceReader extends DataSourceReader {
      def createReadFactories = ... // logic to prepare the parameters and create factories
    }
    ```
    
    The proposed change is: remove the type parameter and embed the special 
scanning/writing formats in the factory, e.g.
    ```
    interface DataReaderFactory {
      DataFormat dataFormat();
    
      default DataReader<Row> createRowDataReader() {
        throw new IllegalStateException(
          "createRowDataReader must be implemented if the data format is ROW.");
      }
    
      default DataReader<UnsafeRow> createUnsafeRowDataReader() {
        throw new IllegalStateException(
          "createUnsafeRowDataReader must be implemented if the data format is UNSAFE_ROW.");
      }
    
      default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
        throw new IllegalStateException(
          "createColumnarBatchDataReader must be implemented if the data format is COLUMNAR_BATCH.");
      }
    }
    ```
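
    As a hypothetical usage sketch (assuming a `DataFormat` enum with `ROW`, `UNSAFE_ROW` and `COLUMNAR_BATCH` values, as the messages above imply; `OrcColumnarBatchReader` is an illustrative name), a source declares one format and overrides only the matching method, and the engine dispatches on the declared format instead of on marker interfaces:
    ```
    // Hypothetical Scala implementation of the proposed interface.
    class OrcReaderFactory(/* parameters shared by all formats */)
        extends DataReaderFactory {

      // Declare which format this factory actually produces.
      override def dataFormat: DataFormat = DataFormat.COLUMNAR_BATCH

      // Only the reader for the declared format needs a real implementation;
      // the interface defaults throw for the other formats.
      override def createColumnarBatchDataReader(): DataReader[ColumnarBatch] =
        new OrcColumnarBatchReader(/* ... */)
    }

    // Engine side: a single dispatch on the declared format replaces the
    // per-format SupportsScanXyz marker interfaces.
    factory.dataFormat match {
      case DataFormat.ROW            => factory.createRowDataReader()
      case DataFormat.UNSAFE_ROW     => factory.createUnsafeRowDataReader()
      case DataFormat.COLUMNAR_BATCH => factory.createColumnarBatchDataReader()
    }
    ```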
    
    TODO:
    - update document
    - fix tests
    - apply this change to the write path (next PR)
    
    
    ## How was this patch tested?
    
    existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark dsv2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21029.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21029
    
----
commit d44105d88e3e5ca1e01ce96efe7019a47960d5be
Author: Wenchen Fan <wenchen@...>
Date:   2018-04-10T14:06:08Z

    remove type parameter in DataReaderFactory

----

