GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/21029
remove type parameter in DataReaderFactory

## What changes were proposed in this pull request?

This API change is inspired by the problems we met when migrating the streaming and file-based data sources.

On the streaming side, we need a variant of `DataReader/WriterFactory` (see [an example](https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#)). This brings a lot of trouble for scanning/writing optimized data formats like `InternalRow`, `ColumnarBatch`, etc., because these special scanning/writing interfaces are defined like

```
interface SupportsScanUnsafeRow {
  List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
}
```

This can't work with `ContinuousDataReaderFactory` at all, unless we do runtime type casts and make the variant extend `DataReader/WriterFactory`. We have the same problem on the write path too.

On the file-based data source side, we have a code duplication problem. Take the ORC data source as an example: to support both unsafe row and columnar batch scans, we need something like

```
class OrcUnsafeRowDataReader extends DataReader[UnsafeRow] { ... }
class OrcColumnarBatchDataReader extends DataReader[ColumnarBatch] { ... }

class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
  def createDataReader ...
}

class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
  def createDataReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createUnsafeRowFactories = ...     // logic to prepare the parameters and create factories
  def createColumnarBatchFactories = ... // logic to prepare the parameters and create factories
}
```

You can see that the logic for preparing parameters and defining the factories is duplicated. After this change, we can simplify the code to

```
class OrcReaderFactory(...) extends DataReaderFactory {
  def createUnsafeRowReader ...
  def createColumnarBatchReader ...
}

class OrcDataSourceReader extends DataSourceReader {
  def createReadFactories = ... // logic to prepare the parameters and create factories
}
```

The proposed change is to remove the type parameter and embed the special scanning/writing formats in the factory, e.g.

```
interface DataReaderFactory {
  DataFormat dataFormat();

  default DataReader<Row> createRowDataReader() {
    throw new IllegalStateException(
      "createRowDataReader must be implemented if the data format is ROW.");
  }

  default DataReader<UnsafeRow> createUnsafeRowDataReader() {
    throw new IllegalStateException(
      "createUnsafeRowDataReader must be implemented if the data format is UNSAFE_ROW.");
  }

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException(
      "createColumnarBatchDataReader must be implemented if the data format is COLUMNAR_BATCH.");
  }
}
```
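To make the proposal concrete, here is a minimal, self-contained sketch of how a source could implement it. The stub `DataFormat`, `DataReader`, and `ColumnarBatch` types below are simplified stand-ins for the real Spark classes, and `OrcColumnarReaderFactory` with its `filePath` parameter is hypothetical; the row-based default methods are omitted for brevity.

```
import java.io.Closeable;
import java.io.IOException;

// Simplified stand-ins for the real Spark types, so the sketch compiles on its own.
enum DataFormat { ROW, UNSAFE_ROW, COLUMNAR_BATCH }

interface DataReader<T> extends Closeable {
  boolean next() throws IOException;
  T get();
}

class ColumnarBatch { } // stand-in for the real vectorized batch class

// Trimmed copy of the proposed interface (row-based methods omitted).
interface DataReaderFactory {
  DataFormat dataFormat();

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException(
      "createColumnarBatchDataReader must be implemented if the data format is COLUMNAR_BATCH.");
  }
}

// A hypothetical ORC-style factory that supports only columnar scans: it
// declares its format once and overrides only the matching create method.
class OrcColumnarReaderFactory implements DataReaderFactory {
  private final String filePath; // hypothetical scan parameter

  OrcColumnarReaderFactory(String filePath) { this.filePath = filePath; }

  @Override
  public DataFormat dataFormat() { return DataFormat.COLUMNAR_BATCH; }

  @Override
  public DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    return new DataReader<ColumnarBatch>() {
      private boolean consumed = false;

      @Override public boolean next() { // emits a single empty batch, for illustration
        if (consumed) return false;
        consumed = true;
        return true;
      }

      @Override public ColumnarBatch get() { return new ColumnarBatch(); }
      @Override public void close() { }
    };
  }
}
```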
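Continuing the sketch above, the engine side could then dispatch on the declared format instead of on marker interfaces like `SupportsScanUnsafeRow`. The `ScanExecSketch` class and its `consumeBatch` helper are hypothetical, not actual Spark internals:

```
import java.io.IOException;

class ScanExecSketch {
  // Branch on the factory's declared format; only the columnar case is shown,
  // since the sketch above omits the row-based methods.
  static void scan(DataReaderFactory factory) throws IOException {
    switch (factory.dataFormat()) {
      case COLUMNAR_BATCH:
        try (DataReader<ColumnarBatch> reader = factory.createColumnarBatchDataReader()) {
          while (reader.next()) {
            consumeBatch(reader.get());
          }
        }
        break;
      default:
        throw new UnsupportedOperationException(
          "Format not handled in this sketch: " + factory.dataFormat());
    }
  }

  static void consumeBatch(ColumnarBatch batch) {
    // Hypothetical: hand the batch to the downstream operator.
  }
}
```

With a single un-parameterized `DataReaderFactory`, a streaming variant like `ContinuousDataReaderFactory` can extend it without fixing a scan format up front, which addresses the interoperability problem described above.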
TODO:
- update document
- fix tests
- apply this change to the write path (next PR)

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark dsv2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21029.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21029

----

commit d44105d88e3e5ca1e01ce96efe7019a47960d5be
Author: Wenchen Fan <wenchen@...>
Date: 2018-04-10T14:06:08Z

    remove type parameter in DataReaderFactory

----