[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172403479

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java ---
    @@ -23,6 +23,10 @@
     /**
      * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
      * interface to report data partitioning and try to avoid shuffle at Spark side.
    + *
    + * Note that Spark will always infer a
    + * {@link org.apache.spark.sql.catalyst.plans.physical.SinglePartition} partitioning when the
    + * reader creates exactly 1 {@link DataReaderFactory}.
    --- End diff --

    nit: no matter whether the reader implements this interface or not.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172328296

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    +
         case _ => super.outputPartitioning
       }

    -  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
    -    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
    +  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
    --- End diff --

    That's a good point. Updated the PR.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172324207

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    +
         case _ => super.outputPartitioning
       }

    -  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
    -    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
    +  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
    --- End diff --

    I think it is better to have a private lazy val throw an exception if it is called at the wrong time than to add the cast, because the exception at least validates that assumptions are met and can carry a reasonable error message. The cast might hide the problem, particularly over time as this code evolves.

    It would be reasonable to add another path that returns `Seq[DataReaderFactory[_]]`, because that's the method contract, even though the callers make an assumption about how it will behave. As much of the contract between methods as possible should be expressed in types.
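To make the tradeoff concrete, here is a minimal sketch of the fail-fast approach rdblue describes: two typed lazy vals instead of one `Seq[DataReaderFactory[_]]`. The val names, the error messages, and the `asScala` conversions are illustrative assumptions rather than the merged code, and the fallback that wraps Row-based factories is omitted for brevity:

    import scala.collection.JavaConverters._

    // Sketch, assumed to live inside DataSourceV2ScanExec. Each val carries a
    // precise element type; evaluating the one that does not apply fails fast
    // with a descriptive message, instead of an asInstanceOf cast failing
    // somewhere downstream with a bare ClassCastException.
    private lazy val rowReaderFactories: Seq[DataReaderFactory[UnsafeRow]] =
      reader match {
        case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories().asScala
        case _ => throw new IllegalStateException(
          "rowReaderFactories evaluated for a reader without unsafe-row scan support")
      }

    private lazy val batchReaderFactories: Seq[DataReaderFactory[ColumnarBatch]] =
      reader match {
        case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
          r.createBatchDataReaderFactories().asScala
        case _ => throw new IllegalStateException(
          "batchReaderFactories evaluated for a reader without batch read support")
      }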
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172322719

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    +
         case _ => super.outputPartitioning
       }

    -  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
    -    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
    +  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
    --- End diff --

    We could, but then both usages of `readerFactories` (along with any future ones) would have to add a `case r: SupportsScanColumnarBatch if r.enableBatchRead()` check to figure out which val they're supposed to use. I'm not a huge fan of lazy vals that will throw errors if anyone tries to evaluate them.

    Both ways seem equally unclean to me, so I'm fine with switching if you think separating them is better.
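For illustration, the duplication being described would look roughly like this: every consumer of the split vals repeats the same dispatch on `reader`. This is a sketch, with `rowReaderFactories`/`batchReaderFactories` as in the sketch above and `numReaderFactories` a hypothetical helper, not actual code:

    // Hypothetical helper: with separate typed vals, each use site must
    // repeat the same guard to decide which val applies.
    private def numReaderFactories: Int = reader match {
      case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
        batchReaderFactories.size
      case _ =>
        rowReaderFactories.size
    }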
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172320487

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    +
         case _ => super.outputPartitioning
       }

    -  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
    -    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
    +  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
    --- End diff --

    Why not separate the method into `batchReaderFactories` and `readerFactories`?
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172318033

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    +
         case _ => super.outputPartitioning
       }

    -  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
    -    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
    +  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
    --- End diff --

    I agree that we should separate these cases, but I don't immediately see a good way to do so.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172317377

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    +
         case _ => super.outputPartitioning
       }

    -  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
    -    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
    +  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
    --- End diff --

    Why not separate the cases for columnar batch and unsafe rows? That would avoid needing to cast this later to `Seq[DataReaderFactory[UnsafeRow]]` and `Seq[DataReaderFactory[ColumnarBatch]]`, which isn't a very clean solution.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172316134

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    --- End diff --

    Makes sense to me; I also added a comment to the API describing this behavior.
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20726#discussion_r172307625

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
           new DataSourcePartitioning(
             s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

    +    case _ if readerFactories.size == 1 => SinglePartition
    --- End diff --

    Shall we move it before `case s: SupportsReportPartitioning`? The difference is whether we always report `SinglePartition` when there is only one reader factory. Looking at `EnsureRequirements`, it seems `SinglePartition` is more likely to satisfy the required distribution.
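Concretely, the reordering being proposed would hoist the new case to the top of the match, so a one-factory scan reports `SinglePartition` even when the reader implements `SupportsReportPartitioning`. This is a sketch reconstructed from the diff above, with imports and class context omitted, not merged code:

    override def outputPartitioning: physical.Partitioning = reader match {
      // Checked first: per the discussion, SinglePartition is more likely than
      // a reader-reported partitioning to satisfy the distributions that
      // EnsureRequirements checks for.
      case _ if readerFactories.size == 1 => SinglePartition

      case s: SupportsReportPartitioning =>
        new DataSourcePartitioning(
          s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

      case _ => super.outputPartitioning
    }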
[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...
GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/20726

    [SPARK-23574][CORE] Report SinglePartition in DataSourceV2ScanExec when there's exactly 1 data reader factory.

    ## What changes were proposed in this pull request?

    Report SinglePartition in DataSourceV2ScanExec when there's exactly 1 data reader factory.

    ## How was this patch tested?

    Existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark SPARK-23574

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20726.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20726

commit efb839759ddc1df1eec1b14500eebe5e4ca903c5
Author: Jose Torres
Date:   2018-03-03T04:34:05Z

    SinglePartition check
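For orientation, the core of the change as it appears in the diff under review is a single new case in `DataSourceV2ScanExec.outputPartitioning`. The sketch below is reconstructed from that diff, with imports and the surrounding class omitted:

    override def outputPartitioning: physical.Partitioning = reader match {
      case s: SupportsReportPartitioning =>
        new DataSourcePartitioning(
          s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))

      // New: exactly one DataReaderFactory means the scan produces a single
      // partition, so report SinglePartition and let the planner avoid
      // unnecessary shuffles.
      case _ if readerFactories.size == 1 => SinglePartition

      case _ => super.outputPartitioning
    }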