[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r181735802

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java ---
@@ -18,18 +18,26 @@
 package org.apache.spark.sql.sources.v2.reader;

 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader;
+import org.apache.spark.sql.vectorized.ColumnarBatch;

 /**
  * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
  * implement this interface to provide creating {@link DataReader} with particular offset.
  */
 @InterfaceStability.Evolving
-public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
-  /**
-   * Create a DataReader with particular offset as its startOffset.
-   *
-   * @param offset offset want to set as the DataReader's startOffset.
-   */
-  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
--- End diff --

cc @jose-torres, seems this method is never used.
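For reference, a minimal sketch of what an implementation of the method being removed was presumably meant to look like, using the pre-change typed interface shown in the diff: a factory whose per-partition reader can be restarted at a given offset. RangeOffset and RangeReaderFactory are invented names, not actual Spark code.

    import org.apache.spark.sql.sources.v2.reader.{ContinuousDataReaderFactory, DataReader}
    import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

    // Hypothetical offset type for a simple integer-range partition.
    case class RangeOffset(index: Int) extends PartitionOffset

    class RangeReaderFactory(end: Int) extends ContinuousDataReaderFactory[java.lang.Integer] {
      // The plain factory method starts from the beginning...
      override def createDataReader(): DataReader[java.lang.Integer] =
        createDataReaderWithOffset(RangeOffset(0))

      // ...while the mix-in method resumes from an arbitrary start offset.
      override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[java.lang.Integer] = {
        var current = offset.asInstanceOf[RangeOffset].index - 1
        new DataReader[java.lang.Integer] {
          override def next(): Boolean = { current += 1; current < end }
          override def get(): java.lang.Integer = current
          override def close(): Unit = ()
        }
      }
    }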
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180796522

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

Then the missing piece is codegen. This is difficult because the continuous stream reader does a lot of auxiliary work, so I don't know if it will happen in the near future.
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180705366

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))

     newBlocks.map { block =>
-      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

Array is a Java-friendly type.
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180705219

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

We use a type-erasure hack and lie to the Scala compiler that we are outputting InternalRow. At runtime, the generated code casts the data back to ColumnarBatch.
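A minimal sketch of the trick described here, using the Spark classes named in the diff (the eraseBatchType wrapper is illustrative, not actual Spark code):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // The planner-facing type is RDD[InternalRow], but each element is really
    // a ColumnarBatch. ColumnarBatch does not extend InternalRow, so the cast
    // is a deliberate lie to the compiler; it works only because generated
    // code casts every element back to ColumnarBatch before reading it, and
    // generic type parameters are erased on the JVM at runtime.
    def eraseBatchType(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
      batches.asInstanceOf[RDD[InternalRow]]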
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180489842

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))

     newBlocks.map { block =>
-      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

I'd like that, but I don't know if that would make things harder for data source implementers working in Java.
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180489396

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

I don't, because I'm not really sure how it works in the batch case. How does it work to do

    new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]

when the type parameter of batchReaderFactories doesn't match InternalRow?
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180468202

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))

     newBlocks.map { block =>
-      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

cc @rdblue @jose-torres
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180468097

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))

     newBlocks.map { block =>
-      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+      new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

I have seen this pattern many times; the Java List is a little troublesome because it is invariant. Shall we change the interface to use an array?
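A self-contained toy illustration of the variance problem being discussed (Row, UnsafeRow, and DataReaderFactoryLike here are simplified stand-ins, not the real v2 interfaces):

    import java.util.{Collections, List => JList}

    object VarianceDemo {
      trait Row
      class UnsafeRow extends Row
      trait DataReaderFactoryLike[T] // invariant in T, like the real DataReaderFactory[T]

      val unsafeFactory = new DataReaderFactoryLike[UnsafeRow] {}

      // java.util.List is invariant, so a List[DataReaderFactoryLike[UnsafeRow]]
      // cannot be used where a List[DataReaderFactoryLike[Row]] is expected;
      // implementations end up force-casting, exactly as memory.scala does in
      // the diff above. The cast is unchecked but succeeds at runtime thanks
      // to type erasure.
      val factories: JList[DataReaderFactoryLike[Row]] =
        Collections.singletonList(unsafeFactory.asInstanceOf[DataReaderFactoryLike[Row]])

      // An array with an existential element type sidesteps the problem at the
      // API boundary: Java callers just see a plain DataReaderFactoryLike[],
      // and Java arrays are covariant.
      val asArray: Array[DataReaderFactoryLike[_]] =
        Array[DataReaderFactoryLike[_]](unsafeFactory)
    }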
[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180467422

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

cc @jose-torres, do you know what's missing for this?